当前位置: 首页 > news >正文

dolphinscheduler1.3版本源码分析---API模块

API模块主要功能

api模块主要提供对外接口,界面上的对流程、定时的管理等相关操作都是通过调用API模块的接口实现的,API模块直接跟数据库打交道,不会与master和worker模块交互。

相关接口概览

同时由于api模块集成了swagger,我们可以通过访问  http://xxxx/dolphinscheduler/doc.html来查看详细的API说明

流程定时调度逻辑

当我们创建好流程,并对流程增加定时管理后,dolphin是如何定时去调度流程的呢?该部分逻辑其实也是在API模块,分析如下:

 

1.现在我们创建一个流程定义:并将流程上线

创建流程源码逻辑在  

ProcessDefinitionController下的createProcessDefinition方法中,主要是在数据库表t_ds_process_definition中增加了一条记录

上线流程的源码逻辑在:

ProcessDefinitionController下的releaseProcessDefinition方法中,主要是更改记录的状态字段

2、对该流程创建一条定时记录并将定时记录上线,如下图:

创建定时记录的源码逻辑在SchedulerController类中的createSchedule中;上线定时记录的源码逻辑在SchedulerController的online中。

3、上线动作触发的调度

dolphin是通过quartz实现流程的调度的,我们知道quartz实现调度需要三个对象,分别是scheduler,jobDetail和trigger。

上线的源码里其实就是创建jobDetail和trigger并将这俩个对象加入到scheduler中的过程。

 try {
            switch (scheduleStatus) {
                case ONLINE: {
                    logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                    setSchedule(project.getId(), id);
                    break;
                }
                case OFFLINE: {
                    logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}", project.getId(), processDefinition.getId(), masterServers);
                    deleteSchedule(project.getId(), id);
                    break;
                }
                default: {
                    putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                    return result;
                }
            }
        } catch (Exception e) {
            result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
            throw new RuntimeException(result.get(Constants.MSG).toString());
        }

上图的中的判断如果为上线的操作则会走setSchedule方法,schedule方法如下

 public void setSchedule(int projectId, int scheduleId) throws RuntimeException{
        logger.info("set schedule, project id: {}, scheduleId: {}", projectId, scheduleId);


        Schedule schedule = processService.querySchedule(scheduleId);
        if (schedule == null) {
            logger.warn("process schedule info not exists");
            return;
        }

        Date startDate = schedule.getStartTime();
        Date endDate = schedule.getEndTime();

        String jobName = QuartzExecutors.buildJobName(scheduleId);
        String jobGroupName = QuartzExecutors.buildJobGroupName(projectId);

        Map<String, Object> dataMap = QuartzExecutors.buildDataMap(projectId, scheduleId, schedule);

        QuartzExecutors.getInstance().addJob(ProcessScheduleJob.class, jobName, jobGroupName, startDate, endDate,
                schedule.getCrontab(), dataMap);

    }

最后一行的addJob为重点,前面的代码主要生成job的一些基本信息。下图为addJob方法中向scheduler中增加jobDetail的代码

 public void addJob(Class<? extends Job> clazz,String jobName,String jobGroupName,Date startDate, Date endDate,
                                 String cronExpression,
                                 Map<String, Object> jobDataMap) {
    lock.writeLock().lock();
    try {

      JobKey jobKey = new JobKey(jobName, jobGroupName);
      JobDetail jobDetail;
      //add a task (if this task already exists, return this task directly)
      if (scheduler.checkExists(jobKey)) {

        jobDetail = scheduler.getJobDetail(jobKey);
        if (jobDataMap != null) {
          jobDetail.getJobDataMap().putAll(jobDataMap);
        }
      } else {
        jobDetail = newJob(clazz).withIdentity(jobKey).build();

        if (jobDataMap != null) {
          jobDetail.getJobDataMap().putAll(jobDataMap);
        }

        scheduler.addJob(jobDetail, false, true);

        logger.info("Add job, job name: {}, group name: {}",
                jobName, jobGroupName);
      }

下面的代码为向scheduler中增加trigger的方法,trigger对象使用了定时记录中的cron表达式字段的值

 CronTrigger cronTrigger = newTrigger().withIdentity(triggerKey).startAt(startDate).endAt(endDate)
              .withSchedule(cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing())
              .forJob(jobDetail).build();

      if (scheduler.checkExists(triggerKey)) {
          // updateProcessInstance scheduler trigger when scheduler cycle changes
          CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
          String oldCronExpression = oldCronTrigger.getCronExpression();

          if (!StringUtils.equalsIgnoreCase(cronExpression,oldCronExpression)) {
            // reschedule job trigger
            scheduler.rescheduleJob(triggerKey, cronTrigger);
            logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                    jobName, jobGroupName, cronExpression, startDate, endDate);
          }
      } else {
        scheduler.scheduleJob(cronTrigger);
        logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                jobName, jobGroupName, cronExpression, startDate, endDate);
      }

至此任务和trigger都加入到scheduler中了,quartz会给我们管理具体的调度逻辑,当到cron表达式指定的时间时,会自动触发调用ProcessScheduleJob中的execute方法。

exccute方法中的主要逻辑其实就是生产一条command记录,保存到t_ds_command表中

     ProcessDefinition processDefinition = getProcessService().findProcessDefineById(schedule.getProcessDefinitionId());
        // release state : online/offline
        ReleaseState releaseState = processDefinition.getReleaseState();
        if (processDefinition == null || releaseState == ReleaseState.OFFLINE) {
            logger.warn("process definition does not exist in db or offline,need not to create command, projectId:{}, processId:{}", projectId, scheduleId);
            return;
        }

        Command command = new Command();
        command.setCommandType(CommandType.SCHEDULER);
        command.setExecutorId(schedule.getUserId());
        command.setFailureStrategy(schedule.getFailureStrategy());
        command.setProcessDefinitionId(schedule.getProcessDefinitionId());
        command.setScheduleTime(scheduledFireTime);
        command.setStartTime(fireTime);
        command.setWarningGroupId(schedule.getWarningGroupId());
        String workerGroup = StringUtils.isEmpty(schedule.getWorkerGroup()) ? Constants.DEFAULT_WORKER_GROUP : schedule.getWorkerGroup();
        command.setWorkerGroup(workerGroup);
        command.setWarningType(schedule.getWarningType());
        command.setProcessInstancePriority(schedule.getProcessInstancePriority());

        getProcessService().createCommand(command);

至此我们就分析完了API模块的流程调度相关的逻辑,总结如下:

相关文章:

  • dolphinscheduler1.3版本源码分析---MASTER模块
  • gradle 作为编译工具 lombok 死活不生效解决
  • java lambda groupingby 结果的value为对象的一个属性
  • class.getTypeParameters()方法
  • flatmap使用
  • mybatis-plus QueryWrapper 添加limit
  • linux ls ll命令中文乱码
  • skywalking和JPA冲突问题解决
  • Gradle 构建jar包,依赖和配置分离
  • springCloud2020.0.2+springboot2.4.5接入consul注册中心和服务中心
  • 警告: ParameterizedTypeImpl是内部专用 API, 解决方式
  • springCloud-gateway按照服务名动态路由的改造(一)
  • springCloud-gateway按照服务名动态路由的改造(二)
  • springCloud-gateway按照服务名动态路由的改造(三)
  • 前端传入数字,后端用枚举接收统一处理
  • [微信小程序] 使用ES6特性Class后出现编译异常
  • Apache Zeppelin在Apache Trafodion上的可视化
  • ECS应用管理最佳实践
  • flask接收请求并推入栈
  • input的行数自动增减
  • Magento 1.x 中文订单打印乱码
  • scrapy学习之路4(itemloder的使用)
  • weex踩坑之旅第一弹 ~ 搭建具有入口文件的weex脚手架
  • 分布式任务队列Celery
  • 服务器从安装到部署全过程(二)
  • 利用jquery编写加法运算验证码
  • 普通函数和构造函数的区别
  • 算法-图和图算法
  • 看到一个关于网页设计的文章分享过来!大家看看!
  • Unity3D - 异步加载游戏场景与异步加载游戏资源进度条 ...
  • 哈罗单车融资几十亿元,蚂蚁金服与春华资本加持 ...
  • 回归生活:清理微信公众号
  • 移动端高清、多屏适配方案
  • #### go map 底层结构 ####
  • #stm32整理(一)flash读写
  • (0)Nginx 功能特性
  • (10)ATF MMU转换表
  • (2.2w字)前端单元测试之Jest详解篇
  • (2009.11版)《网络管理员考试 考前冲刺预测卷及考点解析》复习重点
  • (Python) SOAP Web Service (HTTP POST)
  • (webRTC、RecordRTC):navigator.mediaDevices undefined
  • (附源码)springboot高校宿舍交电费系统 毕业设计031552
  • (附源码)计算机毕业设计SSM疫情下的学生出入管理系统
  • (转)JVM内存分配 -Xms128m -Xmx512m -XX:PermSize=128m -XX:MaxPermSize=512m
  • (最简单,详细,直接上手)uniapp/vue中英文多语言切换
  • .MyFile@waifu.club.wis.mkp勒索病毒数据怎么处理|数据解密恢复
  • .net core 源码_ASP.NET Core之Identity源码学习
  • .NET/C# 在代码中测量代码执行耗时的建议(比较系统性能计数器和系统时间)...
  • .NET处理HTTP请求
  • .net获取当前url各种属性(文件名、参数、域名 等)的方法
  • .NET开发不可不知、不可不用的辅助类(三)(报表导出---终结版)
  • .net开发时的诡异问题,button的onclick事件无效
  • .NET企业级应用架构设计系列之技术选型
  • .NET中两种OCR方式对比
  • .Net转Java自学之路—SpringMVC框架篇六(异常处理)