当前位置: 首页 > news >正文

分布式任务调度Schedulerx2.0工作原理

一、前言

Schedulerx2.0是阿里巴巴开发的一个基于akka的分布式任务调度框架,提供分布式执行、多种任务类型、统一日志等功能,用户只要依赖schedulerx-worker这个jar包,通过schedulerx2.0提供的编程模型,简单几行代码就能实现一套高可靠可运维的分布式执行引擎。本文主要讲解schedulerx-worker的工作原理

二、整体架构

Schedulerx2.0是中心化的调度框架,包括Server和Worker。Server负责任务的触发和调度,通过分发引擎提交任务给Worker。Worker负责任务的执行。

Worker分为TaskMaster, Container, Processor三层:

分为TaskMaster, Container, Processor三层:

  • TaskMaster:类似于yarn的AppMaster,支持可扩展的分布式执行框架,进行整个jobInstance的生命周期管理、container的资源管理,同时还有failover等能力。默认实现StandaloneTaskMaster(单机执行),BroadcastTaskMaster(广播执行),MapTaskMaster(并行计算、内存网格、网格计算),MapReduceTaskMaster(并行计算、内存网格、网格计算)。
  • Container:执行业务逻辑的容器框架,支持线程/进程/docker/actor等。
  • Processor:业务逻辑框架,不同的processor表示不同的任务类型。

三、任务执行流程-worker部分

Server触发任务调度执行,提交任务给Worker,Worker通过JobInstanceActor接收Server提交的任务:

public class JobInstanceActor extends UntypedActor {
    private TaskMasterPool masterPool;
    private LogCollector logCollector;
    private static final Logger LOGGER = LogFactory.getLogger(JobInstanceActor.class);

    public JobInstanceActor() {
        this.masterPool = TaskMasterPool.INSTANCE;
        this.logCollector = LogCollectorFactory.get();
    }

    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof ServerSubmitJobInstanceRequest) {
            this.handleSubmitJobInstance((ServerSubmitJobInstanceRequest)obj);
        } else if (obj instanceof ServerKillJobInstanceRequest) {
            this.handleKillJobInstance((ServerKillJobInstanceRequest)obj);
        } else if (obj instanceof ServerRetryTasksRequest) {
            this.handleRetryTasks((ServerRetryTasksRequest)obj);
        } else if (obj instanceof ServerKillTaskRequest) {
            this.handleKillTask((ServerKillTaskRequest)obj);
        } else if (obj instanceof ServerCheckTaskMasterRequest) {
            this.handCheckTaskMaster((ServerCheckTaskMasterRequest)obj);
        } else if (obj instanceof MasterNotifyWorkerPullRequest) {
            this.handleInitPull((MasterNotifyWorkerPullRequest)obj);
        } else if (obj instanceof ServerThreadDumpRequest) {
            this.handleThreadDump((ServerThreadDumpRequest)obj);
        } else if (obj instanceof ServerPushLogConfigRequest) {
            this.handlePushLogConfig((ServerPushLogConfigRequest)obj);
        }

    }

其中,任务执行的消息类是ServerSubmitJobInstanceRequest,Worker在接收到消息时,会交给函数handleSubmitJobInstance来处理:

private void handleSubmitJobInstance(ServerSubmitJobInstanceRequest request) {
       
        ServerSubmitJobInstanceResponse response = null;
        //任务正在执行中,直接返回
        if (this.masterPool.contains(request.getJobInstanceId())) {
            
            this.logCollector.collect(IdUtil.getUniqueIdWithoutTask(request.getJobId(), request.getJobInstanceId()), ClientLoggerMessage.appendMessage("server trigger client fail.", new String[]{errMsg}));
            response = ServerSubmitJobInstanceResponse.newBuilder().setSuccess(false).setMessage(errMsg).build();
            this.getSender().tell(response, this.getSelf());
        } else {
            response = ServerSubmitJobInstanceResponse.newBuilder().setSuccess(true).build();
            this.getSender().tell(response, this.getSelf());

            try {
                JobInstanceInfo jobInstanceInfo = this.convet2JobInstanceInfo(request);
                //根据任务的类型,创建TaskMaster对象
                TaskMaster taskMaster = TaskMasterFactory.create(jobInstanceInfo, this.getContext());
              
                this.masterPool.put(jobInstanceInfo.getJobInstanceId(), taskMaster);
                //提交任务执行
                taskMaster.submitInstance(jobInstanceInfo);
               
                ......
            } catch (Throwable var5) {
               ......
            }
        }

    }

handleSubmitJobInstance中会创建一个TaskMaster对象来执行任务,TaskMaster是一个抽象类,它有几个继承类,对应几个分布式任务的编程模型,本文以Map类型的任务为例子,说明任务执行流程。

map模型作业提供了并行计算、内存网格、网格计算三种执行方式:

  • 并行计算:子任务300以下,有子任务列表。
  • 内存网格:子任务5W以下,无子任务列表,速度快。
  • 网格计算:子任务100W以下,无子任务列表。

3.1 TaskMaster

MapTaskMaster作为TaskMaster的继承类,定义如下:

public abstract class MapTaskMaster extends TaskMaster {
   
    protected volatile int pageSize = ConfigUtil.getWorkerConfig().getInt("map.master.page.size", 100);
    protected volatile int queueSize = ConfigUtil.getWorkerConfig().getInt("map.master.queue.size", 10000);
    private volatile int dispatcherSize = ConfigUtil.getWorkerConfig().getInt("map.master.dispatcher.size", 5);
    protected ReqQueue<ContainerReportTaskStatusRequest> taskStatusReqQueue;
    protected TMStatusReqHandler<ContainerReportTaskStatusRequest> taskStatusReqBatchHandler;
    //存放map生成的任务
    protected ReqQueue<MasterStartContainerRequest> taskBlockingQueue;
   //存放即将转发的任务,并将任务转发给各个worker执行
    protected TaskDispatchReqHandler<MasterStartContainerRequest> taskDispatchReqHandler;
    private volatile String rootTaskResult;
    protected TaskPersistence taskPersistence;
    protected Map<String, TaskProgressCounter> taskProgressMap = Maps.newConcurrentMap();
    protected Map<String, WorkerProgressCounter> workerProgressMap = Maps.newConcurrentMap();
    private Map<Long, String> taskResultMap = Maps.newHashMap();
    private Map<Long, TaskStatus> taskStatusMap = Maps.newHashMap();
    ......
}

MapTaskMaster的submitInstance函数如下:

public void submitInstance(JobInstanceInfo jobInstanceInfo) throws Exception {
        try {
           ......
            //各个对象的初始化,启动线程池中的任务
            this.startBatchHandler();
            this.createRootTask();
            this.init();
        } catch (Throwable var4) {
           ......
        }

    }

  1、startBatchHandler主要是做一些初始化,启动线程池中的任务定时执行。

  2、createRootTask主要是创建一个根任务,即由这个任务来map出来多个子任务。

3、init主要是启动线程池中的任务定时执行。

createRootTask的代码如下:

protected void createRootTask() throws Exception {
        String taskName = "MAP_TASK_ROOT";
        ByteString taskBody = ByteString.copyFrom(HessianUtil.toBytes("MAP_TASK_ROOT"));
        //初始化任务计数器,当前job有一个task正在执行
        this.initTaskProgress(taskName, 1);
        //参数转换为MasterStartContainerRequest 
        MasterStartContainerRequest startContainerRequest = this.convert2StartContainerRequest(this.jobInstanceInfo, this.aquireTaskId(), taskName, taskBody);
        //将root任务分发给本地Worker执行
        this.batchDispatchTasks(Lists.newArrayList(new MasterStartContainerRequest[]{startContainerRequest}), this.getLocalWorkerIdAddr());
    }

函数batchDispatchTasks将任务分配给各个Worker执行,将root类型的任务分配给本地的Worker执行:

public void batchDispatchTasks(List<MasterStartContainerRequest> masterStartContainerRequests, String remoteWorker) {
        Map<String, List<MasterStartContainerRequest>> worker2ReqsWithNormal = Maps.newHashMap();
        Map<String, List<MasterStartContainerRequest>> worker2ReqsWithFailover = Maps.newHashMap();
        this.batchHandlePulledProgress(masterStartContainerRequests, worker2ReqsWithNormal, worker2ReqsWithFailover, remoteWorker);
        Iterator var5 = worker2ReqsWithNormal.entrySet().iterator();

        Entry entry;
        while(var5.hasNext()) {
            entry = (Entry)var5.next();
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), false, TaskDispatchMode.PUSH);
        }

        var5 = worker2ReqsWithFailover.entrySet().iterator();

        while(var5.hasNext()) {
            entry = (Entry)var5.next();
            this.batchHandleContainers((String)entry.getKey(), (List)entry.getValue(), true, TaskDispatchMode.PUSH);
        }

    }

函数batchHandleContainers将任务写入本地数据库H2,同时将任务转发给对应的worker。

private void batchHandleContainers(final String workerIdAddr, final List<MasterStartContainerRequest> reqs, boolean isFailover, TaskDispatchMode dispatchMode) {
         ......

        try {
            //将task写入本地数据库H2,task的状态为running
            this.batchHandlePersistence(workerId, workerAddr, reqs, isFailover);
            if (dispatchMode.equals(TaskDispatchMode.PUSH)) {
                //将task发送给对应的worker
                final long startTime = System.currentTimeMillis();
                ActorSelection selection = this.getActorContext().actorSelection(ActorPathUtil.getContainerRouterPath(workerIdAddr));
                MasterBatchStartContainersRequest request = MasterBatchStartContainersRequest.newBuilder().setJobInstanceId(this.jobInstanceInfo.getJobInstanceId()).setJobId(this.jobInstanceInfo.getJobId()).addAllStartReqs(reqs).build();
                Timeout timeout = new Timeout(Duration.create(15L, TimeUnit.SECONDS));
                Future<Object> future = Patterns.ask(selection, request, timeout);
                
        } catch (Throwable var13) {
           ......
        }

    }

3.2 Container

Container是执行业务逻辑的容器框架,TaskMaster转发给worker的任务会Containe模块执行。其中ContainerRoutingActor是一个路由Actor,里面包含多个ContainerActor,ContainerRoutingActor将接收到的消息转发给其中的一个ContainerActor。ContainerActor的定义如下:

public class ContainerActor extends UntypedActor {
    public void onReceive(Object obj) throws Throwable {
        if (obj instanceof MasterStartContainerRequest) {
            this.handleStartContainer((MasterStartContainerRequest)obj);
        } else if (obj instanceof MasterBatchStartContainersRequest) {
            this.handleBatchStartContainers((MasterBatchStartContainersRequest)obj);
        } else if (obj instanceof MasterKillContainerRequest) {
            this.handleKillContainer((MasterKillContainerRequest)obj);
        } else if (obj instanceof MasterDestroyContainerPoolRequest) {
            this.handleDestroyContainerPool((MasterDestroyContainerPoolRequest)obj);
        }

    }
}

当接收到MasterBatchStartContainersRequest类型的消息时,会调用函数startContainer来执行

private String startContainer(MasterStartContainerRequest request) throws Exception {
        String uniqueId = IdUtil.getUniqueId(request.getJobId(), request.getJobInstanceId(), request.getTaskId());
      
        JobContext context = ContanerUtil.convert2JobContext(request);
        Container container = ContainerFactory.create(context);
        if (container != null) {
                this.containerPool.submit(context.getJobId(), context.getJobInstanceId(), context.getTaskId(), container, consumerNum);
            
        } 
        ......
    }

最后通过submit提交任务、执行任务。

相关文章:

  • ATF启动(三):BL2
  • 论Orchestration和Choreography
  • JUC线程线程池和锁面试题
  • TypeScript 简介
  • css过渡效果
  • mysql中EXPLAIN命令解析
  • 【NodeJs-5天学习】第二天篇④ ——项目模块化
  • LeetCode 110.平衡二叉树 (C++)
  • 基于SpringBoot的校园闲置物品交易管理系统
  • 在线表格 循环替换 脚本
  • 量化投资学习——股指期货研究(二)
  • npm下载包速度慢-淘宝NPM镜像服务器--如何切换其他服务器下载
  • 基于elasticjob的入门maven项目搭建
  • 【校招VIP】产品项目分析之竞品分析
  • 服务端(后端)主动通知前端的实现:WebSocket(springboot中使用WebSocket案例)
  • [笔记] php常见简单功能及函数
  • bootstrap创建登录注册页面
  • exif信息对照
  • iBatis和MyBatis在使用ResultMap对应关系时的区别
  • Java,console输出实时的转向GUI textbox
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • XML已死 ?
  • 初识MongoDB分片
  • 从零到一:用Phaser.js写意地开发小游戏(Chapter 3 - 加载游戏资源)
  • 海量大数据大屏分析展示一步到位:DataWorks数据服务+MaxCompute Lightning对接DataV最佳实践...
  • 机器学习中为什么要做归一化normalization
  • 将回调地狱按在地上摩擦的Promise
  • 离散点最小(凸)包围边界查找
  • 浅谈JavaScript的面向对象和它的封装、继承、多态
  • 嵌入式文件系统
  • 网页视频流m3u8/ts视频下载
  • 小程序开发中的那些坑
  • 走向全栈之MongoDB的使用
  • 不要一棍子打翻所有黑盒模型,其实可以让它们发挥作用 ...
  • 通过调用文摘列表API获取文摘
  • ​iOS安全加固方法及实现
  • (delphi11最新学习资料) Object Pascal 学习笔记---第2章第五节(日期和时间)
  • (超详细)2-YOLOV5改进-添加SimAM注意力机制
  • (机器学习-深度学习快速入门)第三章机器学习-第二节:机器学习模型之线性回归
  • (三)docker:Dockerfile构建容器运行jar包
  • (一)kafka实战——kafka源码编译启动
  • (译)2019年前端性能优化清单 — 下篇
  • .NET C# 使用 SetWindowsHookEx 监听鼠标或键盘消息以及此方法的坑
  • .NET Core IdentityServer4实战-开篇介绍与规划
  • .NET开源快速、强大、免费的电子表格组件
  • .NET面试题解析(11)-SQL语言基础及数据库基本原理
  • .net之微信企业号开发(一) 所使用的环境与工具以及准备工作
  • /dev/sda2 is mounted; will not make a filesystem here!
  • @Async注解的坑,小心
  • []sim300 GPRS数据收发程序
  • [2008][note]腔内级联拉曼发射的,二极管泵浦多频调Q laser——
  • [51nod1610]路径计数
  • [ai笔记9] openAI Sora技术文档引用文献汇总
  • [Ariticle] 厚黑之道 一 小狐狸听故事
  • [Bugku]密码???[writeup]