org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler定时任务调度线程池
CREATE TABLE `sys_job` ( `id` bigint(20) NOT NULL COMMENT '任务key', `job_name`
varchar(64) NOT NULL COMMENT '任务名称', `bean_class` varchar(128) NOT NULL COMMENT
'类路径', `cron_expression` varchar(64) NOT NULL COMMENT 'cron表达式', `status`
tinyint(1) NOT NULL COMMENT '状态值 @JobStatusEnum 详见具体枚举类', `is_deleted` tinyint(1
) DEFAULT '0' COMMENT '删除标识 1是 0否', `create_time` datetime DEFAULT NULL, `
update_time` datetime DEFAULT NULL, PRIMARY KEY (`id`) USING BTREE ) ENGINE=
InnoDB DEFAULT CHARSET=utf8mb4; @Configuration @Slf4j public class
SchedulingConfigure { @Bean public ThreadPoolTaskScheduler
threadPoolTaskScheduler() { log.info("开始创建定时任务调度线程池"); ThreadPoolTaskScheduler
threadPoolTaskScheduler= new ThreadPoolTaskScheduler(); threadPoolTaskScheduler.
setPoolSize(20); threadPoolTaskScheduler.setThreadNamePrefix("schedule-task-");
threadPoolTaskScheduler.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskScheduler.setAwaitTerminationSeconds(60); log.info(
"创建定时任务调度线程池完成!"); return threadPoolTaskScheduler; } } public enum JobStatusEnum
{ /** * 未加入调度器 */ NOT_SCHEDULE(0, "未加入调度器"), /** * 加入调度器,但未运行 */
SCHEDULED_BUT_NOT_RUNNING(1, "加入调度器,但未运行"), /** * 从调度器中已删除 */ DELETED(2,
"从调度器中已删除"), ; private Integer status; private String detail; JobStatusEnum(
Integer status, String detail) { this.status = status; this.detail = detail; }
public Integer getStatus() { return status; } public void setStatus(Integer
status) { this.status = status; } public String getDetail() { return detail; }
public void setDetail(String detail) { this.detail = detail; } } @Component
@Slf4j public class ScheduledJobService { private final ReentrantLock lock = new
ReentrantLock(); @Autowired private ThreadPoolTaskScheduler
threadPoolTaskScheduler; @Autowired private SysJobService jobService; @Autowired
private SpringBeanUtils springBeanUtils; /** * 已经加入调度器的任务map */ private final
ConcurrentHashMap<Long, ScheduledFuture<?>> scheduledFutureMap = new
ConcurrentHashMap<>(); /** * 初始化启动任务 * * @param sysJobs 数据库任务集合 */ public void
initAllJob(List<SysJob> sysJobs) { if (CollectionUtils.isEmpty(sysJobs)) {
return; } for (SysJob sysJob : sysJobs) { if (JobStatusEnum.NOT_SCHEDULE.
getStatus().equals(sysJob.getStatus()) || JobStatusEnum.DELETED.getStatus().
equals(sysJob.getStatus()) || this.isScheduled(sysJob.getId())) { //
任务初始化状态或已删除或已加载到调度器中 continue; } // 将任务加入调度器 this.doScheduleJob(sysJob); } }
/** * 启动任务 * * @param jobId job主键id */ public void start(Long jobId) { log.info(
"启动任务:-> jobId_{}", jobId); // 加入调度器 schedule(jobId); log.info("启动任务结束:->
jobId_{}", jobId); // 更新任务状态 jobService.updateJobStatus(jobId, JobStatusEnum.
SCHEDULED_BUT_NOT_RUNNING.getStatus()); } /** * 停止任务 * * @param jobId job主键id */
public void stop(Long jobId) { log.info("停止任务:-> jobId_{}", jobId); // 取消任务
cancel(jobId); log.info("停止任务结束:-> jobId_{}", jobId); // 更新表中任务状态为已停止 jobService
.updateJobStatus(jobId, JobStatusEnum.NOT_SCHEDULE.getStatus()); } /** * 移除任务 *
* @param jobId job主键id */ public void remove(Long jobId) { log.info("移除任务:->
jobId_{}", jobId); // 取消任务 cancel(jobId); log.info("移除任务结束:-> jobId_{}", jobId);
// 更新表中任务状态为已删除 jobService.updateJobStatus(jobId, JobStatusEnum.DELETED.
getStatus()); } /** * 取消 * * @param jobId 工作id */ private void cancel(Long jobId
) { // 任务是否存在 if (scheduledFutureMap.containsKey(jobId)) { ScheduledFuture<?>
scheduledFuture= scheduledFutureMap.get(jobId); if (!scheduledFuture.isCancelled
()) { // 取消调度 scheduledFuture.cancel(true); } } } private void schedule(Long
jobId) { // 添加锁,只允许单个线程访问,防止任务启动多次 lock.lock(); try { if (isScheduled(jobId)) {
log.error("任务jobId_{}已经加入调度器,无需重复操作", jobId); return; } // 通过jobKey查询jobBean对象
SysJob sysJob = jobService.getById(jobId); // 启动定时任务 doScheduleJob(sysJob); }
finally { // 释放锁资源 lock.unlock(); } } /** * 执行启动任务 * * @param sysJob 任务实体类对象 */
private void doScheduleJob(SysJob sysJob) { Long jobId = sysJob.getId(); String
beanClass= sysJob.getBeanClass(); String jobName = sysJob.getJobName(); String
cron= sysJob.getCronExpression(); // 从Spring中获取目标的job业务实现类 ScheduledJob
scheduledJob= parseFrom(beanClass); if (scheduledJob == null) { return; }
scheduledJob.setJobId(jobId); scheduledJob.setJobName(jobName); ScheduledFuture<
?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduledJob,
triggerContext-> { CronTrigger cronTrigger = new CronTrigger(cron); return
cronTrigger.nextExecutionTime(triggerContext); }); log.info("任务加入调度器 ->
jobId:{},jobName:{}", jobId, jobName); // 将启动的任务放入map assert scheduledFuture !=
null; scheduledFutureMap.put(jobId, scheduledFuture); } /** * 任务是否已经进入调度器 * *
@param jobId 任务主键key * @return {@link Boolean} */ private Boolean isScheduled(
Long jobId) { if (scheduledFutureMap.containsKey(jobId)) { return !
scheduledFutureMap.get(jobId).isCancelled(); } return false; } private
ScheduledJob parseFrom(String beanClass) { try { Class<?> clazz = Class.forName(
beanClass); return (ScheduledJob) springBeanUtils.getBean(clazz); } catch (
ClassNotFoundException e) { e.printStackTrace(); } return null; } } @Component
public class SpringBeanUtils implements ApplicationContextAware { private static
ApplicationContext applicationContext; @Override public void
setApplicationContext(ApplicationContext applicationContext) throws
BeansException { SpringBeanUtils.applicationContext = applicationContext; } /**
* 获取applicationContext */ public static ApplicationContext getApplicationContext
() { return applicationContext; } /** * 通过name获取 Bean. */ public Object getBean(
String name) { return getApplicationContext().getBean(name); } /** *
通过class获取Bean. */ public <T> T getBean(Class<T> clazz) { return
getApplicationContext().getBean(clazz); } /** * 通过name,以及Clazz返回指定的Bean */
public <T> T getBean(String name, Class<T> clazz) { return getApplicationContext
().getBean(name, clazz); } } @Data public abstract class ScheduledJob implements
Runnable { /** * 任务主键id */ private Long jobId; /** * 任务名 */ private String
jobName; } @Component public class SchedulerTestDemo extends ScheduledJob {
@Override public void run() { System.out.println("我是定时任务要执行的类.."); System.out.
println(SchedulerTestDemo.class.getName() + ":" + LocalDateTime.now()); } } /**
* 项目启动时,将数据库中job定时任务加载 */ @Component public class GrapeApplicationListener {
private final ScheduledJobService scheduledJobService; private final
ISysJobService sysJobService; public GrapeApplicationListener(ISysJobService
sysJobService, ScheduledJobService scheduledJobService) { this.sysJobService =
sysJobService; this.scheduledJobService = scheduledJobService; } @PostConstruct
public void initStartJob() { // 初始化job scheduledJobService.initAllJob(
sysJobService.list()); } } @SpringBootApplication(scanBasePackages = {
"com.example.grape"}) @MapperScan("com.example.grape.dao.mapper")
@EnableScheduling public class GrapeApplication { public static void main(String
[] args) { SpringApplication.run(GrapeApplication.class, args); } }

技术
下载桌面版
GitHub
百度网盘(提取码:draw)
Gitee
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:766591547
关注微信