484 lines
14 KiB
PHP
Raw Normal View History

2024-12-04 12:33:47 +08:00
<?php
namespace app\task\controller;
2024-12-04 12:33:47 +08:00
// Runtime settings applied as soon as this file is loaded:
// no execution time limit, and a 512M memory ceiling.
set_time_limit(0);
ini_set('memory_limit','512M');
ini_set('default_socket_timeout', -1); // never time out
use app\channel\service\ProductService;
use app\gateway\service\RedisService;
use app\merchant\service\MerchantLogHistoryService;
use app\merchant\service\OrderHistoryService;
2024-12-04 12:33:47 +08:00
use app\merchant\service\OrderLastHistoryService;
use app\merchant\service\OrderLastweekHistoryService;
2024-12-04 12:33:47 +08:00
use app\merchant\service\OrderService;
use app\merchant\service\RebateService;
use app\setting\service\ChannelAccountService;
use app\setting\service\StatService;
2024-12-04 12:33:47 +08:00
/**
 * Tasks
 * Class Task
 * @package app\task\controller
 */
class Task extends Core
{
# Whether to check data
protected $check = false;
# Redis connection (false until redis() has been called)
protected $redis = false;
/**
 * Worker entry point: call submit() over and over.
 *
 * The loop has no exit condition, so this method never returns.
 */
public function cron()
{
    for (;;) {
        $this->submit();
    }
}
/**
 * Take one entry from the "submit" queue and process it.
 *
 * Steps:
 *  - Do nothing while sysconf('settingUpdate') is 1 (restart() sets the flag
 *    to 1 before launching its kill script and to 2 afterwards).
 *  - When sysconf('queue') is 1 (multi-process mode) the entry is handed to a
 *    child process, unless 1000 or more runOne children are already running,
 *    in which case the call waits 30 seconds and returns.
 *  - Otherwise (single-process mode) the entry is processed in this process.
 *
 * Exceptions are swallowed on purpose so that the cron() loop keeps running;
 * the cached Redis handle is dropped so that the next call reconnects.
 *
 * @return bool Always true.
 */
public function submit()
{
    // Becomes true once an entry has been taken from the queue.
    $fetched = false;
    try {
        if (sysconf('settingUpdate') == 1) {
            // Back off briefly: cron() calls submit() in a tight loop, and
            // returning immediately would spin at full speed while paused.
            sleep(1);
            return true;
        }
        # Process mode: 1 = multi-process, anything else = single process
        $popen = false;
        if (sysconf('queue') == 1) {
            # Too many children already running: wait before trying again
            if ($this->getNum() >= 1000) {
                sleep(30);
                return true;
            }
            $popen = true;
        }
        $this->redis();
        $data = $this->redis->pop('submit');
        $fetched = true;
        $this->run($data, $popen);
        return true;
    } catch (\Exception $e) {
        // Force a fresh connection on the next call.
        $this->redis = false;
        if (!$fetched) {
            // The failure happened before an entry was obtained (for example
            // Redis is unreachable): pause so the caller's loop does not
            // retry in a hot loop.
            sleep(1);
        }
        return true;
    }
}
/**
 * Lazily obtain the Redis service and cache it in $this->redis.
 *
 * Does nothing when a truthy handle is already cached.
 */
protected function redis()
{
    if ($this->redis) {
        return;
    }
    $this->redis = RedisService::getInstance();
}
/**
 * Dispatch one queued request.
 *
 * The request is ignored unless its 'merchant', 'product', 'param' and
 * 'order' entries are all present and truthy.
 *
 * @param mixed $data  Decoded request; expected to be an array with the four keys above.
 * @param bool  $popen true: hand the order to a child process via popen();
 *                     false: process it in this process via channel().
 */
protected function run($data, $popen = false)
{
    foreach (['merchant', 'product', 'param', 'order'] as $field) {
        if (empty($data[$field])) {
            return;
        }
    }
    if (!$popen) {
        $this->channel($data['merchant'], $data['product'], false, $data['param'], $data['order']);
        return;
    }
    $this->popen($data['order']);
}
/**
 * Hand an order to a background child process.
 *
 * Launches `task/task/runOne <order>` through the CLI entry script with its
 * output discarded; the trailing `&` detaches it so exec() does not wait.
 *
 * @param string $order Order identifier passed to the child as its argument.
 */
public function popen($order)
{
    // The order id ends up on a shell command line, so it must be quoted:
    // without escapeshellarg() a value containing shell metacharacters
    // could run arbitrary commands.
    $command = 'php /www/wwwroot/reapi/public/index.php task/task/runOne '
        . escapeshellarg((string) $order)
        . ' 1>/dev/null 2>&1 &';
    exec($command);
}
/**
 * Count the runOne child processes currently running.
 *
 * @return string|false Last output line of the `ps | grep | wc -l` pipeline
 *                      as returned by exec().
 */
public function getNum()
{
    return exec('ps -ef | grep task/task/runOne | grep -v grep | wc -l');
}
/**
 * Process a single order (the child-process side of popen()).
 *
 * The order id is read from the CLI argument $_SERVER['argv'][2], falling back
 * to the 'order' request input. A '_nq' marker inside the id is stripped and
 * causes request.param.s to be set to 1 before dispatch.
 *
 * Always terminates the script: 'error' when no order id was given,
 * otherwise 'ok'.
 */
public function runOne()
{
    $order = !empty($_SERVER["argv"][2]) ? $_SERVER["argv"][2] : input('order');
    if (!$order) {
        exit('error');
    }

    // Remember whether the '_nq' marker was present, then remove it from the id.
    $nq = (bool) strstr($order, '_nq');
    if ($nq) {
        $order = str_replace('_nq', '', $order);
    }

    $info = OrderService::instance()->get($order);
    // Only orders with status <= 1 and a stored request are dispatched.
    $runnable = $info && $info['status'] <= 1 && $info['request'];
    if (!$runnable) {
        exit('ok');
    }

    $info['request'] = json_decode($info['request'], true);
    if ($nq) {
        $info['request']['param']['s'] = 1;
    }
    $this->run($info['request']);
    exit('ok');
}
/**
 * Re-dispatch old orders that are still at status 1.
 *
 * Original note: intended for orders that have stayed at status=1 for more
 * than 5 minutes; can be scheduled every 5 minutes in place of runAll.
 * Each matching order is handed to a child process.
 */
public function runOther()
{
    $rows = OrderService::instance()->getOldData();
    foreach ($rows as $row) {
        if (!$row['request'] || $row['status'] != 1) {
            continue;
        }
        $row['request'] = json_decode($row['request'], true);
        $this->run($row['request'], true);
    }
}
/**
 * Dispatch delayed orders whose scheduled time has passed.
 *
 * Original note: handles status = -1 orders; can be scheduled every minute.
 * An order is dispatched (to a child process) when its decoded request has a
 * 'time' value earlier than now. At most one order per non-empty 'account'
 * is dispatched in a single call.
 */
public function runTempOrder()
{
    $rows = OrderService::instance()->getTempOrder();
    if (!$rows) {
        return;
    }

    $now = time();
    // Accounts that already had an order dispatched during this call.
    $seenAccounts = [];
    foreach ($rows as $row) {
        if (!$row['request']) {
            continue;
        }
        $row['request'] = json_decode($row['request'], true);
        if (!(isset($row['request']['time']) && $now > $row['request']['time'])) {
            continue;
        }
        if ($row['account']) {
            if (!empty($seenAccounts[$row['account']])) {
                continue;
            }
            $seenAccounts[$row['account']] = $row['account'];
        }
        $this->run($row['request'], true);
    }
}
/**
 * Dispatch orders that were held back by a channel's order limit.
 *
 * Original note: handles status = -3 orders; can be scheduled every minute.
 * For every active, non-deleted channel with order_limit > 0, the orders
 * returned by getTimeOrder() are handed to child processes with '_nq'
 * appended to the order id (runOne() recognises that marker).
 */
public function runTimeOrder()
{
    # Channels that have delayed processing configured
    $where = ['status' => 1, 'is_deleted' => 0];
    $channels = \app\channel\service\ChannelService::instance()->db()->where($where)->whereRaw('order_limit > 0')->select()->toArray();
    if (!$channels) {
        return;
    }

    $orderService = OrderService::instance();
    foreach ($channels as $channel) {
        $rows = $orderService->getTimeOrder($channel['id'], false, false, $channel['order_limit']);
        if (!$rows) {
            continue;
        }
        foreach ($rows as $row) {
            if (!$row['request']) {
                continue;
            }
            $row['request'] = json_decode($row['request'], true);
            if (!$row['request']) {
                continue;
            }
            $row['request']['order'] .= '_nq';
            $this->run($row['request'], true);
        }
    }
}
/**
 * Re-place paused orders taken from getStopOrderQueue().
 *
 * Original note: handles status = -5 orders.
 * Each row with a decodable 'param' is passed to ChannelService::use()
 * under the order id "<order_id>_<num>".
 */
public function runStopOrderQueue()
{
    $rows = OrderService::instance()->getStopOrderQueue();
    if (!$rows) {
        return;
    }

    $channelService = \app\channel\service\ChannelService::instance();
    foreach ($rows as $row) {
        if (!$row['param']) {
            continue;
        }
        $row['param'] = json_decode($row['param'], true);
        if (!$row['param']) {
            continue;
        }
        $payload = $row['param'];
        $orderId = $row['order_id'] . '_' . $row['num'];
        $channelService->use($row['mid'], $row['pid'], $payload, true, $orderId);
    }
}
/**
 * Set status = -8 on every order returned by getStopOrder().
 *
 * NOTE(review): the original comment described this as "re-place paused
 * orders, status = -5", but the code only updates the status column.
 */
public function runStopOrder()
{
    $orderService = OrderService::instance();
    $rows = $orderService->getStopOrder();
    if (!$rows) {
        return;
    }
    foreach ($rows as $row) {
        $orderService->db()->where('id', $row['id'])->update(['status' => -8]);
    }
}
2024-12-04 12:33:47 +08:00
/**
 * Re-place orders taken from getFcOrder().
 *
 * Original note: handles orders that need to be recharged again
 * (status = -4); can be scheduled every minute.
 * Each row with a decodable 'param' is passed to ChannelService::use()
 * under the order id "<order_id>_<num>".
 */
public function runFcOrder()
{
    $rows = OrderService::instance()->getFcOrder();
    if (!$rows) {
        return;
    }

    $channelService = \app\channel\service\ChannelService::instance();
    foreach ($rows as $row) {
        if (!$row['param']) {
            continue;
        }
        $row['param'] = json_decode($row['param'], true);
        if (!$row['param']) {
            continue;
        }
        $payload = $row['param'];
        $orderId = $row['order_id'] . '_' . $row['num'];
        $channelService->use($row['mid'], $row['pid'], $payload, true, $orderId);
    }
}
/**
 * Retry callbacks for the rows returned by OrderService::getErrorData().
 *
 * Original note: used when both the channel callback and the merchant
 * callback failed; meant to run every 5 minutes.
 */
public function call()
{
    $failed = OrderService::instance()->getErrorData();
    if (!$failed) {
        return;
    }
    foreach ($failed as $row) {
        $this->callSend($row, 1);
    }
}
/**
 * Retry callbacks for the rows returned by OrderHistoryService::getErrorData().
 *
 * Same as call(), but reads from the history service and sends with type 2.
 * Original note: meant to run every 5 minutes.
 */
public function callHistory()
{
    $failed = OrderHistoryService::instance()->getErrorData();
    if (!$failed) {
        return;
    }
    foreach ($failed as $row) {
        $this->callSend($row, 2);
    }
}
/**
 * Forward one row to OrderService::callSend().
 *
 * @param mixed $v    Order row to send the callback for.
 * @param int   $type Passed through unchanged; call() uses 1, callHistory() uses 2.
 */
private function callSend($v, $type = 1)
{
    $orderService = OrderService::instance();
    $orderService->callSend($v, $type);
}
/**
 * Restart the daemon.
 *
 * Original note: run daily at 1 a.m.
 * Sets the settingUpdate flag to 1, launches kill.py in the background with
 * its output discarded, then sets the flag to 2. Because the script is
 * started with `&`, the flag is reset without waiting for it to finish.
 */
public function restart()
{
    sysconf('settingUpdate', 1);
    # Adjust this path to match the deployment
    exec('python /www/wwwroot/reapi/kill.py 1>/dev/null 2>&1 &');
    sysconf('settingUpdate', 2);
}
/**
 * Migrate historical account transaction logs.
 *
 * Original note: run at 03:30 on the 1st of every month, e.g.
 * 30 3 1 * * /www/server/php/74/bin/php /www/wwwroot/api/public/index.php gateway/api.task/moveAccountLog > /dev/null
 */
public function moveAccountLog()
{
    $historyService = MerchantLogHistoryService::instance();
    $historyService->handle();
}
/**
 * Run the merchant rebate job by calling RebateService::getData().
 */
public function rebate()
{
    $rebateService = RebateService::instance();
    $rebateService->getData();
}
/**
 * Calculate profit statistics for one day.
 *
 * The day comes from the 'day' request input and defaults to yesterday
 * (Y-m-d). Totals are read from OrderLastweekHistoryService and, when
 * non-empty, passed to StatService::handle().
 */
public function profit()
{
    $day = input('day') ?: date('Y-m-d', strtotime('-1 day'));
    // OrderLastHistoryService was used here previously.
    $totals = OrderLastweekHistoryService::instance()->getTotalData($day);
    if (!$totals) {
        return;
    }
    StatService::instance()->handle($day, $totals);
}
/**
 * Calculate channel statistics for one day.
 *
 * The day comes from the 'day' request input and defaults to yesterday (Y-m-d).
 */
public function statChannel()
{
    $day = input('day') ?: date('Y-m-d', strtotime('-1 day'));
    ChannelAccountService::instance()->stat($day);
}
/**
 * Start runErrorOrder in a background child process, unless one is already
 * running, then respond with a success message.
 */
public function popenRunError()
{
    if ($this->getErrorNum() < 1) {
        # Adjust this path to match the deployment
        exec('php /www/wwwroot/reapi/public/index.php task/task/runErrorOrder 1>/dev/null 2>&1 &');
    }
    $this->success('操作成功', ['msg' => 'ok']);
}
/**
 * Count the runErrorOrder child processes currently running.
 *
 * @return string|false Last output line of the `ps | grep | wc -l` pipeline
 *                      as returned by exec().
 */
public function getErrorNum()
{
    return exec('ps -ef | grep task/task/runErrorOrder | grep -v grep | wc -l');
}
/**
 * Re-place a failed order on a different product.
 *
 * Original note: handles status = -2 orders; run manually.
 * Only the first row that has a stored request is processed: its product is
 * replaced with product 12 for the same merchant, the row is printed, the
 * request is run in this process, and the script then terminates.
 */
public function runErrorOrder()
{
    $rows = OrderService::instance()->getErrorOrder();
    if (!$rows) {
        return;
    }
    foreach ($rows as $row) {
        if (!$row['request']) {
            continue;
        }
        $row['request'] = json_decode($row['request'], true);
        // Switch to product 12 (original note: route to "Tiancheng").
        $row['request']['product'] = ProductService::instance()->get($row['request']['merchant'], 12);
        print_r($row);
        $this->run($row['request']);
        die;
    }
}
/**
 * Reconciliation statistics for one day.
 *
 * Original note: run daily at 00:10.
 * The day comes from the 'day' request input and defaults to yesterday (Y-m-d).
 */
public function account()
{
    $day = input('day') ?: date('Y-m-d', strtotime('-1 day'));
    \app\setting\service\AccountLogService::instance()->handle($day);
}
/**
 * Run the reconciliation statistics for every day of July 2021.
 *
 * The number of days is derived from the month itself: the previous fixed
 * upper bound of 30 skipped 2021-07-31.
 */
public function account_all()
{
    $yue = '2021-07-';
    // 't' is the number of days in the given month (31 for July).
    $days = (int) date('t', strtotime($yue . '01'));
    for ($i = 1; $i <= $days; $i++) {
        // Zero-pad the day to two digits: 1 -> "01".
        $day = $yue . str_pad((string) $i, 2, '0', STR_PAD_LEFT);
        \app\setting\service\AccountLogService::instance()->handle($day);
    }
}
/**
 * Fetch channel balances for one day.
 *
 * The day comes from the 'day' request input and defaults to yesterday (Y-m-d).
 * Channel ids come from the CLI argument $_SERVER['argv'][2] as an
 * underscore-separated list, e.g. "10040_10026_10044" or "10040_10026".
 * When no argument is given, channel 11 is used.
 *
 * NOTE(review): an unconditional `$cid = 11;` used to overwrite the CLI
 * argument, which made the argument and the "no id" error branch dead code.
 * 11 is kept as the fallback so argument-less invocations behave as before;
 * confirm that existing schedules do not pass stale ids.
 */
public function getYue(): void
{
    $day = input('day') ?: date('Y-m-d', strtotime('-1 day'));

    $cid = !empty($_SERVER["argv"][2]) ? (string) $_SERVER["argv"][2] : '11';

    $log = \app\setting\service\AccountLogService::instance();
    foreach (explode('_', $cid) as $channelId) {
        // Ignore empty segments such as the one produced by a trailing "_".
        if ($channelId === '') {
            continue;
        }
        $log->getYue($channelId, $day);
    }
}
/**
 * Call setCallback(1, <order_id>, 3) for old orders that are still at status 1.
 *
 * Rows come from getOldDataDiy('1.5') on the merchant OrderService; the
 * callback goes through \app\order\service\OrderService, a different class.
 * The meaning of '1.5' and of the setCallback arguments is defined in those
 * services.
 */
public function runerrorOther()
{
    $rows = OrderService::instance()->getOldDataDiy('1.5');
    foreach ($rows as $row) {
        if (!$row['request'] || $row['status'] != 1) {
            continue;
        }
        // The decoded request is not used afterwards; kept as in the original.
        $row['request'] = json_decode($row['request'], true);
        \app\order\service\OrderService::instance()->setCallback(1, $row['order_id'], 3);
    }
}
#
2024-12-04 12:33:47 +08:00
}