2024-12-04 12:33:47 +08:00

609 lines
18 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
namespace app\task\controller\api;
set_time_limit(0);
ini_set('memory_limit','512M');
ini_set('default_socket_timeout', -1); //不超时
use app\merchant\service\OrderLastHistoryService;
use app\merchant\service\OrderService;
use app\merchant\service\OrderHistoryService;
use app\merchant\service\MerchantService;
use app\merchant\service\RebateService;
use app\gateway\service\RedisService;
use app\setting\service\StatService;
use app\setting\service\ChannelAccountService;
use app\channel\service\ProductService;
use app\merchant\service\MerchantLogHistoryService;
use dever\Log;
use think\exception\HttpResponseException;
/**
* 任务
* Class Test
* @package app\gateway\api
*/
class Task extends Core
{
# 是否检测数据
protected $check = false;
# redis连接
protected $redis = false;
public function cron()
{
while(1) {
$this->submit();
}
}
# 对submit提交的数据进行处理
public function submit()
{
try {
$settingUpdate = sysconf('settingUpdate');
if ($settingUpdate == 1) {
return true;
}
# 进程模式 1多进程 2单进程
$queue = sysconf('queue');
if ($queue == 1) {
# 获取当前执行的进程数量
$num = $this->getNum();
if ($num >= 1000) {
# 等会儿再执行
sleep(30);
return true;
}
$popen = true;
} else {
$popen = false;
}
$this->redis();
$data = $this->redis->pop('submit');
$this->run($data, $popen);
return true;
} catch (\Exception $e) {
$this->redis = false;
return true;
}
}
protected function redis()
{
if (!$this->redis) {
$this->redis = RedisService::getInstance();
}
}
protected function run($data, $popen = false)
{
if (isset($data['merchant']) && $data['merchant'] && isset($data['product']) && $data['product'] && isset($data['param']) && $data['param'] && isset($data['order']) && $data['order']) {
if ($popen) {
$this->popen($data['order']);
} else {
$this->channel($data['merchant'], $data['product'], false, $data['param'], $data['order']);
}
}
}
# 将数据推到子进程处理
public function popen($order)
{
#默认
$command = 'php /www/wwwroot/reapi/public/index.php task/api.task/runOne ' . $order . ' 1>/dev/null 2>&1 &';
exec($command);
}
# 获取当前执行的子进程数量
public function getNum()
{
$command = 'ps -ef | grep task/api.task/runOne | grep -v grep | wc -l';
$num = exec($command);
return $num;
}
# 对一条数据进行处理
public function runOne()
{
if (isset($_SERVER["argv"][2]) && $_SERVER["argv"][2]) {
$order = $_SERVER["argv"][2];
} else {
$order = input('order');
}
if (!$order) {
exit('error');
}
$nq = false;
if (strstr($order, '_nq')) {
$order = str_replace('_nq', '', $order);
$nq = true;
}
$orderService = OrderService::instance();
$info = $orderService->get($order);
if ($info && $info['status'] <= 1 && $info['request']) {
$info['request'] = json_decode($info['request'], true);
if ($nq) {
$info['request']['param']['s'] = 1;
}
$this->run($info['request']);
}
exit('ok');
}
# 直接对数据库中的订单数据进行处理 如果队列失效,就使用这个方法进行操作,谨慎使用该方法,会和队列同时执行,已取消
/*
public function runAll()
{
$orderService = OrderService::instance();
$data = $orderService->getData();
if ($data) {
foreach ($data as $k => $v) {
if ($v['request']) {
$v['request'] = json_decode($v['request'], true);
$this->run($v['request']);
}
}
}
}
*/
# 检测当前订单表里的status=1的订单如果超过时间5分钟还有status=1就直接执行可以每5分钟执行一次取代上述的runAll方法
public function runOther()
{
$orderService = OrderService::instance();
$data = $orderService->getOldData();
foreach ($data as $k => $v) {
if ($v['request'] && $v['status'] == 1) {
$v['request'] = json_decode($v['request'], true);
$this->run($v['request'], true);
}
}
/*
if ($state && $state > 0) {
//$this->restart();
//$this->runAll();
}
*/
}
# 对延迟下单的订单下单 status = -1 可以加入到计划任务中,每分钟执行一次
public function runTempOrder()
{
$orderService = OrderService::instance();
$result = $orderService->getTempOrder();
if ($result) {
$data = $result;
$time = time();
$account = array();
foreach ($data as $k => $v) {
if ($v['request']) {
$v['request'] = json_decode($v['request'], true);
if (isset($v['request']['time']) && $time > $v['request']['time']) {
if ($v['account']) {
if (isset($account[$v['account']]) && $account[$v['account']]) {
continue;
} else {
$account[$v['account']] = $v['account'];
}
}
$this->run($v['request'], true);
}
}
}
}
}
# 对超过数量的订单延迟下单 status = -3 可以加入到计划任务中,每分钟执行一次
public function runTimeOrder()
{
# 获取设置了延迟处理的渠道
$where['status'] = 1;
$where['is_deleted'] = 0;
$data = \app\channel\service\ChannelService::instance()->db()->where($where)->whereRaw('order_limit > 0')->select()->toArray();
if ($data) {
$orderService = OrderService::instance();
foreach ($data as $k1 => $v1) {
$result = $orderService->getTimeOrder($v1['id'], false, false, $v1['order_limit']);
if ($result) {
foreach ($result as $k => $v) {
if ($v['request']) {
$v['request'] = json_decode($v['request'], true);
if ($v['request']) {
$v['request']['order'] .= '_nq';
$this->run($v['request'], true);
}
}
}
}
}
}
}
// public function checkorederqd()
// {
// #=
//
//
// $orderService = OrderService::instance();
// foreach ($data as $k1 => $v1) {
// $result = $orderService->getTimeOrder($v1['id'], false, false, $v1['order_limit']);
//
// if ($result) {
// foreach ($result as $k => $v) {
// if ($v['request']) {
// $v['request'] = json_decode($v['request'], true);
// if ($v['request']) {
// $v['request']['order'] .= '_nq';
// $this->run($v['request'], true);
// }
// }
// }
// }
// }
//
// }
# 对暂停的的订单重新下单 status = -5
public function runStopOrder()
{
$orderService = OrderService::instance();
$result = $orderService->getStopOrder();
if ($result) {
$data = $result;
$time = time();
$account = array();
$service = \app\channel\service\ChannelService::instance();
foreach ($data as $k => $v) {
if ($v['param']) {
$v['param'] = json_decode($v['param'], true);
if ($v['param']) {
$param = $v['param'];
$order_id = $v['order_id'] . '_' . $v['num'];
$service->use($v['mid'], $v['pid'], $param, true, $order_id);
}
}
}
}
}
#删除日志
public function delLog()
{
#.extend/data/logs/gateway/2023/10/1/xxx.log
$currentTimes=strtotime(date("Y-m-d",time()));
$delTimes=strtotime('-4 months', $currentTimes);
$mounth=date('m',$delTimes);
$year=date("Y",$delTimes);
#TODO 换服务器修改地址
$path='/www/wwwroot/reapi/extend/data/logs/gateway/'.$year.'/'.$mounth;
$result=$this->delete_directory($path);
}
#删除目录文件夹
public function delete_directory($dir)
{
#file_exists(): open_basedir restriction in effect. File(/www/wwroot/bao111/extend/data/logs/gateway/2023/09) is not within the allowed path(s): (/www/wwwroot/bao111/:/tmp/)
if(!file_exists($dir)){
return true;
}
if(!is_dir($dir)){
return unlink($dir);
}
// var_dump(1111);die;
#rm -rf /www/www/bao111/gateway/
foreach(scandir($dir) as $item){
if($item =="."||$item ==".."){
continue;
}
if(!$this->delete_directory($dir .DIRECTORY_SEPARATOR.$item)){
return false;
}
}
return rmdir($dir);
}
# 对需要复冲下单的订单下单 status = -4 可以加入到计划任务中,每分钟执行一次
public function runFcOrder()
{
$orderService = OrderService::instance();
$result = $orderService->getFcOrder();
if ($result) {
$data = $result;
$time = time();
$account = array();
$service = \app\channel\service\ChannelService::instance();
foreach ($data as $k => $v) {
if ($v['param']) {
$v['param'] = json_decode($v['param'], true);
if ($v['param']) {
$param = $v['param'];
$order_id = $v['order_id'] . '_' . $v['num'];
$service->use($v['mid'], $v['pid'], $param, true, $order_id);
}
}
}
}
}
# 对回调再次处理如果渠道回调失败同时商户回调失败将使用该方法进行再次发起回调5分钟执行一次
public function call()
{
$orderService = OrderService::instance();
$data = $orderService->getErrorData();
// var_dump($data);die;
if ($data) {
foreach ($data as $k => $v) {
$this->callSend($v, 1);
}
}
}
# 对回调再次处理如果渠道回调失败同时商户回调失败将使用该方法进行再次发起回调5分钟执行一次
public function callHistory()
{
$orderService = OrderHistoryService::instance();
$data = $orderService->getErrorData();
if ($data) {
foreach ($data as $k => $v) {
$this->callSend($v, 2);
}
}
}
private function callSend($v, $type = 1)
{
OrderService::instance()->callSend($v, $type);
}
# 重启守护进程 每天凌晨1点执行吧
public function restart()
{
sysconf('settingUpdate', 1);
#修改地址
$command = 'python /www/wwwroot/reapi/kill.py 1>/dev/null 2>&1 &';
exec($command);
sysconf('settingUpdate', 2);
}
# 对完成订单进行处理转移成历史订单可以每天凌晨1点进行执行默认迁移前一天的数据
public function finish()
{
$day = input('day');
MerchantService::instance()->init($day);
/*
if (!$day) {
$day = date('Y-m-d', strtotime('-1 day'));
}
*/
$page = 1;
while($this->finishOne($day, $page)) {
$page++;
}
}
public function finishOne($day, $page)
{
$orderService = OrderService::instance();
$orderHistoryService = OrderHistoryService::instance();
$data = $orderService->getFinishData($day, $page);
if ($data) {
foreach ($data as $k => $v) {
# 先插入一条历史数据
$id = $v['id'];
unset($v['id']);
$state = $orderHistoryService->up($v);
if ($state) {
$orderService->del($id);
}
}
return true;
} else {
return false;
}
}
# 对历史的账户流水日志进行迁移 每月1号凌晨3点半执行
# 30 3 1 * * /www/server/php/74/bin/php /www/wwwroot/api/public/index.php gateway/api.task/moveAccountLog > /dev/null
public function moveAccountLog()
{
MerchantLogHistoryService::instance()->handle();
}
# 对商户进行返点
public function rebate()
{
RebateService::instance()->getData();
}
# 计算利润
public function profit()
{
$day = input('day');
if (!$day) {
$day = date('Y-m-d', strtotime('-1 day'));
}
// $orderHistoryService = OrderHistoryService::instance();
$orderHistoryService = OrderLastHistoryService::instance();
$data = $orderHistoryService->getTotalData($day);
// var_dump($data);
if ($data) {
StatService::instance()->handle($day, $data);
}
}
# 计算渠道统计数据
public function statChannel()
{
$day = input('day');
if (!$day) {
$day = date('Y-m-d', strtotime('-1 day'));
}
ChannelAccountService::instance()->stat($day);
}
# 将数据推到子进程处理
public function popenRunError()
{
$num = $this->getErrorNum();
if ($num < 1) {
$command = 'php /www/wwwroot/reapi/public/index.php task/api.task/runErrorOrder 1>/dev/null 2>&1 &';
#修改地址
exec($command);
}
$this->success('操作成功', array('msg' => 'ok'));
}
# 获取当前执行的错误的子进程数量
public function getErrorNum()
{
$command = 'ps -ef | grep task/api.task/runErrorOrder | grep -v grep | wc -l';
$num = exec($command);
return $num;
}
# 对失败的订单进行下单 status = -2 手动执行
public function runErrorOrder()
{
$orderService = OrderService::instance();
$data = $orderService->getErrorOrder();
if ($data) {
foreach ($data as $k => $v) {
if ($v['request']) {
$v['request'] = json_decode($v['request'], true);
// 转到天成
$v['request']['product'] = ProductService::instance()->get($v['request']['merchant'], 12);
print_r($v);
$this->run($v['request']);die;
}
}
}
}
/*
public function not()
{
$MerchantService = MerchantService::instance();
$data['status'] = 3;
$data['cash'] = 100;
$data['order_id'] = 'Q202010284980886664664866';
$data['merchant_order_id'] = '1603849810460200838';
$data['notify_num'] = 1;
$MerchantService->up(5, $data);
}
*/
# 对账统计 每天0点10分统计吧
public function account()
{
$day = input('day');
if (!$day) {
$day = date('Y-m-d', strtotime('-1 day'));
}
\app\setting\service\AccountLogService::instance()->handle($day);
}
public function account_all()
{
$yue = '2021-07-';
for ($i = 1; $i<=30; $i++) {
if ($i < 10) {
$k = '0'. $i;
} else {
$k = $i;
}
$day = $yue . $k;
\app\setting\service\AccountLogService::instance()->handle($day);
}
}
# 获取渠道余额
public function getYue(): void
{
$day = input('day');
if (!$day) {
$day = date('Y-m-d', strtotime('-1 day'));
}
if (isset($_SERVER["argv"][2]) && $_SERVER["argv"][2]) {
$cid = $_SERVER["argv"][2];
} else {
$cid = false;
}
if (!$cid) {
exit('error');
}
//10040_10026_10044
//10040_10026
$log = \app\setting\service\AccountLogService::instance();
$cid = explode('_', $cid);
foreach ($cid as $k => $v) {
$log->getYue($v, $day);
}
}
public function runerrorOther()
{
$time = '1.5';
$orderService = OrderService::instance();
$data = $orderService->getOldDataDiy($time);
// print_r($data);die;
foreach ($data as $k => $v) {
if ($v['request'] && $v['status'] == 1) {
$v['request'] = json_decode($v['request'], true);
\app\order\service\OrderService::instance()->setCallback(1, $v['order_id'], 3);
}
}
}
}