submit(); } }

    # Process one item taken from the "submit" queue.
    #
    # Always returns true, including when an exception is caught; in that case
    # the cached redis handle is reset so the next call re-acquires it.
    public function submit()
    {
        try {
            // Skip all work while the settingUpdate flag is 1 (restart() sets it).
            $settingUpdate = sysconf('settingUpdate');
            if ($settingUpdate == 1) {
                return true;
            }

            # Process mode: 1 = multi-process, 2 = single-process
            $queue = sysconf('queue');
            if ($queue == 1) {
                # Number of runOne child processes currently alive
                $num = $this->getNum();
                if ($num >= 1000) {
                    # Too many children: back off and try again later
                    sleep(30);
                    return true;
                }
                $popen = true;
            } else {
                $popen = false;
            }

            $this->redis();
            $data = $this->redis->pop('submit');
            $this->run($data, $popen);

            return true;
        } catch (\Exception $e) {
            // Drop the handle so redis() fetches a fresh instance next time.
            $this->redis = false;

            return true;
        }
    }

    # Lazily obtain the shared RedisService instance.
    protected function redis()
    {
        if (!$this->redis) {
            $this->redis = RedisService::getInstance();
        }
    }

    # Dispatch one request payload.
    #
    # $data  - array expected to carry non-empty 'merchant', 'product', 'param'
    #          and 'order' entries; anything else is silently ignored.
    # $popen - true: hand the order to a background child process (popen());
    #          false: process it in this process via channel().
    protected function run($data, $popen = false)
    {
        $complete = !empty($data['merchant'])
            && !empty($data['product'])
            && !empty($data['param'])
            && !empty($data['order']);
        if (!$complete) {
            return;
        }

        if ($popen) {
            $this->popen($data['order']);
        } else {
            $this->channel($data['merchant'], $data['product'], false, $data['param'], $data['order']);
        }
    }

    # Push an order to a background child process (task/task/runOne).
    #
    # $order - order identifier, optionally carrying the '_nq' suffix that
    #          runOne() understands.
    public function popen($order)
    {
        # Default install path; adjust for the deployment.
        # The order id comes from queue/database payloads, so it is escaped
        # before being placed on the shell command line.
        $command = 'php /www/wwwroot/reapi/public/index.php task/task/runOne '
            . escapeshellarg((string) $order)
            . ' 1>/dev/null 2>&1 &';
        exec($command);
    }

    # Count the runOne child processes currently running.
    # Returns the last output line of the `wc -l` pipeline as produced by exec().
    public function getNum()
    {
        $command = 'ps -ef | grep task/task/runOne | grep -v grep | wc -l';

        return exec($command);
    }

    # Process a single order.
    #
    # The order id is read from the third CLI argument when present, otherwise
    # from the 'order' request input. A '_nq' marker in the id is stripped and
    # causes request.param.s to be set to 1 before dispatch. Terminates the
    # script with 'error' (no order id) or 'ok'.
    public function runOne()
    {
        if (isset($_SERVER["argv"][2]) && $_SERVER["argv"][2]) {
            $order = $_SERVER["argv"][2];
        } else {
            $order = input('order');
        }
        if (!$order) {
            exit('error');
        }

        $nq = false;
        if (strstr($order, '_nq')) {
            $order = str_replace('_nq', '', $order);
            $nq = true;
        }

        $orderService = OrderService::instance();
        $info = $orderService->get($order);
        // Only orders with status <= 1 that still carry a stored request are dispatched.
        if ($info && $info['status'] <= 1 && $info['request']) {
            $info['request'] = json_decode($info['request'], true);
            if ($nq) {
                $info['request']['param']['s'] = 1;
            }
            $this->run($info['request']);
        }
        exit('ok');
    }

    # Re-dispatch orders still at status = 1 (per the original note: orders that
    # have stayed at status = 1 for more than 5 minutes). Can be scheduled every
    # 5 minutes; replaces the former runAll method.
    public function runOther()
    {
        $orderService = OrderService::instance();
        $data = $orderService->getOldData();
        foreach ($data as $k => $v) {
            if ($v['request'] && $v['status'] == 1) {
                $v['request'] = json_decode($v['request'], true);
                $this->run($v['request'], true);
            }
        }
    }

    # Place delayed orders (status = -1 per the original note). Can be added to
    # the scheduler and run every minute.
    public function runTempOrder()
    {
        $orderService = OrderService::instance();
        $result = $orderService->getTempOrder();
        if (!$result) {
            return;
        }

        $now = time();
        $seenAccounts = [];
        foreach ($result as $k => $v) {
            if (!$v['request']) {
                continue;
            }
            $v['request'] = json_decode($v['request'], true);
            // Only dispatch once the scheduled time stored in the request has passed.
            if (!isset($v['request']['time']) || $now <= $v['request']['time']) {
                continue;
            }
            if ($v['account']) {
                // At most one order per account is dispatched in a single pass.
                if (isset($seenAccounts[$v['account']]) && $seenAccounts[$v['account']]) {
                    continue;
                }
                $seenAccounts[$v['account']] = $v['account'];
            }
            $this->run($v['request'], true);
        }
    }

    # Place orders that were delayed for exceeding the channel limit
    # (status = -3 per the original note). Can be run every minute.
    public function runTimeOrder()
    {
        # Channels that have delayed processing configured (order_limit > 0)
        $where['status'] = 1;
        $where['is_deleted'] = 0;
        $data = \app\channel\service\ChannelService::instance()
            ->db()
            ->where($where)
            ->whereRaw('order_limit > 0')
            ->select()
            ->toArray();
        if (!$data) {
            return;
        }

        $orderService = OrderService::instance();
        foreach ($data as $k1 => $v1) {
            $result = $orderService->getTimeOrder($v1['id'], false, false, $v1['order_limit']);
            if (!$result) {
                continue;
            }
            foreach ($result as $k => $v) {
                if (!$v['request']) {
                    continue;
                }
                $v['request'] = json_decode($v['request'], true);
                if ($v['request']) {
                    // The '_nq' suffix is recognised and stripped by runOne().
                    $v['request']['order'] .= '_nq';
                    $this->run($v['request'], true);
                }
            }
        }
    }

    # Re-place paused orders (status = -5 per the original note).
    public function runStopOrderQueue()
    {
        $orderService = OrderService::instance();
        $result = $orderService->getStopOrderQueue();
        if (!$result) {
            return;
        }

        $service = \app\channel\service\ChannelService::instance();
        foreach ($result as $k => $v) {
            if (!$v['param']) {
                continue;
            }
            $param = json_decode($v['param'], true);
            if ($param) {
                $order_id = $v['order_id'] . '_' . $v['num'];
                $service->use($v['mid'], $v['pid'], $param, true, $order_id);
            }
        }
    }

    # Set status = -8 on every order returned by getStopOrder().
    # (The original note described this as re-placing paused orders, status = -5.)
    public function runStopOrder()
    {
        $orderService = OrderService::instance();
        $result = $orderService->getStopOrder();
        if (!$result) {
            return;
        }

        foreach ($result as $k => $v) {
            $orderService->db()->where('id', $v['id'])->update(['status' => -8]);
        }
    }

    # Place orders that need a repeat charge (status = -4 per the original note).
    # Can be added to the scheduler and run every minute.
    public function runFcOrder()
    {
        $orderService = OrderService::instance();
        $result = $orderService->getFcOrder();
        if (!$result) {
            return;
        }

        $service = \app\channel\service\ChannelService::instance();
        foreach ($result as $k => $v) {
            if (!$v['param']) {
                continue;
            }
            $param = json_decode($v['param'], true);
            if ($param) {
                $order_id = $v['order_id'] . '_' . $v['num'];
                $service->use($v['mid'], $v['pid'], $param, true, $order_id);
            }
        }
    }

    # Retry callbacks: when both the channel callback and the merchant callback
    # failed, this re-sends the callback. Intended to run every 5 minutes.
    public function call()
    {
        $orderService = OrderService::instance();
        $data = $orderService->getErrorData();
        if ($data) {
            foreach ($data as $k => $v) {
                $this->callSend($v, 1);
            }
        }
    }

    # Same as call(), but reads the failed rows from OrderHistoryService.
    # Intended to run every 5 minutes.
    public function callHistory()
    {
        $orderService = OrderHistoryService::instance();
        $data = $orderService->getErrorData();
        if ($data) {
            foreach ($data as $k => $v) {
                $this->callSend($v, 2);
            }
        }
    }

    # Forward one row to OrderService::callSend().
    # $type - 1 when called from call(), 2 when called from callHistory().
    private function callSend($v, $type = 1)
    {
        OrderService::instance()->callSend($v, $type);
    }

    # Restart the daemon processes; intended to run daily at 1 AM.
    #
    # Sets settingUpdate to 1 (submit() skips work while it is 1), launches the
    # kill script in the background, then sets settingUpdate to 2.
    # NOTE(review): the kill script is backgrounded, so the flag is reset without
    # waiting for it to finish - confirm this is intended.
    public function restart()
    {
        sysconf('settingUpdate', 1);
        # Adjust the path for the deployment
        $command = 'python /www/wwwroot/reapi/kill.py 1>/dev/null 2>&1 &';
        exec($command);
        sysconf('settingUpdate', 2);
    }

    # Migrate historical account-flow logs; run at 03:30 on the 1st of each month:
    # 30 3 1 * * /www/server/php/74/bin/php /www/wwwroot/api/public/index.php gateway/api.task/moveAccountLog > /dev/null
    public function moveAccountLog()
    {
        MerchantLogHistoryService::instance()->handle();
    }

    # Apply rebates to merchants.
    public function rebate()
    {
        RebateService::instance()->getData();
    }

    # Compute profit for a day ('day' request input; defaults to yesterday).
    public function profit()
    {
        $day = input('day');
        if (!$day) {
            $day = date('Y-m-d', strtotime('-1 day'));
        }

        // Totals are read from OrderLastHistoryService (OrderHistoryService was used previously).
        $orderHistoryService = OrderLastHistoryService::instance();
        $data = $orderHistoryService->getTotalData($day);
        if ($data) {
            StatService::instance()->handle($day, $data);
        }
    }

    # Compute channel statistics for a day ('day' request input; defaults to yesterday).
    public function statChannel()
    {
        $day = input('day');
        if (!$day) {
            $day = date('Y-m-d', strtotime('-1 day'));
        }
        ChannelAccountService::instance()->stat($day);
    }

    # Start runErrorOrder in a background child process, unless one is already
    # running, then respond with a success message.
    public function popenRunError()
    {
        $num = $this->getErrorNum();
        if ($num < 1) {
            # Adjust the path for the deployment
            $command = 'php /www/wwwroot/reapi/public/index.php task/task/runErrorOrder 1>/dev/null 2>&1 &';
            exec($command);
        }
        $this->success('操作成功', ['msg' => 'ok']);
    }

    # Count the runErrorOrder child processes currently running.
    # Returns the last output line of the `wc -l` pipeline as produced by exec().
    public function getErrorNum()
    {
        $command = 'ps -ef | grep task/task/runErrorOrder | grep -v grep | wc -l';

        return exec($command);
    }

    # Re-place failed orders (status = -2 per the original note). Run manually.
    #
    # NOTE(review): this prints the row and terminates the script after the first
    # row that has a stored request, and it overrides the product with
    # ProductService::get(merchant, 12). Both look like a one-off manual
    # procedure; behaviour is intentionally left unchanged - confirm before reuse.
    public function runErrorOrder()
    {
        $orderService = OrderService::instance();
        $data = $orderService->getErrorOrder();
        if ($data) {
            foreach ($data as $k => $v) {
                if ($v['request']) {
                    $v['request'] = json_decode($v['request'], true);
                    // Redirect to "Tiancheng" (original author's note)
                    $v['request']['product'] = ProductService::instance()->get($v['request']['merchant'], 12);
                    print_r($v);
                    $this->run($v['request']);
                    die;
                }
            }
        }
    }

    # Reconciliation statistics for a day ('day' request input; defaults to
    # yesterday). Intended to run daily at 00:10.
    public function account()
    {
        $day = input('day');
        if (!$day) {
            $day = date('Y-m-d', strtotime('-1 day'));
        }
        \app\setting\service\AccountLogService::instance()->handle($day);
    }

    # Run the reconciliation for 2021-07-01 through 2021-07-30.
    # NOTE(review): the month and the 30-day bound are hard-coded, so 2021-07-31
    # is not processed - confirm whether this one-off backfill is still needed.
    public function account_all()
    {
        $yue = '2021-07-';
        for ($i = 1; $i <= 30; $i++) {
            // Zero-pad the day of month to two digits.
            $day = $yue . sprintf('%02d', $i);
            \app\setting\service\AccountLogService::instance()->handle($day);
        }
    }

    # Fetch channel balances for a day ('day' request input; defaults to yesterday).
    #
    # The channel ids are read from the third CLI argument as an
    # underscore-separated list (e.g. 10040_10026_10044). When no argument is
    # given, channel 11 is used, which is the value this method previously
    # forced unconditionally.
    public function getYue(): void
    {
        $day = input('day');
        if (!$day) {
            $day = date('Y-m-d', strtotime('-1 day'));
        }

        if (isset($_SERVER["argv"][2]) && $_SERVER["argv"][2]) {
            $cid = $_SERVER["argv"][2];
        } else {
            $cid = 11;
        }

        $log = \app\setting\service\AccountLogService::instance();
        foreach (explode('_', (string) $cid) as $k => $v) {
            $log->getYue($v, $day);
        }
    }

    # For rows returned by getOldDataDiy('1.5') that are still at status = 1 and
    # carry a stored request, call setCallback(1, order_id, 3).
    public function runerrorOther()
    {
        $time = '1.5';
        $orderService = OrderService::instance();
        $data = $orderService->getOldDataDiy($time);
        foreach ($data as $k => $v) {
            if ($v['request'] && $v['status'] == 1) {
                // NOTE(review): the decoded request is not used afterwards.
                $v['request'] = json_decode($v['request'], true);
                \app\order\service\OrderService::instance()->setCallback(1, $v['order_id'], 3);
            }
        }
    }
}