package com.nanjing.water.service.quartz;
|
|
import com.nanjing.water.common.ExecutedResult;
|
import com.nanjing.water.common.PagerResult;
|
import com.nanjing.water.common.enums.EYesOrNo;
|
import com.nanjing.water.common.model.Tuple;
|
import com.nanjing.water.common.util.*;
|
import com.nanjing.water.entity.enums.EHandleStatus;
|
import com.nanjing.water.entity.enums.EState;
|
import com.nanjing.water.entity.request.quartztaskinfo.ReqCreateQuartzTaskInfo;
|
import com.nanjing.water.entity.request.quartztaskinfo.ReqModifyQuartzTaskInfo;
|
import com.nanjing.water.entity.request.quartztaskinfo.ReqRunTaskRightNow;
|
import com.nanjing.water.entity.search.SearchQuartzTaskInfo;
|
import com.nanjing.water.entity.search.SearchQuartzTaskRecord;
|
import com.nanjing.water.repository.impl.QuartzTaskErrorMapperImpl;
|
import com.nanjing.water.repository.impl.QuartzTaskInfoMapperImpl;
|
import com.nanjing.water.repository.impl.QuartzTaskRecordMapperImpl;
|
import com.nanjing.water.repository.po.QuartzTaskErrorPO;
|
import com.nanjing.water.repository.po.QuartzTaskInfoPO;
|
import com.nanjing.water.repository.po.QuartzTaskRecordPO;
|
import com.nanjing.water.repository.vo.QuartzTaskErrorVO;
|
import com.nanjing.water.repository.vo.QuartzTaskInfoVO;
|
import com.nanjing.water.repository.vo.QuartzTaskRecordVO;
|
import com.nanjing.water.service.convert.QuartzTaskInfoConvert;
|
import com.nanjing.water.common.util.*;
|
import org.quartz.*;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Value;
|
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
|
import org.springframework.stereotype.Service;
|
import org.springframework.transaction.annotation.Transactional;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Objects;
|
|
@Service
|
public class QuartzService implements InitializingBean {
|
// SLF4J logger shared by every method of this service.
private static final Logger logger = LoggerFactory.getLogger(QuartzService.class);
|
|
// Bound to the `quartz.enable` property; when false, afterPropertiesSet() skips loading tasks.
@Value("${quartz.enable}")
|
private Boolean isQuartzEnable;
|
|
// Persistence access for task definitions (QuartzTaskInfoPO rows).
@Autowired
|
private QuartzTaskInfoMapperImpl taskDao;
|
// Persistence access for execution records (QuartzTaskRecordPO rows).
@Autowired
|
private QuartzTaskRecordMapperImpl taskRecordsDao;
|
// Persistence access for execution error details (QuartzTaskErrorPO rows).
@Autowired
|
private QuartzTaskErrorMapperImpl taskErrorsDao;
|
|
// Factory bean from which the Quartz Scheduler is obtained via getScheduler().
@Autowired
|
private SchedulerFactoryBean schedulerBean;
|
// Injected executor bean. NOTE(review): runTaskRightNow resolves QuartzExecutor through
// SpringUtil.getBean instead of this field — confirm whether that is intentional.
@Autowired
|
private QuartzExecutor executor;
|
|
public ExecutedResult<PagerResult<QuartzTaskInfoVO>> listTask(SearchQuartzTaskInfo search) {
|
// 预处理 mybatis plus 空指针
|
search.setCreateTimeStart(0L);
|
search.setCreateTimeEnd(0L);
|
// 处理创建时间范围-查询参数
|
Tuple<String, String> createTimeRange = ParameterUtil.getTimeRange(search.getCreateTimeRange());
|
if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem1())) {
|
search.setCreateTimeStart(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem1()).getTime());
|
}
|
if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem2())) {
|
search.setCreateTimeEnd(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem2()).getTime());
|
}
|
|
PagerResult<QuartzTaskInfoPO> pageList = taskDao.search(search);
|
List<QuartzTaskInfoVO> listVo = new ArrayList<>();
|
List<QuartzTaskInfoPO> list = pageList.getList();
|
if (ListUtil.isNotNullOrEmpty(list)) {
|
// 转换vo
|
listVo = QuartzTaskInfoConvert.INSTANCE.toVo(list);
|
}
|
PagerResult<QuartzTaskInfoVO> result = new PagerResult<>(pageList.getLimit(), pageList.getPage(), pageList.getTotal(), listVo);
|
return ExecutedResult.success(result);
|
}
|
|
public ExecutedResult<String> addTask(ReqCreateQuartzTaskInfo request) {
|
Long now = LocalDateTimeUtil.nowTimeStamp();
|
// 转换po
|
QuartzTaskInfoPO item = QuartzTaskInfoConvert.INSTANCE.toCreate(request);
|
// 任务编号
|
item.setTaskNo("T" + SnowFlakeUtil.getId());
|
// 设置状态
|
item.setStatus(EState.NORMAL.getValue());
|
// 设置记录创建时间
|
item.setCreateTime(now);
|
// 版本号
|
item.setVersion(now);
|
item.setLastTime("");
|
item.setLastStatus(0);
|
item.setLastParameter("");
|
// 是否删除(逻辑删除)初始值
|
item.setIsDelete(EYesOrNo.NO.getValue());
|
|
int result = this.taskDao.insert(item);
|
if (result != 1) {
|
return ExecutedResult.failed("创建任务失败。");
|
}
|
Scheduler scheduler = schedulerBean.getScheduler();
|
try {
|
this.schedule(item, scheduler);
|
} catch (SchedulerException e) {
|
logger.error("addTask", e);
|
}
|
return ExecutedResult.success(item.getTaskNo());
|
}
|
|
@Transactional
|
public ExecutedResult<String> updateTask(ReqModifyQuartzTaskInfo request) throws SchedulerException {
|
// 验证记录是否存在
|
ExecutedResult<QuartzTaskInfoPO> checkExists = this.checkTask4Id(request.getId());
|
if (checkExists.isFailed()) {
|
return ExecutedResult.failed(checkExists.getMsg());
|
}
|
QuartzTaskInfoPO find = checkExists.getData();
|
|
// 转换po
|
QuartzTaskInfoPO item = CopierUtil.mapTo(request, QuartzTaskInfoPO.class);
|
|
int result = taskDao.updateById(item);
|
if (result != 1) {
|
return ExecutedResult.failed("编辑任务失败。");
|
}
|
Scheduler scheduler = schedulerBean.getScheduler();
|
scheduler.deleteJob(new JobKey(find.getTaskNo()));
|
if (Objects.equals(item.getStatus(),EState.NORMAL.getValue())) {
|
this.schedule(find, scheduler);
|
}
|
return ExecutedResult.success();
|
}
|
|
public ExecutedResult<QuartzTaskInfoPO> checkTask4Id(Long id) {
|
QuartzTaskInfoPO exists = taskDao.selectById(id);
|
if (Objects.isNull(exists)) {
|
return ExecutedResult.failed("任务不存在:" + id);
|
}
|
return ExecutedResult.success(exists);
|
}
|
|
public ExecutedResult<QuartzTaskInfoPO> checkTask4Code(String taskNo) {
|
QuartzTaskInfoPO exists = taskDao.get4No(taskNo);
|
if (Objects.isNull(exists)) {
|
return ExecutedResult.failed("任务不存在:" + taskNo);
|
}
|
return ExecutedResult.success(exists);
|
}
|
|
// 启动 或者 暂定定时任务
|
@Transactional
|
public ExecutedResult<String> optionJob(Long taskId) throws SchedulerException {
|
// 验证记录是否存在
|
ExecutedResult<QuartzTaskInfoPO> checkExists = this.checkTask4Id(taskId);
|
if (checkExists.isFailed()) {
|
return ExecutedResult.failed(checkExists.getMsg());
|
}
|
Long now = LocalDateTimeUtil.nowTimeStamp();
|
|
QuartzTaskInfoPO task = checkExists.getData();
|
Integer status = task.getStatus();
|
String taskNo = task.getTaskNo();
|
|
Scheduler scheduler = schedulerBean.getScheduler();
|
QuartzTaskInfoPO item = new QuartzTaskInfoPO();
|
item.setId(task.getId());
|
item.setVersion(task.getVersion());
|
//说明要暂停
|
if (Objects.equals(status, EState.NORMAL.getValue())) {
|
scheduler.deleteJob(new JobKey(taskNo));
|
item.setFrozenTime(now);
|
item.setStatus(EState.DISABLED.getValue());
|
}
|
//说明要启动
|
else if (Objects.equals(status, EState.DISABLED.getValue())) {
|
scheduler.deleteJob(new JobKey(taskNo));
|
this.schedule(task, scheduler);
|
item.setUnfrozenTime(now);
|
item.setStatus(EState.NORMAL.getValue());
|
}
|
taskDao.updateById(item);
|
logger.info("taskNo={},taskName={},scheduleRule={},任务{}成功", task.getTaskNo(), task.getTaskName(), task.getSchedulerRule(), Objects.equals(status, EState.NORMAL.getValue()) ? "启动" : "暂停");
|
return ExecutedResult.success();
|
}
|
|
/**
|
* 立即执行一次任务
|
* @param request 请求参数
|
*/
|
public ExecutedResult<String> runTaskRightNow(ReqRunTaskRightNow request) {
|
// 验证记录是否存在
|
ExecutedResult<QuartzTaskInfoPO> checkExists = this.checkTask4Id(request.getId());
|
if (checkExists.isFailed()) {
|
return ExecutedResult.failed(checkExists.getMsg());
|
}
|
QuartzTaskInfoPO task = checkExists.getData();
|
String taskNo = task.getTaskNo();
|
|
String sendType = task.getSendType();
|
String executorName = task.getExecutor();
|
String url = task.getUrl();
|
String executeParameter = request.getExecuteParameter();
|
logger.info("定时任务被执行:taskNo={},executor={},sendType={},url={},executeParameter={}", taskNo, executor, sendType, url, executeParameter);
|
QuartzTaskRecordPO records = null;
|
try {
|
//保存定时任务的执行记录
|
records = this.addTaskRecords(task, executeParameter);
|
if (Objects.isNull(records) || !Objects.equals(EHandleStatus.AWAITING.getValue(), records.getTaskStatus())) {
|
logger.info("taskNo={}执行定时任务失败--->>保存执行记录失败", taskNo);
|
return ExecutedResult.failed("单次执行任务失败");
|
}
|
Long recordId = records.getId();
|
String taskName = task.getTaskName();
|
ThreadPoolUtil.getDefaultPool().execute(() -> {
|
ExecutedResult<String> executeResult = SpringUtil.getBean(QuartzExecutor.class).execute(executorName, executeParameter);
|
logger.info("{}-{}执行结果: {}", taskNo, taskName, SerializeUtil.toJson(executeResult));
|
// 更改record表的执行状态
|
this.doneRecordById(recordId, executeResult);
|
});
|
|
|
// 发送执行任务的消息
|
// if (ResultEnum.HTTP.getMessage().equals(sendType)) {
|
// try {
|
// HttpClientUtil.doPost(url, "text/json", executeParameter);
|
// logger.info("");
|
// } catch (Exception ex) {
|
// logger.error("");
|
// atomicInteger.incrementAndGet();
|
// throw ex;
|
// }
|
// } else if (ResultEnum.KAFKA.getMessage().equals(sendType)) {
|
// try {
|
// String message = new StringBuffer(taskNo).append(":").append(executeParameter).toString();
|
// this.sendMessage(message);
|
// } catch (Exception ex) {
|
// logger.error("");
|
// atomicInteger.incrementAndGet();
|
// throw ex;
|
// }
|
// }
|
return ExecutedResult.success();
|
} catch (Exception ex) {
|
logger.error("单次执行任务 runTaskRightNow", ex);
|
if (Objects.nonNull(records)) {
|
this.addTaskErrorRecord(records.getId(), taskNo + ":" + ex.getMessage(), ExceptionUtil.getDetails(ex));
|
}
|
ExecutedResult<String> result = ExecutedResult.failed("单次执行任务失败." + ex.getMessage());
|
if (Objects.nonNull(records)) {
|
// 更改record表的执行状态
|
this.doneRecordById(records.getId(), result);
|
}
|
return result;
|
}
|
}
|
|
// 定时任务执行记录
|
public ExecutedResult<PagerResult<QuartzTaskRecordVO>> taskRecords(SearchQuartzTaskRecord search) {
|
// 预处理 mybatis plus 空指针
|
search.setCreateTimeStart(0L);
|
search.setCreateTimeEnd(0L);
|
search.setExecuteTimeStart(0L);
|
search.setExecuteTimeEnd(0L);
|
// 处理创建时间范围-查询参数
|
Tuple<String, String> createTimeRange = ParameterUtil.getTimeRange(search.getCreateTimeRange());
|
if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem1())) {
|
search.setCreateTimeStart(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem1()).getTime());
|
}
|
if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem2())) {
|
search.setCreateTimeEnd(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem2()).getTime());
|
}
|
// 处理执行时间范围-查询参数
|
Tuple<String, String> executeTimeRange = ParameterUtil.getTimeRange(search.getExecuteTimeRange());
|
if (StringUtil.isNotNullOrEmpty(executeTimeRange.getItem1())) {
|
search.setExecuteTimeStart(LocalDateTimeUtil.getTimeStamp(executeTimeRange.getItem1()).getTime());
|
}
|
if (StringUtil.isNotNullOrEmpty(executeTimeRange.getItem2())) {
|
search.setExecuteTimeEnd(LocalDateTimeUtil.getTimeStamp(executeTimeRange.getItem2()).getTime());
|
}
|
|
PagerResult<QuartzTaskRecordPO> pageList = taskRecordsDao.search(search);
|
List<QuartzTaskRecordVO> listVo = new ArrayList<>();
|
List<QuartzTaskRecordPO> list = pageList.getList();
|
if (ListUtil.isNotNullOrEmpty(list)) {
|
// 转换vo
|
listVo = CopierUtil.mapTo(list, QuartzTaskRecordVO.class);
|
}
|
PagerResult<QuartzTaskRecordVO> result = new PagerResult<>(pageList.getLimit(), pageList.getPage(), pageList.getTotal(), listVo);
|
return ExecutedResult.success(result);
|
}
|
|
// 定时任务错误详情
|
public ExecutedResult<QuartzTaskErrorVO> detailTaskErrors(String recordId) {
|
QuartzTaskErrorPO error = taskErrorsDao.get4RecordId(recordId);
|
if (Objects.isNull(error)) {
|
return ExecutedResult.success(null);
|
}
|
return ExecutedResult.success(CopierUtil.mapTo(error, QuartzTaskErrorVO.class));
|
}
|
|
/**
|
* Invoked by the containing {@code BeanFactory} after it has set all bean properties
|
* and satisfied {link BeanFactory Aware}, {@code ApplicationContextAware} etc.
|
* <p>This method allows the bean instance to perform validation of its overall
|
* configuration and final initialization when all bean properties have been set.
|
*/
|
@Override
|
public void afterPropertiesSet() {
|
if (!this.isQuartzEnable) {
|
return;
|
}
|
this.initLoadOnlineTasks();
|
}
|
|
/**
|
* 初始化加载定时任务
|
*
|
* @throws Exception
|
*/
|
public void initLoadOnlineTasks() {
|
List<QuartzTaskInfoPO> list = taskDao.list4Status(EState.NORMAL.getValue());
|
if (ListUtil.isNullOrEmpty(list)) {
|
logger.info("没有需要初始化加载的定时任务");
|
return;
|
}
|
Scheduler scheduler = schedulerBean.getScheduler();
|
for (QuartzTaskInfoPO task : list) {
|
try {
|
this.schedule(task, scheduler);
|
} catch (Exception e) {
|
logger.error("系统初始化加载定时任务:taskno={},taskname={}失败原因exception={}", task.getTaskNo(), task.getTaskName(), e);
|
}
|
}
|
}
|
|
private void schedule(QuartzTaskInfoPO task, Scheduler scheduler) throws SchedulerException {
|
TriggerKey triggerKey = TriggerKey.triggerKey(task.getTaskNo(), Scheduler.DEFAULT_GROUP);
|
JobDetail jobDetail = JobBuilder.newJob(QuartzMainJobFactory.class).withDescription(task.getTaskName()).withIdentity(task.getTaskNo(), Scheduler.DEFAULT_GROUP).build();
|
JobDataMap jobDataMap = jobDetail.getJobDataMap();
|
jobDataMap.put("id", task.getId().toString());
|
jobDataMap.put("taskNo", task.getTaskNo());
|
jobDataMap.put("executor", task.getExecutor());
|
jobDataMap.put("sendType", task.getSendType());
|
jobDataMap.put("url", task.getUrl());
|
jobDataMap.put("executeParameter", task.getExecuteParameter());
|
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getSchedulerRule());
|
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withDescription(task.getTaskName()).withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build();
|
scheduler.scheduleJob(jobDetail, cronTrigger);
|
logger.info("taskNo={},taskName={},scheduleRule={} load to quartz success!", task.getTaskNo(), task.getTaskName(), task.getSchedulerRule());
|
}
|
|
public QuartzTaskRecordPO addTaskRecords(QuartzTaskInfoPO task, String executeParameter) {
|
Long now = LocalDateTimeUtil.nowTimeStamp();
|
QuartzTaskRecordPO quartzTaskRecords = null;
|
try {
|
quartzTaskRecords = new QuartzTaskRecordPO();
|
quartzTaskRecords.setTaskNo(task.getTaskNo());
|
quartzTaskRecords.setTaskName(task.getTaskName());
|
quartzTaskRecords.setExecuteParameter(executeParameter);
|
quartzTaskRecords.setStartTime(now);
|
quartzTaskRecords.setEndTime(0L);
|
quartzTaskRecords.setExecuteTimes(0L);
|
quartzTaskRecords.setTaskStatus(EHandleStatus.AWAITING.getValue());
|
quartzTaskRecords.setFailReason("");
|
quartzTaskRecords.setIsDelete(EYesOrNo.NO.getValue());
|
quartzTaskRecords.setCreateTime(now);
|
taskRecordsDao.insert(quartzTaskRecords);
|
logger.info("taskNo={},taskName={}添加执行记录表成功", task.getTaskNo(), task.getTaskName());
|
return quartzTaskRecords;
|
} catch (Exception ex) {
|
logger.error("添加执行记录异常 addTaskRecords", ex);
|
return null;
|
}
|
}
|
|
public Integer addTaskErrorRecord(Long id, String errorKey, String errorValue) {
|
QuartzTaskErrorPO taskErrors = new QuartzTaskErrorPO();
|
taskErrors.setTaskExecuteRecordId(String.valueOf(id));
|
taskErrors.setErrorKey(errorKey);
|
taskErrors.setErrorValue(errorValue);
|
taskErrors.setIsDelete(EYesOrNo.NO.getValue());
|
taskErrors.setCreateTime(LocalDateTimeUtil.nowTimeStamp());
|
return taskErrorsDao.insert(taskErrors);
|
}
|
|
/**
|
* 完成执行记录
|
* @param id 执行记录id
|
* @param result 执行结果
|
*/
|
public Integer doneRecordById(Long id, ExecutedResult<String> result) {
|
QuartzTaskRecordPO find = taskRecordsDao.selectById(id);
|
|
QuartzTaskRecordPO records = new QuartzTaskRecordPO();
|
records.setId(id);
|
records.setEndTime(LocalDateTimeUtil.nowTimeStamp());
|
records.setExecuteTimes(records.getEndTime() - find.getStartTime());
|
records.setFailReason("SUCCESS");
|
if (result.isSuccess()) {
|
records.setTaskStatus(EHandleStatus.SUCCESS.getValue());
|
} else {
|
records.setTaskStatus(EHandleStatus.FAILED.getValue());
|
records.setFailReason(result.getMsg());
|
}
|
|
QuartzTaskInfoPO task = taskDao.get4No(find.getTaskNo());
|
// 更新任务的上次执行结果
|
QuartzTaskInfoPO item = new QuartzTaskInfoPO();
|
item.setId(task.getId());
|
item.setLastTime(LocalDateTimeUtil.nowDateTimeFullStr());
|
item.setLastStatus(records.getTaskStatus());
|
item.setLastParameter(find.getExecuteParameter());
|
taskDao.updateById(item);
|
|
return taskRecordsDao.updateById(records);
|
}
|
}
|