submit(); } } # 对submit提交的数据进行处理 public function submit() { try { $settingUpdate = sysconf('settingUpdate'); if ($settingUpdate == 1) { return true; } # 进程模式 1多进程 2单进程 $queue = sysconf('queue'); if ($queue == 1) { # 获取当前执行的进程数量 $num = $this->getNum(); if ($num >= 1000) { # 等会儿再执行 sleep(30); return true; } $popen = true; } else { $popen = false; } $this->redis(); $data = $this->redis->pop('submit'); $this->run($data, $popen); return true; } catch (\Exception $e) { $this->redis = false; return true; } } protected function redis() { if (!$this->redis) { $this->redis = RedisService::getInstance(); } } protected function run($data, $popen = false) { if (isset($data['merchant']) && $data['merchant'] && isset($data['product']) && $data['product'] && isset($data['param']) && $data['param'] && isset($data['order']) && $data['order']) { if ($popen) { $this->popen($data['order']); } else { $this->channel($data['merchant'], $data['product'], false, $data['param'], $data['order']); } } } # 将数据推到子进程处理 public function popen($order) { // $command = '/www/server/php/74/bin/php /www/wwwroot/bao111/public/index.php gateway/api.task/runOne ' . $order . ' 1>/dev/null 2>&1 &'; #默认 // $command = 'php /www/wwwroot/reapi/public/index.php gateway/api.task/runOne ' . $order . ' 1>/dev/null 2>&1 &'; $command = 'php /www/sites/reapi/index/public/index.php gateway/api.task/runOne ' . $order . ' 1>/dev/null 2>&1 &'; //$command = 'php /www/jiekou/public/index.php gateway/api.task/runOne ' . $order . ' 1>/dev/null 2>&1 &'; exec($command); } # 获取当前执行的子进程数量 public function getNum() { $command = 'ps -ef | grep gateway/api.task/runOne | grep -v grep | wc -l'; $num = exec($command); return $num; } # 对一条数据进行处理 public function runOne() { if (isset($_SERVER["argv"][2]) && $_SERVER["argv"][2]) { $order = $_SERVER["argv"][2]; } else { $order = input('order'); } if (!$order) { exit('error'); } $nq = false; if (strstr($order, '_nq')) { $order = str_replace('_nq', '', $order); $nq = true; } $orderService = OrderService::instance(); $info = $orderService->get($order); if ($info && $info['status'] <= 1 && $info['request']) { $info['request'] = json_decode($info['request'], true); if ($nq) { $info['request']['param']['s'] = 1; } $this->run($info['request']); } exit('ok'); } # 直接对数据库中的订单数据进行处理 如果队列失效,就使用这个方法进行操作,谨慎使用该方法,会和队列同时执行,已取消 /* public function runAll() { $orderService = OrderService::instance(); $data = $orderService->getData(); if ($data) { foreach ($data as $k => $v) { if ($v['request']) { $v['request'] = json_decode($v['request'], true); $this->run($v['request']); } } } } */ public function runtestdiy() { $time = '1'; $orderService = OrderService::instance(); $data = $orderService->getOldDataDiy($time); foreach ($data as $k => $v) { if ($v['request'] && $v['status'] == 1 && $v['order_id']=='R202402197619128992613340') { $v['request'] = json_decode($v['request'], true); $this->run($v['request']); } } /* if ($state && $state > 0) { //$this->restart(); //$this->runAll(); } */ } # 检测当前订单表里的status=1的订单,如果超过时间5分钟还有status=1,就直接执行,可以每5分钟执行一次,取代上述的runAll方法 public function runOther() { $orderService = OrderService::instance(); $data = $orderService->getOldData(); foreach ($data as $k => $v) { if ($v['request'] && $v['status'] == 1) { $v['request'] = json_decode($v['request'], true); $this->run($v['request'], true); } } /* if ($state && $state > 0) { //$this->restart(); //$this->runAll(); } */ } // public function runtest() // { // $orderService = OrderService::instance(); // $data = $orderService->getOldData(); // // foreach ($data as $k => $v) { // if ($v['request'] && $v['status'] == 1) { // $v['request'] = json_decode($v['request'], true); // $this->run($v['request'], true); // } // } // /* // if ($state && $state > 0) { // //$this->restart(); // //$this->runAll(); // } // */ // } # 对延迟下单的订单下单 status = -1 可以加入到计划任务中,每分钟执行一次 public function runTempOrder() { $orderService = OrderService::instance(); $result = $orderService->getTempOrder(); if ($result) { $data = $result; $time = time(); $account = array(); foreach ($data as $k => $v) { if ($v['request']) { $v['request'] = json_decode($v['request'], true); if (isset($v['request']['time']) && $time > $v['request']['time']) { if ($v['account']) { if (isset($account[$v['account']]) && $account[$v['account']]) { continue; } else { $account[$v['account']] = $v['account']; } } $this->run($v['request'], true); } } } } } # 对超过数量的订单延迟下单 status = -3 可以加入到计划任务中,每分钟执行一次 public function runTimeOrder() { # 获取设置了延迟处理的渠道 $where['status'] = 1; $where['is_deleted'] = 0; $data = \app\channel\service\ChannelService::instance()->db()->where($where)->whereRaw('order_limit > 0')->select()->toArray(); if ($data) { $orderService = OrderService::instance(); foreach ($data as $k1 => $v1) { $result = $orderService->getTimeOrder($v1['id'], false, false, $v1['order_limit']); if ($result) { foreach ($result as $k => $v) { if ($v['request']) { $v['request'] = json_decode($v['request'], true); if ($v['request']) { $v['request']['order'] .= '_nq'; $this->run($v['request'], true); } } } } } } } // public function checkorederqd() // { // #= // // // $orderService = OrderService::instance(); // foreach ($data as $k1 => $v1) { // $result = $orderService->getTimeOrder($v1['id'], false, false, $v1['order_limit']); // // if ($result) { // foreach ($result as $k => $v) { // if ($v['request']) { // $v['request'] = json_decode($v['request'], true); // if ($v['request']) { // $v['request']['order'] .= '_nq'; // $this->run($v['request'], true); // } // } // } // } // } // // } # 对暂停的的订单重新下单 status = -5 public function runStopOrder() { $orderService = OrderService::instance(); $result = $orderService->getStopOrder(); if ($result) { $data = $result; $time = time(); $account = array(); $service = \app\channel\service\ChannelService::instance(); foreach ($data as $k => $v) { if ($v['param']) { $v['param'] = json_decode($v['param'], true); if ($v['param']) { $param = $v['param']; $order_id = $v['order_id'] . '_' . $v['num']; $service->use($v['mid'], $v['pid'], $param, true, $order_id); } } } } } #删除日志 public function delLog() { #.extend/data/logs/gateway/2023/10/1/xxx.log $currentTimes=strtotime(date("Y-m-d",time())); $delTimes=strtotime('-4 months', $currentTimes); $mounth=date('m',$delTimes); $year=date("Y",$delTimes); #TODO 换服务器修改地址 // $path='/www/wwwroot/bao111/extend/data/logs/gateway/'.$year.'/'.$mounth; // $path='/www/wwwroot/reapi/extend/data/logs/gateway/'.$year.'/'.$mounth; $path='/www/sites/reapi/index/extend/data/logs/gateway/'.$year.'/'.$mounth; $result=$this->delete_directory($path); } #删除目录文件夹 public function delete_directory($dir) { #file_exists(): open_basedir restriction in effect. File(/www/wwroot/bao111/extend/data/logs/gateway/2023/09) is not within the allowed path(s): (/www/wwwroot/bao111/:/tmp/) if(!file_exists($dir)){ return true; } if(!is_dir($dir)){ return unlink($dir); } // var_dump(1111);die; #rm -rf /www/www/bao111/gateway/ foreach(scandir($dir) as $item){ if($item =="."||$item ==".."){ continue; } if(!$this->delete_directory($dir .DIRECTORY_SEPARATOR.$item)){ return false; } } return rmdir($dir); } # 对需要复冲下单的订单下单 status = -4 可以加入到计划任务中,每分钟执行一次 public function runFcOrder() { $orderService = OrderService::instance(); $result = $orderService->getFcOrder(); if ($result) { $data = $result; $time = time(); $account = array(); $service = \app\channel\service\ChannelService::instance(); foreach ($data as $k => $v) { if ($v['param']) { $v['param'] = json_decode($v['param'], true); if ($v['param']) { $param = $v['param']; $order_id = $v['order_id'] . '_' . $v['num']; $service->use($v['mid'], $v['pid'], $param, true, $order_id); } } } } } # 对回调再次处理,如果渠道回调失败,同时商户回调失败,将使用该方法进行再次发起回调,5分钟执行一次 public function call() { $orderService = OrderService::instance(); $data = $orderService->getErrorData(); // var_dump($data);die; if ($data) { foreach ($data as $k => $v) { $this->callSend($v, 1); } } } # 对回调再次处理,如果渠道回调失败,同时商户回调失败,将使用该方法进行再次发起回调,5分钟执行一次 public function callHistory() { $orderService = OrderHistoryService::instance(); $data = $orderService->getErrorData(); if ($data) { foreach ($data as $k => $v) { $this->callSend($v, 2); } } } private function callSend($v, $type = 1) { OrderService::instance()->callSend($v, $type); } # 重启守护进程 每天凌晨1点执行吧 public function restart() { sysconf('settingUpdate', 1); #修改地址 // $command = 'python /www/wwwroot/bao111/kill.py 1>/dev/null 2>&1 &'; // $command = 'python /www/wwwroot/reapi/kill.py 1>/dev/null 2>&1 &'; $command = 'python /www/sites/reapi/index/kill.py 1>/dev/null 2>&1 &'; exec($command); sysconf('settingUpdate', 2); } # 对完成订单进行处理,转移成历史订单,可以每天凌晨1点进行执行,默认迁移前一天的数据 public function finish() { $day = input('day'); MerchantService::instance()->init($day); /* if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } */ $page = 1; while($this->finishOne($day, $page)) { $page++; } } public function finishOne($day, $page) { $orderService = OrderService::instance(); $orderHistoryService = OrderHistoryService::instance(); $data = $orderService->getFinishData($day, $page); if ($data) { foreach ($data as $k => $v) { # 先插入一条历史数据 $id = $v['id']; unset($v['id']); $state = $orderHistoryService->up($v); if ($state) { $orderService->del($id); } } return true; } else { return false; } } # 对历史的账户流水日志进行迁移 每月1号凌晨3点半执行: # 30 3 1 * * /www/server/php/74/bin/php /www/wwwroot/api/public/index.php gateway/api.task/moveAccountLog > /dev/null public function moveAccountLog() { MerchantLogHistoryService::instance()->handle(); } # 对商户进行返点 public function rebate() { RebateService::instance()->getData(); } # 计算利润 public function profit() { $day = input('day'); if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } // $orderHistoryService = OrderHistoryService::instance(); $orderHistoryService = OrderLastHistoryService::instance(); $data = $orderHistoryService->getTotalData($day); // var_dump($data); if ($data) { StatService::instance()->handle($day, $data); } } # 计算渠道统计数据 public function statChannel() { $day = input('day'); if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } ChannelAccountService::instance()->stat($day); } # 将数据推到子进程处理 public function popenRunError() { $num = $this->getErrorNum(); if ($num < 1) { // $command = 'php /www/wwroot/reapi/public/index.php gateway/api.task/runErrorOrder 1>/dev/null 2>&1 &'; #修改地址 $command = 'php /www/sites/reapi/index/public/index.php gateway/api.task/runErrorOrder 1>/dev/null 2>&1 &'; // $command = '/www/server/php/74/bin/php /www/wwwroot/bao111/public/index.php gateway/api.task/runErrorOrder 1>/dev/null 2>&1 &'; //$command = 'php /www/jiekou/public/index.php gateway/api.task/runErrorOrder 1>/dev/null 2>&1 &'; exec($command); } $this->success('操作成功', array('msg' => 'ok')); } # 获取当前执行的错误的子进程数量 public function getErrorNum() { $command = 'ps -ef | grep gateway/api.task/runErrorOrder | grep -v grep | wc -l'; $num = exec($command); return $num; } # 对失败的订单进行下单 status = -2 手动执行 public function runErrorOrder() { $orderService = OrderService::instance(); $data = $orderService->getErrorOrder(); if ($data) { foreach ($data as $k => $v) { if ($v['request']) { $v['request'] = json_decode($v['request'], true); // 转到天成 $v['request']['product'] = ProductService::instance()->get($v['request']['merchant'], 12); print_r($v); $this->run($v['request']);die; } } } } /* public function not() { $MerchantService = MerchantService::instance(); $data['status'] = 3; $data['cash'] = 100; $data['order_id'] = 'Q202010284980886664664866'; $data['merchant_order_id'] = '1603849810460200838'; $data['notify_num'] = 1; $MerchantService->up(5, $data); } */ # 对账统计 每天0点10分统计吧 public function account() { $day = input('day'); if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } \app\setting\service\AccountLogService::instance()->handle($day); } public function account_all() { $yue = '2021-07-'; for ($i = 1; $i<=30; $i++) { if ($i < 10) { $k = '0'. $i; } else { $k = $i; } $day = $yue . $k; \app\setting\service\AccountLogService::instance()->handle($day); } } # 获取渠道余额 public function getYue(): void { $day = input('day'); if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } if (isset($_SERVER["argv"][2]) && $_SERVER["argv"][2]) { $cid = $_SERVER["argv"][2]; } else { $cid = false; } if (!$cid) { exit('error'); } //10040_10026_10044 //10040_10026 $log = \app\setting\service\AccountLogService::instance(); $cid = explode('_', $cid); foreach ($cid as $k => $v) { $log->getYue($v, $day); } } # 对订单列表拉取到缓存,,每小时58分钟时执行 public function pullredisOrder(): void { $orderService = OrderService::instance(); $data = $orderService->db()->field('status,merchant_order_id,create_at,account,mid')->select()->toArray(); $this->redis(); if($data){ foreach($data as $key=>$v){ $res= $this->redis->set('kuaishou_query'.$data[$key]['merchant_order_id'],$data[$key],10*60+rand(10,99)); if(!$res){ Log::write('gateway', 'kuaishou_query_task', '获取订单数据失败-'.$data[$key]['merchant_order_id']); } } } } /* 对快手黑名单队列进行退款处理正常执行 */ public function refundKuaishou() { try { $this->redis(); $redisData = $this->redis->pop('blackcall'); $param = array(); $param['orderId'] = $redisData['orderNo']; $param['status'] = 'FAILED'; $param['bizType'] = '20'; $param['failCode'] = '4013022'; $param['failMsg'] = '黑名单'; $param = json_encode($param); $signtime = time(); $sign = 'access_token=' . $redisData['access_token'] . '&appkey=' . $redisData['agentId'] . '&method=integration.virtual.topup.mobile.order.callback¶m=' . $param . '&signMethod=MD5×tamp=' . $signtime . '&version=1'; $signmd5 = md5($sign); $param = urlencode($param); $geturl = $redisData['notify_url'] . '?access_token=' . $redisData['access_token'] . '&appkey=' . $redisData['agentId'] . '&method=integration.virtual.topup.mobile.order.callback¶m=' . $param . '&signMethod=MD5×tamp=' . $signtime . '&version=1&sign=' . $signmd5; // $blackGet= ['result' => '1']; $blackGet = \app\gateway\controller\api\Coreks::get_curl_post($geturl); $blackGet = json_decode($blackGet, true); $response = $blackGet['result']; if ($response == '1') { Log::write('gateway', 'black', $blackGet); } else { Log::write('errorTip', 'black_request', $redisData); Log::write('gateway', 'black', $blackGet); } return 'ok'; } catch (\Exception $e) { // return $e; Log::write('errorTip', 'redis', $e->getMessage()); return 'error'; } } #对cardbuy购置的卡密同步响应数据进行异步处理订单回调状态,建议为每分钟一次 public function kami_chanel_notifly() { $orderService = OrderService::instance(); $where1 = [ 'status' => '7' , // 'product_key' => ['like','%cardbuy%'] ]; $where[] = [ 'product_key','like','%cardbuy%' ]; // $where = [ // 'status' => '7' , // 'product_key' => ['like','%cardbuy%'] // ]; $orderlist = $orderService->db()->where($where)->where($where1)->order('id asc')->select()->toArray(); // var_dump($orderlist);die; foreach($orderlist as $key=>$v){ $kami91server = Kami91OrderService::instance(); $info = $kami91server->db()->where(['order_id' => $v['order_id']])->find(); // var_dump($info);die; if(!$info){ $this->kami91order(); }elseif(isset($info['status']) && $info['status'] == '6'){ #查询卡的状态是否已发送 $kami91 =new \app\openapi\controller\Kami91($this->app); $check = $kami91->queryFaka($v['merchant_order_id']); // var_dump($check);die; if($check == 'ok'){ $msg = 'success'; $kami91server->upStatus($v['order_id'], 2 ,$msg); $kami91->huidiao($v['order_id']); } } elseif (empty($info['cardno'])){ $kami91=new \app\openapi\controller\Kami91($this->app); $kami91->uphuidiao($v['order_id']); } } } #关联91卡库订单号 public function kami91order() { $kami91server = Kami91OrderService::instance(); $where1 = [ 'status' => '1' , // 'product_key' => ['like','%cardbuy%'] ]; $where[] = [ 'product_key','like','%cardbuy%' ]; $orderlist = $kami91server->db()->where($where)->where($where1)->order('id asc')->select()->toArray(); // var_dump($orderlist);die; foreach($orderlist as $key=>$v){ $orderService = OrderService::instance(); $order = $orderService->get('', $v['merchant_order_id'] ,$v['mid']); if($order){ $data =array(); $data['order_id'] = $order['order_id']; $data['status'] = 4; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); } } } public function runerrorOther() { $time = '1.5'; $orderService = OrderService::instance(); $data = $orderService->getOldDataDiy($time); // print_r($data);die; foreach ($data as $k => $v) { if ($v['request'] && $v['status'] == 1) { $v['request'] = json_decode($v['request'], true); \app\order\service\OrderService::instance()->setCallback(1, $v['order_id'], 3); } } } public function kami91orderstatus() { $kami91server = Kami91OrderService::instance(); $where1 = [ 'status' => '4' , // 'product_key' => ['like','%cardbuy%'] ]; $where[] = [ 'product_key','like','%cardbuy%' ]; $orderlist = $kami91server->db()->where($where)->where($where1)->order('id asc')->select()->toArray(); // var_dump($orderlist);die; foreach($orderlist as $key=>$v){ $orderService = OrderService::instance(); $order = $orderService->get('', $v['merchant_order_id'] ,$v['mid']); // var_dump($order);die; if($order){ $data = []; if($order['status'] == '3'){ if(empty($order['merchant_callback_msg']) || $order['merchant_callback_msg'] == '{}'){ $data['order_id'] = $order['order_id']; $data['status'] = '3'; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); }elseif(isset($order['response']) ){ $response = json_decode($order['response'],true); if(isset($response['kami'])){ $data = [ 'cardno' =>$response['kami']['cardno'], 'cardpwd' =>$response['kami']['cardpwd'], 'expire_time' =>$response['kami']['expired'], ]; $data['order_id'] = $order['order_id']; $data['status'] = '7'; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); }else{ $data['order_id'] = $order['order_id']; $data['status'] = '3'; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); } }else{ $data['order_id'] = $order['order_id']; $data['status'] = '3'; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); } }elseif($order['status'] == '2'){ if(isset($order['response'])) { $response = json_decode($order['response'], true); if (isset($response['kami'])) { $data = [ 'cardno' => $response['kami']['cardno'], 'cardpwd' => $response['kami']['cardpwd'], 'expire_time' => $response['kami']['expired'], ]; } } $data['order_id'] = $order['order_id']; $data['status'] = '2'; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); } } } } public function kami91orderstatus2() { $kami91server = Kami91OrderService::instance(); $where1 = [ 'status' => '6' , // 'product_key' => ['like','%cardbuy%'] ]; $where[] = [ 'product_key','like','%cardbuy%' ]; $orderlist = $kami91server->db()->where($where)->where($where1)->order('id asc')->select()->toArray(); // var_dump($orderlist);die; foreach($orderlist as $key=>$v){ $orderService = OrderService::instance(); $order = $orderService->get('', $v['merchant_order_id'] ,$v['mid']); if($order){ $data = []; if($order['status'] == '3') { $kami91 = new \app\openapi\controller\Kami91($this->app); $check = $kami91->queryFaka($v['merchant_order_id']); // var_dump($check);die; if ($check == 'ok') { $msg = 'success'; $data['order_id'] = $order['order_id']; $data['status'] = '5'; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); // $kami91server->upStatus($v['order_id'], 2 ,$msg); // $kami91->huidiao($v['order_id']); } else { // $msg = 'success'; $data['order_id'] = $order['order_id']; $data['status'] = '7'; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); } }elseif ($order['status'] == '2'){ $data['order_id'] = $order['order_id']; $data['status'] = '2'; $kami91server->db()->where(array('merchant_order_id' => $v['merchant_order_id']))->update($data); } } } } public function kamiordertime() { $time = '30'; $orderService = OrderService::instance(); $data = $orderService->getKamiOldDataDiy($time); $kami91server = Kami91OrderService::instance(); foreach ($data as $k => $v) { if ($v['status'] == 7) { $info = $kami91server->db()->where(['order_id' => $v['order_id']])->find(); // var_dump($info);die; if(!$info){ \app\order\service\OrderService::instance()->setCallback(1, $v['order_id'], 3); }else { $kami91 = new \app\openapi\controller\Kami91($this->app); $check = $kami91->queryFaka($v['merchant_order_id']); if ($check == 'ok') { $msg = 'success'; $kami91server->upStatus($v['order_id'], 2, $msg); $kami91->huidiao($v['order_id']); } else { \app\order\service\OrderService::instance()->setCallback(1, $v['order_id'], 3); } } // $v['request'] = json_decode($v['request'], true); // \app\order\service\OrderService::instance()->setCallback(1, $v['order_id'], 3); } } } # 对卡密完成订单进行处理,转移成历史订单,可以每天凌晨1点进行执行,默认迁移前一天的数据 public function kami_finish() { $day = input('day'); MerchantService::instance()->init($day); /* if (!$day) { $day = date('Y-m-d', strtotime('-1 day')); } */ $page = 1; while($this->kami_finishOne($day, $page)) { $page++; } } public function kami_finishOne($day, $page) { $kami91server = Kami91OrderService::instance(); $kamiorderHistoryService = KamiOrderHistoryService::instance(); $data = $kami91server->getFinishData($day, $page); if ($data) { foreach ($data as $k => $v) { # 先插入一条历史数据 $id = $v['id']; unset($v['id']); $state = $kamiorderHistoryService->up($v); if ($state) { $kami91server->del($id); } } return true; } else { return false; } } public function kami_jc() { $kami91server = Kami91OrderService::instance(); $where1 = [ 'status' => '7', // 'product_key' => ['like','%cardbuy%'] ]; $where[] = [ 'product_key', 'like', '%cardbuy%' ]; $orderlist = $kami91server->db()->where($where)->where($where1)->order('id asc')->select()->toArray(); // var_dump($orderlist);die; $quantity = 0; $quantity1 = 0; foreach ($orderlist as $key => $v) { $orderService = OrderService::instance(); $order = $orderService->get('', $v['merchant_order_id'], $v['mid']); if ($order) { $data = []; if ($order['status'] == '3') { $kami91 = new \app\openapi\controller\Kami91($this->app); $check = $kami91->queryFaka($v['merchant_order_id']); // var_dump($check);die; if ($check == 'ok') { // $quantity++; // 每次循环增加1 // $quantity1 = $quantity1 + $order['cash']; // $quantity++; // echo $order['merchant_order_id'] . "\n"; // $kami91server->upStatus($v['order_id'], 2 ,$msg); // $kami91->huidiao($v['order_id']); } } } } } }