package com.lunhan.xxx.service.quartz;
|
|
import com.lunhan.xxx.common.ExecutedResult;
|
import com.lunhan.xxx.common.util.ExceptionUtil;
|
import com.lunhan.xxx.common.util.SerializeUtil;
|
import com.lunhan.xxx.entity.enums.EHandleStatus;
|
import com.lunhan.xxx.entity.enums.EState;
|
import com.lunhan.xxx.repository.po.QuartzTaskInfoPO;
|
import com.lunhan.xxx.repository.po.QuartzTaskRecordPO;
|
import org.quartz.DisallowConcurrentExecution;
|
import org.quartz.Job;
|
import org.quartz.JobDataMap;
|
import org.quartz.JobExecutionContext;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
import java.util.Objects;
|
|
@DisallowConcurrentExecution
|
public class QuartzMainJobFactory implements Job {
|
private static Logger logger = LoggerFactory.getLogger(QuartzMainJobFactory.class);
|
|
@Autowired
|
private QuartzExecutor executor;
|
@Autowired
|
private QuartzService quartzService;
|
|
@Override
|
public void execute(JobExecutionContext jobExecutionContext) {
|
JobDataMap jobDataMap = jobExecutionContext.getMergedJobDataMap();
|
String id = jobDataMap.getString("id");
|
String taskNo = jobDataMap.getString("taskNo");
|
String executorName = jobDataMap.getString("executor");
|
String sendType = jobDataMap.getString("sendType");
|
String url = jobDataMap.getString("url");
|
String executeParameter = jobDataMap.getString("executeParameter");
|
logger.info("定时任务被执行:taskNo={},executor={},sendType={},url={},executeParameter={}", taskNo, executor, sendType, url, executeParameter);
|
//QuartzService quartzService = SpringUtil.getBean(QuartzService.class);
|
QuartzTaskRecordPO records = null;
|
ExecutedResult<String> result = ExecutedResult.failed("未知错误");
|
try {
|
ExecutedResult<QuartzTaskInfoPO> checkTask4Code = quartzService.checkTask4Code(taskNo);
|
if (checkTask4Code.isFailed()) {
|
logger.info("taskNo={}执行任务失败.找不到任务信息", taskNo);
|
return;
|
}
|
QuartzTaskInfoPO task = checkTask4Code.getData();
|
//保存定时任务的执行记录
|
records = quartzService.addTaskRecords(task, executeParameter);
|
if (null == records || !Objects.equals(EHandleStatus.AWAITING.getValue(), records.getTaskStatus())) {
|
logger.info("taskNo={}执行定时任务失败: 保存执行记录失败", taskNo);
|
return;
|
}
|
if (!Objects.equals(task.getStatus(), EState.NORMAL.getValue())) {
|
logger.info("taskNo={}执行定时任务失败: 任务状态异常{}", taskNo, task.getStatus());
|
result = ExecutedResult.failed("定时任务状态异常." + task.getStatus());
|
return;
|
}
|
|
result = executor.execute(executorName, executeParameter);
|
logger.info("{}-{}执行结果: {}", taskNo, task.getTaskName(), SerializeUtil.toJson(result));
|
|
// 发送执行任务的消息
|
// if (ResultEnum.HTTP.getMessage().equals(sendType)) {
|
// try {
|
// String result = HttpClientUtil.doPost(url, "text/json", executeParameter);
|
// logger.info("taskNo={},sendType={}执行结果result{}", taskNo, sendType, result);
|
// if (StringUtils.isEmpty(result)) {
|
// throw new RuntimeException("taskNo=" + taskNo + "http方式返回null");
|
// }
|
// } catch (Exception ex) {
|
// logger.error("");
|
// throw ex;
|
// }
|
// } else if (ResultEnum.KAFKA.getMessage().equals(sendType)) {
|
// try {
|
// String message = new StringBuffer(taskNo).append(":").append(id).append(":").append(executeParameter).toString();
|
// quartzService.sendMessage(message);
|
// logger.info("taskNo={},sendType={}推送至kafka成功", taskNo, sendType);
|
// } catch (Exception ex) {
|
// logger.error("");
|
// throw ex;
|
// }
|
// }
|
} catch (Exception ex) {
|
logger.error("任务调度QuartzMainJobFactory.execute", ex);
|
if (Objects.nonNull(records)) {
|
quartzService.addTaskErrorRecord(records.getId(), taskNo + ":" + ex.getMessage(), ExceptionUtil.getDetails(ex));
|
}
|
result = ExecutedResult.failed(ExceptionUtil.getDetails(ex));
|
} finally {
|
if (Objects.nonNull(records)) {
|
// 更改record表的执行状态
|
quartzService.doneRecordById(records.getId(), result);
|
}
|
}
|
}
|
}
|