setName('xQueue:OrderQuery')->setDescription('[ 订单列表 ] 订单主动查单'); } /** * @param Input $input * @param Output $output * @throws \think\admin\Exception */ protected function execute(Input $input, Output $output) { ini_set('memory_limit', '1024M'); $channelService = ChannelService::instance(); $channelList = $channelService->db()->where(['query_status'=>1,'status'=>1])->select()->toArray(); if(!$channelList) $this->setQueueSuccess("未找到支持查询的渠道"); $channelList_num = count($channelList); $count_sum = 0; $total_sum=0; $redisStatus = sysconf('settingRedisQueryOrder'); foreach($channelList as $channel) { $class = OrderService::instance(); $where['cid'] = $channel['id']; $where['status'] = 4; if($redisStatus == 1){ #多线程此处只传入队列 $orderList = $class->db()->field('cid,order_id')->where($where)->order('id asc')->select()->toArray(); foreach ($orderList as $vo) { $this->queue('query', $vo); } // $this->queue('query', $orderList); }else{ list($count, $total) = [0, $class->db()->where($where)->order('id asc')->count()]; $total_sum += $total; // $data = $class->db()->where($where)->limit(100)->select()->toArray(); $class->db()->where($where)->order('id asc')->chunk(1000, function (Collection $data) use (&$count, $total,$channel,$channelService,&$count_sum) { $array = $data->toArray(); foreach ($array as $k => $vo) { if(isset($vo['num']) && $vo['num']>0) { $vo['order_id'] = $vo['order_id'].'_'.$vo['num']; } $result = $channelService->call('query', $vo['cid'], $vo); if ($result['status'] != 4) { $count_sum++; $count++; $notifyClass = new Notify($this->app); $notifyClass->queue_query($vo['order_id'], $result); $this->setQueueProgress("查询 {$vo['order_id']} 新的状态进行主动更新", $count / $total * 100); } } }); } } $this->setQueueSuccess("共处理 {$channelList_num} 个渠道,共有 {$total_sum} 个订单处理查询中, 完成{$count_sum} 个订单查询新状态更新!"); } # 队列 protected function queue($key, $value = false) { $redis = RedisService::getInstance(); if (!$value) { return $redis->pop($key); } else { return $redis->push($key, $value); } } }