package com.nanjing.water.service.quartz; import com.nanjing.water.common.ExecutedResult; import com.nanjing.water.common.PagerResult; import com.nanjing.water.common.enums.EYesOrNo; import com.nanjing.water.common.model.Tuple; import com.nanjing.water.common.util.*; import com.nanjing.water.entity.enums.EHandleStatus; import com.nanjing.water.entity.enums.EState; import com.nanjing.water.entity.request.quartztaskinfo.ReqCreateQuartzTaskInfo; import com.nanjing.water.entity.request.quartztaskinfo.ReqModifyQuartzTaskInfo; import com.nanjing.water.entity.request.quartztaskinfo.ReqRunTaskRightNow; import com.nanjing.water.entity.search.SearchQuartzTaskInfo; import com.nanjing.water.entity.search.SearchQuartzTaskRecord; import com.nanjing.water.repository.impl.QuartzTaskErrorMapperImpl; import com.nanjing.water.repository.impl.QuartzTaskInfoMapperImpl; import com.nanjing.water.repository.impl.QuartzTaskRecordMapperImpl; import com.nanjing.water.repository.po.QuartzTaskErrorPO; import com.nanjing.water.repository.po.QuartzTaskInfoPO; import com.nanjing.water.repository.po.QuartzTaskRecordPO; import com.nanjing.water.repository.vo.QuartzTaskErrorVO; import com.nanjing.water.repository.vo.QuartzTaskInfoVO; import com.nanjing.water.repository.vo.QuartzTaskRecordVO; import com.nanjing.water.service.convert.QuartzTaskInfoConvert; import com.nanjing.water.common.util.*; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.quartz.SchedulerFactoryBean; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import java.util.ArrayList; import java.util.List; import java.util.Objects; @Service public class QuartzService implements InitializingBean { private static final Logger logger = LoggerFactory.getLogger(QuartzService.class); @Value("${quartz.enable}") private Boolean isQuartzEnable; @Autowired private QuartzTaskInfoMapperImpl taskDao; @Autowired private QuartzTaskRecordMapperImpl taskRecordsDao; @Autowired private QuartzTaskErrorMapperImpl taskErrorsDao; @Autowired private SchedulerFactoryBean schedulerBean; @Autowired private QuartzExecutor executor; public ExecutedResult> listTask(SearchQuartzTaskInfo search) { // 预处理 mybatis plus 空指针 search.setCreateTimeStart(0L); search.setCreateTimeEnd(0L); // 处理创建时间范围-查询参数 Tuple createTimeRange = ParameterUtil.getTimeRange(search.getCreateTimeRange()); if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem1())) { search.setCreateTimeStart(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem1()).getTime()); } if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem2())) { search.setCreateTimeEnd(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem2()).getTime()); } PagerResult pageList = taskDao.search(search); List listVo = new ArrayList<>(); List list = pageList.getList(); if (ListUtil.isNotNullOrEmpty(list)) { // 转换vo listVo = QuartzTaskInfoConvert.INSTANCE.toVo(list); } PagerResult result = new PagerResult<>(pageList.getLimit(), pageList.getPage(), pageList.getTotal(), listVo); return ExecutedResult.success(result); } public ExecutedResult addTask(ReqCreateQuartzTaskInfo request) { Long now = LocalDateTimeUtil.nowTimeStamp(); // 转换po QuartzTaskInfoPO item = QuartzTaskInfoConvert.INSTANCE.toCreate(request); // 任务编号 item.setTaskNo("T" + SnowFlakeUtil.getId()); // 设置状态 item.setStatus(EState.NORMAL.getValue()); // 设置记录创建时间 item.setCreateTime(now); // 版本号 item.setVersion(now); item.setLastTime(""); item.setLastStatus(0); item.setLastParameter(""); // 是否删除(逻辑删除)初始值 item.setIsDelete(EYesOrNo.NO.getValue()); int result = this.taskDao.insert(item); if (result != 1) { return ExecutedResult.failed("创建任务失败。"); } Scheduler scheduler = schedulerBean.getScheduler(); try { this.schedule(item, scheduler); } catch (SchedulerException e) { logger.error("addTask", e); } return ExecutedResult.success(item.getTaskNo()); } @Transactional public ExecutedResult updateTask(ReqModifyQuartzTaskInfo request) throws SchedulerException { // 验证记录是否存在 ExecutedResult checkExists = this.checkTask4Id(request.getId()); if (checkExists.isFailed()) { return ExecutedResult.failed(checkExists.getMsg()); } QuartzTaskInfoPO find = checkExists.getData(); // 转换po QuartzTaskInfoPO item = CopierUtil.mapTo(request, QuartzTaskInfoPO.class); int result = taskDao.updateById(item); if (result != 1) { return ExecutedResult.failed("编辑任务失败。"); } Scheduler scheduler = schedulerBean.getScheduler(); scheduler.deleteJob(new JobKey(find.getTaskNo())); if (Objects.equals(item.getStatus(),EState.NORMAL.getValue())) { this.schedule(find, scheduler); } return ExecutedResult.success(); } public ExecutedResult checkTask4Id(Long id) { QuartzTaskInfoPO exists = taskDao.selectById(id); if (Objects.isNull(exists)) { return ExecutedResult.failed("任务不存在:" + id); } return ExecutedResult.success(exists); } public ExecutedResult checkTask4Code(String taskNo) { QuartzTaskInfoPO exists = taskDao.get4No(taskNo); if (Objects.isNull(exists)) { return ExecutedResult.failed("任务不存在:" + taskNo); } return ExecutedResult.success(exists); } // 启动 或者 暂定定时任务 @Transactional public ExecutedResult optionJob(Long taskId) throws SchedulerException { // 验证记录是否存在 ExecutedResult checkExists = this.checkTask4Id(taskId); if (checkExists.isFailed()) { return ExecutedResult.failed(checkExists.getMsg()); } Long now = LocalDateTimeUtil.nowTimeStamp(); QuartzTaskInfoPO task = checkExists.getData(); Integer status = task.getStatus(); String taskNo = task.getTaskNo(); Scheduler scheduler = schedulerBean.getScheduler(); QuartzTaskInfoPO item = new QuartzTaskInfoPO(); item.setId(task.getId()); item.setVersion(task.getVersion()); //说明要暂停 if (Objects.equals(status, EState.NORMAL.getValue())) { scheduler.deleteJob(new JobKey(taskNo)); item.setFrozenTime(now); item.setStatus(EState.DISABLED.getValue()); } //说明要启动 else if (Objects.equals(status, EState.DISABLED.getValue())) { scheduler.deleteJob(new JobKey(taskNo)); this.schedule(task, scheduler); item.setUnfrozenTime(now); item.setStatus(EState.NORMAL.getValue()); } taskDao.updateById(item); logger.info("taskNo={},taskName={},scheduleRule={},任务{}成功", task.getTaskNo(), task.getTaskName(), task.getSchedulerRule(), Objects.equals(status, EState.NORMAL.getValue()) ? "启动" : "暂停"); return ExecutedResult.success(); } /** * 立即执行一次任务 * @param request 请求参数 */ public ExecutedResult runTaskRightNow(ReqRunTaskRightNow request) { // 验证记录是否存在 ExecutedResult checkExists = this.checkTask4Id(request.getId()); if (checkExists.isFailed()) { return ExecutedResult.failed(checkExists.getMsg()); } QuartzTaskInfoPO task = checkExists.getData(); String taskNo = task.getTaskNo(); String sendType = task.getSendType(); String executorName = task.getExecutor(); String url = task.getUrl(); String executeParameter = request.getExecuteParameter(); logger.info("定时任务被执行:taskNo={},executor={},sendType={},url={},executeParameter={}", taskNo, executor, sendType, url, executeParameter); QuartzTaskRecordPO records = null; try { //保存定时任务的执行记录 records = this.addTaskRecords(task, executeParameter); if (Objects.isNull(records) || !Objects.equals(EHandleStatus.AWAITING.getValue(), records.getTaskStatus())) { logger.info("taskNo={}执行定时任务失败--->>保存执行记录失败", taskNo); return ExecutedResult.failed("单次执行任务失败"); } Long recordId = records.getId(); String taskName = task.getTaskName(); ThreadPoolUtil.getDefaultPool().execute(() -> { ExecutedResult executeResult = SpringUtil.getBean(QuartzExecutor.class).execute(executorName, executeParameter); logger.info("{}-{}执行结果: {}", taskNo, taskName, SerializeUtil.toJson(executeResult)); // 更改record表的执行状态 this.doneRecordById(recordId, executeResult); }); // 发送执行任务的消息 // if (ResultEnum.HTTP.getMessage().equals(sendType)) { // try { // HttpClientUtil.doPost(url, "text/json", executeParameter); // logger.info(""); // } catch (Exception ex) { // logger.error(""); // atomicInteger.incrementAndGet(); // throw ex; // } // } else if (ResultEnum.KAFKA.getMessage().equals(sendType)) { // try { // String message = new StringBuffer(taskNo).append(":").append(executeParameter).toString(); // this.sendMessage(message); // } catch (Exception ex) { // logger.error(""); // atomicInteger.incrementAndGet(); // throw ex; // } // } return ExecutedResult.success(); } catch (Exception ex) { logger.error("单次执行任务 runTaskRightNow", ex); if (Objects.nonNull(records)) { this.addTaskErrorRecord(records.getId(), taskNo + ":" + ex.getMessage(), ExceptionUtil.getDetails(ex)); } ExecutedResult result = ExecutedResult.failed("单次执行任务失败." + ex.getMessage()); if (Objects.nonNull(records)) { // 更改record表的执行状态 this.doneRecordById(records.getId(), result); } return result; } } // 定时任务执行记录 public ExecutedResult> taskRecords(SearchQuartzTaskRecord search) { // 预处理 mybatis plus 空指针 search.setCreateTimeStart(0L); search.setCreateTimeEnd(0L); search.setExecuteTimeStart(0L); search.setExecuteTimeEnd(0L); // 处理创建时间范围-查询参数 Tuple createTimeRange = ParameterUtil.getTimeRange(search.getCreateTimeRange()); if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem1())) { search.setCreateTimeStart(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem1()).getTime()); } if (StringUtil.isNotNullOrEmpty(createTimeRange.getItem2())) { search.setCreateTimeEnd(LocalDateTimeUtil.getTimeStamp(createTimeRange.getItem2()).getTime()); } // 处理执行时间范围-查询参数 Tuple executeTimeRange = ParameterUtil.getTimeRange(search.getExecuteTimeRange()); if (StringUtil.isNotNullOrEmpty(executeTimeRange.getItem1())) { search.setExecuteTimeStart(LocalDateTimeUtil.getTimeStamp(executeTimeRange.getItem1()).getTime()); } if (StringUtil.isNotNullOrEmpty(executeTimeRange.getItem2())) { search.setExecuteTimeEnd(LocalDateTimeUtil.getTimeStamp(executeTimeRange.getItem2()).getTime()); } PagerResult pageList = taskRecordsDao.search(search); List listVo = new ArrayList<>(); List list = pageList.getList(); if (ListUtil.isNotNullOrEmpty(list)) { // 转换vo listVo = CopierUtil.mapTo(list, QuartzTaskRecordVO.class); } PagerResult result = new PagerResult<>(pageList.getLimit(), pageList.getPage(), pageList.getTotal(), listVo); return ExecutedResult.success(result); } // 定时任务错误详情 public ExecutedResult detailTaskErrors(String recordId) { QuartzTaskErrorPO error = taskErrorsDao.get4RecordId(recordId); if (Objects.isNull(error)) { return ExecutedResult.success(null); } return ExecutedResult.success(CopierUtil.mapTo(error, QuartzTaskErrorVO.class)); } /** * Invoked by the containing {@code BeanFactory} after it has set all bean properties * and satisfied {link BeanFactory Aware}, {@code ApplicationContextAware} etc. *

This method allows the bean instance to perform validation of its overall * configuration and final initialization when all bean properties have been set. */ @Override public void afterPropertiesSet() { if (!this.isQuartzEnable) { return; } this.initLoadOnlineTasks(); } /** * 初始化加载定时任务 * * @throws Exception */ public void initLoadOnlineTasks() { List list = taskDao.list4Status(EState.NORMAL.getValue()); if (ListUtil.isNullOrEmpty(list)) { logger.info("没有需要初始化加载的定时任务"); return; } Scheduler scheduler = schedulerBean.getScheduler(); for (QuartzTaskInfoPO task : list) { try { this.schedule(task, scheduler); } catch (Exception e) { logger.error("系统初始化加载定时任务:taskno={},taskname={}失败原因exception={}", task.getTaskNo(), task.getTaskName(), e); } } } private void schedule(QuartzTaskInfoPO task, Scheduler scheduler) throws SchedulerException { TriggerKey triggerKey = TriggerKey.triggerKey(task.getTaskNo(), Scheduler.DEFAULT_GROUP); JobDetail jobDetail = JobBuilder.newJob(QuartzMainJobFactory.class).withDescription(task.getTaskName()).withIdentity(task.getTaskNo(), Scheduler.DEFAULT_GROUP).build(); JobDataMap jobDataMap = jobDetail.getJobDataMap(); jobDataMap.put("id", task.getId().toString()); jobDataMap.put("taskNo", task.getTaskNo()); jobDataMap.put("executor", task.getExecutor()); jobDataMap.put("sendType", task.getSendType()); jobDataMap.put("url", task.getUrl()); jobDataMap.put("executeParameter", task.getExecuteParameter()); CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(task.getSchedulerRule()); CronTrigger cronTrigger = TriggerBuilder.newTrigger().withDescription(task.getTaskName()).withIdentity(triggerKey).withSchedule(cronScheduleBuilder).build(); scheduler.scheduleJob(jobDetail, cronTrigger); logger.info("taskNo={},taskName={},scheduleRule={} load to quartz success!", task.getTaskNo(), task.getTaskName(), task.getSchedulerRule()); } public QuartzTaskRecordPO addTaskRecords(QuartzTaskInfoPO task, String executeParameter) { Long now = LocalDateTimeUtil.nowTimeStamp(); QuartzTaskRecordPO quartzTaskRecords = null; try { quartzTaskRecords = new QuartzTaskRecordPO(); quartzTaskRecords.setTaskNo(task.getTaskNo()); quartzTaskRecords.setTaskName(task.getTaskName()); quartzTaskRecords.setExecuteParameter(executeParameter); quartzTaskRecords.setStartTime(now); quartzTaskRecords.setEndTime(0L); quartzTaskRecords.setExecuteTimes(0L); quartzTaskRecords.setTaskStatus(EHandleStatus.AWAITING.getValue()); quartzTaskRecords.setFailReason(""); quartzTaskRecords.setIsDelete(EYesOrNo.NO.getValue()); quartzTaskRecords.setCreateTime(now); taskRecordsDao.insert(quartzTaskRecords); logger.info("taskNo={},taskName={}添加执行记录表成功", task.getTaskNo(), task.getTaskName()); return quartzTaskRecords; } catch (Exception ex) { logger.error("添加执行记录异常 addTaskRecords", ex); return null; } } public Integer addTaskErrorRecord(Long id, String errorKey, String errorValue) { QuartzTaskErrorPO taskErrors = new QuartzTaskErrorPO(); taskErrors.setTaskExecuteRecordId(String.valueOf(id)); taskErrors.setErrorKey(errorKey); taskErrors.setErrorValue(errorValue); taskErrors.setIsDelete(EYesOrNo.NO.getValue()); taskErrors.setCreateTime(LocalDateTimeUtil.nowTimeStamp()); return taskErrorsDao.insert(taskErrors); } /** * 完成执行记录 * @param id 执行记录id * @param result 执行结果 */ public Integer doneRecordById(Long id, ExecutedResult result) { QuartzTaskRecordPO find = taskRecordsDao.selectById(id); QuartzTaskRecordPO records = new QuartzTaskRecordPO(); records.setId(id); records.setEndTime(LocalDateTimeUtil.nowTimeStamp()); records.setExecuteTimes(records.getEndTime() - find.getStartTime()); records.setFailReason("SUCCESS"); if (result.isSuccess()) { records.setTaskStatus(EHandleStatus.SUCCESS.getValue()); } else { records.setTaskStatus(EHandleStatus.FAILED.getValue()); records.setFailReason(result.getMsg()); } QuartzTaskInfoPO task = taskDao.get4No(find.getTaskNo()); // 更新任务的上次执行结果 QuartzTaskInfoPO item = new QuartzTaskInfoPO(); item.setId(task.getId()); item.setLastTime(LocalDateTimeUtil.nowDateTimeFullStr()); item.setLastStatus(records.getTaskStatus()); item.setLastParameter(find.getExecuteParameter()); taskDao.updateById(item); return taskRecordsDao.updateById(records); } }