elkers
2025-04-07 bd9808a81b1eafdc75a42c1c9904408dc888061d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package com.nanjing.water.service.quartz;
 
import com.nanjing.water.common.ExecutedResult;
import com.nanjing.water.common.util.ExceptionUtil;
import com.nanjing.water.common.util.SerializeUtil;
import com.nanjing.water.entity.enums.EHandleStatus;
import com.nanjing.water.entity.enums.EState;
import com.nanjing.water.repository.po.QuartzTaskInfoPO;
import com.nanjing.water.repository.po.QuartzTaskRecordPO;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDataMap;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
 
import java.util.Objects;
 
@DisallowConcurrentExecution
public class QuartzMainJobFactory implements Job {
    private static Logger logger = LoggerFactory.getLogger(QuartzMainJobFactory.class);
 
    @Autowired
    private QuartzExecutor executor;
    @Autowired
    private QuartzService quartzService;
 
    @Override
    public void execute(JobExecutionContext jobExecutionContext) {
        JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
        String id = jobDataMap.getString("id");
        String taskNo = jobDataMap.getString("taskNo");
        String executorName = jobDataMap.getString("executor");
        String sendType = jobDataMap.getString("sendType");
        String url = jobDataMap.getString("url");
        String executeParameter = jobDataMap.getString("executeParameter");
        logger.info("定时任务被执行:taskNo={},executor={},sendType={},url={},executeParameter={}", taskNo, executor, sendType, url, executeParameter);
        //QuartzService quartzService = SpringUtil.getBean(QuartzService.class);
        QuartzTaskRecordPO records = null;
        ExecutedResult<String> result = ExecutedResult.failed("未知错误");
        try {
            ExecutedResult<QuartzTaskInfoPO> checkTask4Code = quartzService.checkTask4Code(taskNo);
            if (checkTask4Code.isFailed()) {
                logger.info("taskNo={}执行任务失败.找不到任务信息", taskNo);
                return;
            }
            QuartzTaskInfoPO task = checkTask4Code.getData();
            //保存定时任务的执行记录
            records = quartzService.addTaskRecords(task, executeParameter);
            if (null == records || !Objects.equals(EHandleStatus.AWAITING.getValue(), records.getTaskStatus())) {
                logger.info("taskNo={}执行定时任务失败: 保存执行记录失败", taskNo);
                return;
            }
            if (!Objects.equals(task.getStatus(), EState.NORMAL.getValue())) {
                logger.info("taskNo={}执行定时任务失败: 任务状态异常{}", taskNo, task.getStatus());
                result = ExecutedResult.failed("定时任务状态异常." + task.getStatus());
                return;
            }
 
            result = executor.execute(executorName, executeParameter);
            logger.info("{}-{}执行结果: {}", taskNo, task.getTaskName(), SerializeUtil.toJson(result));
 
            // 发送执行任务的消息
//            if (ResultEnum.HTTP.getMessage().equals(sendType)) {
//                try {
//                    String result = HttpClientUtil.doPost(url, "text/json", executeParameter);
//                    logger.info("taskNo={},sendType={}执行结果result{}", taskNo, sendType, result);
//                    if (StringUtils.isEmpty(result)) {
//                        throw new RuntimeException("taskNo=" + taskNo + "http方式返回null");
//                    }
//                } catch (Exception ex) {
//                    logger.error("");
//                    throw ex;
//                }
//            } else if (ResultEnum.KAFKA.getMessage().equals(sendType)) {
//                try {
//                    String message = new StringBuffer(taskNo).append(":").append(id).append(":").append(executeParameter).toString();
//                    quartzService.sendMessage(message);
//                    logger.info("taskNo={},sendType={}推送至kafka成功", taskNo, sendType);
//                } catch (Exception ex) {
//                    logger.error("");
//                    throw ex;
//                }
//            }
        } catch (Exception ex) {
            logger.error("任务调度QuartzMainJobFactory.execute", ex);
            if (Objects.nonNull(records)) {
                quartzService.addTaskErrorRecord(records.getId(), taskNo + ":" + ex.getMessage(), ExceptionUtil.getDetails(ex));
            }
            result = ExecutedResult.failed(ExceptionUtil.getDetails(ex));
        } finally {
            if (Objects.nonNull(records)) {
                // 更改record表的执行状态
                quartzService.doneRecordById(records.getId(), result);
            }
        }
    }
}