package com.lunhan.xxx.service.quartz; import com.lunhan.xxx.common.ExecutedResult; import com.lunhan.xxx.common.util.ExceptionUtil; import com.lunhan.xxx.common.util.SerializeUtil; import com.lunhan.xxx.entity.enums.EHandleStatus; import com.lunhan.xxx.entity.enums.EState; import com.lunhan.xxx.repository.po.QuartzTaskInfoPO; import com.lunhan.xxx.repository.po.QuartzTaskRecordPO; import org.quartz.DisallowConcurrentExecution; import org.quartz.Job; import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.util.Objects; @DisallowConcurrentExecution public class QuartzMainJobFactory implements Job { private static Logger logger = LoggerFactory.getLogger(QuartzMainJobFactory.class); @Autowired private QuartzExecutor executor; @Autowired private QuartzService quartzService; @Override public void execute(JobExecutionContext jobExecutionContext) { JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap(); String id = jobDataMap.getString("id"); String taskNo = jobDataMap.getString("taskNo"); String executorName = jobDataMap.getString("executor"); String sendType = jobDataMap.getString("sendType"); String url = jobDataMap.getString("url"); String executeParameter = jobDataMap.getString("executeParameter"); logger.info("定时任务被执行:taskNo={},executor={},sendType={},url={},executeParameter={}", taskNo, executor, sendType, url, executeParameter); //QuartzService quartzService = SpringUtil.getBean(QuartzService.class); QuartzTaskRecordPO records = null; ExecutedResult result = ExecutedResult.failed("未知错误"); try { ExecutedResult checkTask4Code = quartzService.checkTask4Code(taskNo); if (checkTask4Code.isFailed()) { logger.info("taskNo={}执行任务失败.找不到任务信息", taskNo); return; } QuartzTaskInfoPO task = checkTask4Code.getData(); //保存定时任务的执行记录 records = quartzService.addTaskRecords(task, executeParameter); if (null == records || !Objects.equals(EHandleStatus.AWAITING.getValue(), records.getTaskStatus())) { logger.info("taskNo={}执行定时任务失败: 保存执行记录失败", taskNo); return; } if (!Objects.equals(task.getStatus(), EState.NORMAL.getValue())) { logger.info("taskNo={}执行定时任务失败: 任务状态异常{}", taskNo, task.getStatus()); result = ExecutedResult.failed("定时任务状态异常." + task.getStatus()); return; } result = executor.execute(executorName, executeParameter); logger.info("{}-{}执行结果: {}", taskNo, task.getTaskName(), SerializeUtil.toJson(result)); // 发送执行任务的消息 // if (ResultEnum.HTTP.getMessage().equals(sendType)) { // try { // String result = HttpClientUtil.doPost(url, "text/json", executeParameter); // logger.info("taskNo={},sendType={}执行结果result{}", taskNo, sendType, result); // if (StringUtils.isEmpty(result)) { // throw new RuntimeException("taskNo=" + taskNo + "http方式返回null"); // } // } catch (Exception ex) { // logger.error(""); // throw ex; // } // } else if (ResultEnum.KAFKA.getMessage().equals(sendType)) { // try { // String message = new StringBuffer(taskNo).append(":").append(id).append(":").append(executeParameter).toString(); // quartzService.sendMessage(message); // logger.info("taskNo={},sendType={}推送至kafka成功", taskNo, sendType); // } catch (Exception ex) { // logger.error(""); // throw ex; // } // } } catch (Exception ex) { logger.error("任务调度QuartzMainJobFactory.execute", ex); if (Objects.nonNull(records)) { quartzService.addTaskErrorRecord(records.getId(), taskNo + ":" + ex.getMessage(), ExceptionUtil.getDetails(ex)); } result = ExecutedResult.failed(ExceptionUtil.getDetails(ex)); } finally { if (Objects.nonNull(records)) { // 更改record表的执行状态 quartzService.doneRecordById(records.getId(), result); } } } }