submit(); } } # 对submit提交的数据进行处理 public function submit() { try { $settingUpdate = sysconf('settingUpdate'); if ($settingUpdate == 1) { return true; } # 进程模式 1多进程 2单进程 $queue = sysconf('queue'); if ($queue == 1) { # 获取当前执行的进程数量 $num = $this->getNum(); if ($num >= 1000) { # 等会儿再执行 sleep(30); return true; } $popen = true; } else { $popen = false; } $this->redis(); $data = $this->redis->pop('submit'); $this->run($data, $popen); return true; } catch (\Exception $e) { $this->redis = false; return true; } } protected function redis() { if (!$this->redis) { $this->redis = RedisService::getInstance(); } } protected function run($data, $popen = false) { if (isset($data['merchant']) && $data['merchant'] && isset($data['product']) && $data['product'] && isset($data['param']) && $data['param'] && isset($data['order']) && $data['order']) { if ($popen) { $this->popen($data['order']); } else { $this->channel($data['merchant'], $data['product'], false, $data['param'], $data['order']); } } } # 将数据推到子进程处理 public function popen($order) { #默认 $command = 'php /www/wwwroot/reapi/public/index.php task/api.task/runOne ' . $order . ' 1>/dev/null 2>&1 &'; exec($command); } # 获取当前执行的子进程数量 public function getNum() { $command = 'ps -ef | grep task/api.task/runOne | grep -v grep | wc -l'; $num = exec($command); return $num; } # 对一条数据进行处理 public function runOne() { if (isset($_SERVER["argv"][2]) && $_SERVER["argv"][2]) { $order = $_SERVER["argv"][2]; } else { $order = input('order'); } if (!$order) { exit('error'); } $nq = false; if (strstr($order, '_nq')) { $order = str_replace('_nq', '', $order); $nq = true; } $orderService = OrderService::instance(); $info = $orderService->get($order); if ($info && $info['status'] <= 1 && $info['request']) { $info['request'] = json_decode($info['request'], true); if ($nq) { $info['request']['param']['s'] = 1; } $this->run($info['request']); } exit('ok'); } # 直接对数据库中的订单数据进行处理 如果队列失效,就使用这个方法进行操作,谨慎使用该方法,会和队列同时执行,已取消 /* public function runAll() { $orderService = OrderService::instance(); $data = $orderService->getData(); if ($data) { foreach ($data as $k => $v) { if ($v['request']) { $v['request'] = json_decode($v['request'], true); $this->run($v['request']); } } } } */ # 检测当前订单表里的status=1的订单,如果超过时间5分钟还有status=1,就直接执行,可以每5分钟执行一次,取代上述的runAll方法 public function runOther() { $orderService = OrderService::instance(); $data = $orderService->getOldData(); foreach ($data as $k => $v) { if ($v['request'] && $v['status'] == 1) { $v['request'] = json_decode($v['request'], true); $this->run($v['request'], true); } } /* if ($state && $state > 0) { //$this->restart(); //$this->runAll(); } */ } # 对延迟下单的订单下单 status = -1 可以加入到计划任务中,每分钟执行一次 public function runTempOrder() { $orderService = OrderService::instance(); $result = $orderService->getTempOrder(); if ($result) { $data = $result; $time = time(); $account = array(); foreach ($data as $k => $v) { if ($v['request']) { $v['request'] = json_decode($v['request'], true); if (isset($v['request']['time']) && $time > $v['request']['time']) { if ($v['account']) { if (isset($account[$v['account']]) && $account[$v['account']]) { continue; } else { $account[$v['account']] = $v['account']; } } $this->run($v['request'], true); } } } } } # 对超过数量的订单延迟下单 status = -3 可以加入到计划任务中,每分钟执行一次 public function runTimeOrder() { # 获取设置了延迟处理的渠道 $where['status'] = 1; $where['is_deleted'] = 0; $data = \app\channel\service\ChannelService::instance()->db()->where($where)->whereRaw('order_limit > 0')->select()->toArray(); if ($data) { $orderService = OrderService::instance(); foreach ($data as $k1 => $v1) { $result = $orderService->getTimeOrder($v1['id'], false, false, $v1['order_limit']); if ($result) { foreach ($result as $k => $v) { if ($v['request']) { $v['request'] = json_decode($v['request'], true); if ($v['request']) { $v['request']['order'] .= '_nq'; $this->run($v['request'], true); } } } } } } } // public function checkorederqd() // { // #= // // // $orderService = OrderService::instance(); // foreach ($data as $k1 => $v1) { // $result = $orderService->getTimeOrder($v1['id'], false, false, $v1['order_limit']); // // if ($result) { // foreach ($result as $k => $v) { // if ($v['request']) { // $v['request'] = json_decode($v['request'], true); // if ($v['request']) { // $v['request']['order'] .= '_nq'; // $this->run($v['request'], true); // } // } // } // } // } // // } # 对暂停的的订单重新下单 status = -5 public function runStopOrder() { $orderService = OrderService::instance(); $result = $orderService->getStopOrder(); if ($result) { $data = $result; $time = time(); $account = array(); $service = \app\channel\service\ChannelService::instance(); foreach ($data as $k => $v) { if ($v['param']) { $v['param'] = json_decode($v['param'], true); if ($v['param']) { $param = $v['param']; $order_id = $v['order_id'] . '_' . $v['num']; $service->use($v['mid'], $v['pid'], $param, true, $order_id); } } } } } #删除日志 public function delLog() { #.extend/data/logs/gateway/2023/10/1/xxx.log $currentTimes=strtotime(date("Y-m-d",time())); $delTimes=strtotime('-4 months', $currentTimes); $mounth=date('m',$delTimes); $year=date("Y",$delTimes); #TODO 换服务器修改地址 $path='/www/wwwroot/reapi/extend/data/logs/gateway/'.$year.'/'.$mounth; $result=$this->delete_directory($path); } #删除目录文件夹 public function delete_directory($dir) { #file_exists(): open_basedir restriction in effect. File(/www/wwroot/bao111/extend/data/logs/gateway/2023/09) is not within the allowed path(s): (/www/wwwroot/bao111/:/tmp/) if(!file_exists($dir)){ return true; } if(!is_dir($dir)){ return unlink($dir); } // var_dump(1111);die; #rm -rf /www/www/bao111/gateway/ foreach(scandir($dir) as $item){ if($item =="."||$item ==".."){ continue; } if(!$this->delete_directory($dir .DIRECTORY_SEPARATOR.$item)){ return false; } } return rmdir($dir); } # 对需要复冲下单的订单下单 status = -4 可以加入到计划任务中,每分钟执行一次 public function runFcOrder() { $orderService = OrderService::instance(); $result = $orderService->getFcOrder(); if ($result) { $data = $result; $time = time(); $account = array(); $service = \app\channel\service\ChannelService::instance(); foreach ($data as $k => $v) { if ($v['param']) { $v['param'] = json_decode($v['param'], true); if ($v['param']) { $param = $v['param']; $order_id = $v['order_id'] . '_' . $v['num']; $service->use($v['mid'], $v['pid'], $param, true, $order_id); } } } } } # 对回调再次处理,如果渠道回调失败,同时商户回调失败,将使用该方法进行再次发起回调,5分钟执行一次 public function call() { $orderService = OrderService::instance(); $data = $orderService->getErrorData(); // var_dump($data);die; if ($data) { foreach ($data as $k => $v) { $this->callSend($v, 1); } } } # 对回调再次处理,如果渠道回调失败,同时商户回调失败,将使用该方法进行再次发起回调,5分钟执行一次 public function callHistory() { $orderService = OrderHistoryService::instance(); $data = $orderService->getErrorData(); if ($data) { foreach ($data as $k => $v) { $this->callSend($v, 2); } } } private function callSend($v, $type = 1) { OrderService::instance()->callSend($v, $type); } # 重启守护进程 每天凌晨1点执行吧 public function restart() { sysconf('settingUpdate', 1); #修改地址 $command = 'python /www/wwwroot/reapi/kill.py 1>/dev/null 2>&1 &'; exec($command); sysconf('settingUpdate', 2); } # 对完成订单进行处理,转移成历史订单,可以每天凌晨1点进行执行,默认迁移前一天的数据 public function finish() { $day = input('day'); MerchantService::instance()->init($day); /* if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } */ $page = 1; while($this->finishOne($day, $page)) { $page++; } } public function finishOne($day, $page) { $orderService = OrderService::instance(); $orderHistoryService = OrderHistoryService::instance(); $data = $orderService->getFinishData($day, $page); if ($data) { foreach ($data as $k => $v) { # 先插入一条历史数据 $id = $v['id']; unset($v['id']); $state = $orderHistoryService->up($v); if ($state) { $orderService->del($id); } } return true; } else { return false; } } # 对历史的账户流水日志进行迁移 每月1号凌晨3点半执行: # 30 3 1 * * /www/server/php/74/bin/php /www/wwwroot/api/public/index.php gateway/api.task/moveAccountLog > /dev/null public function moveAccountLog() { MerchantLogHistoryService::instance()->handle(); } # 对商户进行返点 public function rebate() { RebateService::instance()->getData(); } # 计算利润 public function profit() { $day = input('day'); if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } // $orderHistoryService = OrderHistoryService::instance(); $orderHistoryService = OrderLastHistoryService::instance(); $data = $orderHistoryService->getTotalData($day); // var_dump($data); if ($data) { StatService::instance()->handle($day, $data); } } # 计算渠道统计数据 public function statChannel() { $day = input('day'); if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } ChannelAccountService::instance()->stat($day); } # 将数据推到子进程处理 public function popenRunError() { $num = $this->getErrorNum(); if ($num < 1) { $command = 'php /www/wwwroot/reapi/public/index.php task/api.task/runErrorOrder 1>/dev/null 2>&1 &'; #修改地址 exec($command); } $this->success('操作成功', array('msg' => 'ok')); } # 获取当前执行的错误的子进程数量 public function getErrorNum() { $command = 'ps -ef | grep task/api.task/runErrorOrder | grep -v grep | wc -l'; $num = exec($command); return $num; } # 对失败的订单进行下单 status = -2 手动执行 public function runErrorOrder() { $orderService = OrderService::instance(); $data = $orderService->getErrorOrder(); if ($data) { foreach ($data as $k => $v) { if ($v['request']) { $v['request'] = json_decode($v['request'], true); // 转到天成 $v['request']['product'] = ProductService::instance()->get($v['request']['merchant'], 12); print_r($v); $this->run($v['request']);die; } } } } /* public function not() { $MerchantService = MerchantService::instance(); $data['status'] = 3; $data['cash'] = 100; $data['order_id'] = 'Q202010284980886664664866'; $data['merchant_order_id'] = '1603849810460200838'; $data['notify_num'] = 1; $MerchantService->up(5, $data); } */ # 对账统计 每天0点10分统计吧 public function account() { $day = input('day'); if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } \app\setting\service\AccountLogService::instance()->handle($day); } public function account_all() { $yue = '2021-07-'; for ($i = 1; $i<=30; $i++) { if ($i < 10) { $k = '0'. $i; } else { $k = $i; } $day = $yue . $k; \app\setting\service\AccountLogService::instance()->handle($day); } } # 获取渠道余额 public function getYue(): void { $day = input('day'); if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } if (isset($_SERVER["argv"][2]) && $_SERVER["argv"][2]) { $cid = $_SERVER["argv"][2]; } else { $cid = false; } if (!$cid) { exit('error'); } //10040_10026_10044 //10040_10026 $log = \app\setting\service\AccountLogService::instance(); $cid = explode('_', $cid); foreach ($cid as $k => $v) { $log->getYue($v, $day); } } public function runerrorOther() { $time = '1.5'; $orderService = OrderService::instance(); $data = $orderService->getOldDataDiy($time); // print_r($data);die; foreach ($data as $k => $v) { if ($v['request'] && $v['status'] == 1) { $v['request'] = json_decode($v['request'], true); \app\order\service\OrderService::instance()->setCallback(1, $v['order_id'], 3); } } } }