当前位置: 首页 > news >正文

Spring Boot集成Spring Batch快速入门Demo

1.什么是Spring Batch?

Spring Batch 是一个轻量级的开源框架,它提供了一种简单的方式来处理大量的数据。它基于Spring框架,提供了一套批处理框架,可以处理各种类型的批处理任务,如ETL、数据导入/导出、报表生成等。Spring Batch提供了一些重要的概念,如Job、Step、ItemReader、ItemProcessor、ItemWriter等,这些概念可以帮助我们构建可重用的批处理应用程序。通过Spring Batch,我们可以轻松地实现批处理的并发、容错、重试等功能,同时也可以方便地与其他Spring组件集成,如Spring Boot、Spring Data等。总之,Spring Batch是一个非常强大、灵活、易于使用的批处理框架,可以帮助我们快速构建高效、可靠的批处理应用程序。

分层架构

spring-batch-layers

可以看到它分为三层,分别是:

  • Application应用层:包含了所有任务batch jobs和开发人员自定义的代码,主要是根据项目需要开发的业务流程等。
  • Batch Core核心层:包含启动和管理任务的运行环境类,如JobLauncher等。
  • Batch Infrastructure基础层:上面两层是建立在基础层之上的,包含基础的读入reader写出writer、重试框架等。

主要概念

spring-batch-reference-model

2.2.1 JobRepository

专门负责与数据库打交道,对整个批处理的新增、更新、执行进行记录。所以Spring Batch是需要依赖数据库来管理的。

2.2.2 任务启动器JobLauncher

负责启动任务Job

2.2.3 任务Job

Job是封装整个批处理过程的单位,跑一个批处理任务,就是跑一个Job所定义的内容。

111

  上图介绍了Job的一些相关概念:

  • Job:封装处理实体,定义过程逻辑。
  • JobInstanceJob的运行实例,不同的实例,参数不同,所以定义好一个Job后可以通过不同参数运行多次。
  • JobParameters:与JobInstance相关联的参数。
  • JobExecution:代表Job的一次实际执行,可能成功、可能失败。

所以,开发人员要做的事情,就是定义Job

2.2.4 步骤Step

Step是对Job某个过程的封装,一个Job可以包含一个或多个Step,一步步的Step按特定逻辑执行,才代表Job执行完成。

222

  通过定义Step来组装Job可以更灵活地实现复杂的业务逻辑。

2.2.5 输入——处理——输出

所以,定义一个Job关键是定义好一个或多个Step,然后把它们组装好即可。而定义Step有多种方法,但有一种常用的模型就是输入——处理——输出,即Item ReaderItem ProcessorItem Writer。比如通过Item Reader从文件输入数据,然后通过Item Processor进行业务处理和数据转换,最后通过Item Writer写到数据库中去。 Spring Batch为我们提供了许多开箱即用的ReaderWriter,非常方便。

 

2.环境搭建

参照代码仓库mysql模块里面docker目录搭建

3.代码工程

实验目标

如何使用 Spring Boot 创建各种不同类型 Spring Batch Job

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>springboot-demo</artifactId><groupId>com.et</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>SpringBatch</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-autoconfigure</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-batch</artifactId></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

job

第一个简单的任务

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class FirstJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job firstJob() {return jobBuilderFactory.get("firstJob").start(step()).build();}private Step step() {return stepBuilderFactory.get("step").tasklet((contribution, chunkContext) -> {System.out.println("execute  step....");return RepeatStatus.FINISHED;}).build();}
}

多步骤的job

package com.et.batch.job;import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class MultiStepJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job multiStepJob() {/*return jobBuilderFactory.get("multiStepJob").start(step1()).next(step2()).next(step3()).build();*/// control the next step by last Statusreturn jobBuilderFactory.get("multiStepJob2").start(step1()).on(ExitStatus.COMPLETED.getExitCode()).to(step2()).from(step2()).on(ExitStatus.COMPLETED.getExitCode()).to(step3()).from(step3()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step1。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step2。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step3。。。");return RepeatStatus.FINISHED;}).build();}
}

多flow控制的job, 创建一个flow对象,包含若干个step

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class FlowJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job flowJob() {return jobBuilderFactory.get("flowJob").start(flow()).next(step3()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step1。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step2。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step3。。。");return RepeatStatus.FINISHED;}).build();}private Flow flow() {return new FlowBuilder<Flow>("flow").start(step1()).next(step2()).build();}
}

并发执行的jobs

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowBuilder;
import org.springframework.batch.core.job.flow.Flow;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.stereotype.Component;@Component
public class SplitJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job splitJob() {return jobBuilderFactory.get("splitJob").start(flow1()).split(new SimpleAsyncTaskExecutor()).add(flow2()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step1。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step2。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step3。。。");return RepeatStatus.FINISHED;}).build();}private Flow flow1() {return new FlowBuilder<Flow>("flow1").start(step1()).next(step2()).build();}private Flow flow2() {return new FlowBuilder<Flow>("flow2").start(step3()).build();}
}

根据上次运行结果判断是否执行下一步

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class DeciderJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate MyDecider myDecider;@Beanpublic Job deciderJob() {return jobBuilderFactory.get("deciderJob").start(step1()).next(myDecider).from(myDecider).on("weekend").to(step2()).from(myDecider).on("workingDay").to(step3()).from(step3()).on("*").to(step4()).end().build();}private Step step1() {return stepBuilderFactory.get("step1").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step1。。。");return RepeatStatus.FINISHED;}).build();}private Step step2() {return stepBuilderFactory.get("step2").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step2。。。");return RepeatStatus.FINISHED;}).build();}private Step step3() {return stepBuilderFactory.get("step3").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step3。。。");return RepeatStatus.FINISHED;}).build();}private Step step4() {return stepBuilderFactory.get("step4").tasklet((stepContribution, chunkContext) -> {System.out.println("execute step4。。。");return RepeatStatus.FINISHED;}).build();}
}

父子嵌套job

package com.et.batch.job;import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.JobStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;@Component
public class NestedJobDemo {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobRepository jobRepository;@Autowiredprivate PlatformTransactionManager platformTransactionManager;@Beanpublic Job parentJob() {return jobBuilderFactory.get("parentJob").start(childJobOneStep()).next(childJobTwoStep()).build();}private Step childJobOneStep() {return new JobStepBuilder(new StepBuilder("childJobOneStep")).job(childJobOne()).launcher(jobLauncher).repository(jobRepository).transactionManager(platformTransactionManager).build();}private Step childJobTwoStep() {return new JobStepBuilder(new StepBuilder("childJobTwoStep")).job(childJobTwo()).launcher(jobLauncher).repository(jobRepository).transactionManager(platformTransactionManager).build();}private Job childJobOne() {return jobBuilderFactory.get("childJobOne").start(stepBuilderFactory.get("childJobOneStep").tasklet((stepContribution, chunkContext) -> {System.out.println("subtask1。。。");return RepeatStatus.FINISHED;}).build()).build();}private Job childJobTwo() {return jobBuilderFactory.get("childJobTwo").start(stepBuilderFactory.get("childJobTwoStep").tasklet((stepContribution, chunkContext) -> {System.out.println("subtask2。。。");return RepeatStatus.FINISHED;}).build()).build();}
}

application.yaml

自动会初始化脚本,只需要建立以恶搞空库就行

spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/springbatchusername: rootpassword: 123456batch:jdbc:schema: classpath:org/springframework/batch/core/schema-mysql.sqlinitialize-schema: always #Since Spring Boot 2.5.0 use spring.batch.jdbc.initialize-schema=neverjob:enabled: true

以上只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.

4.测试

  • 启动Spring Boot应用程序,系统会自动运行job,跑过一次,下次启动不会继续执行
  • 如果要执行定时任务,可以利用spring提供的scheduledTaskRegistrar注册一个定时任务,扫描最新的定时任务,将这些定时任务注册到scheduleFuture中从而实现动态定时任务。

5.引用

  • Batch Applications :: Spring Boot
  • Spring Boot集成Spring Batch快速入门Demo | Harries Blog™

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 秒懂LINUX之初识命令(下)
  • 基于torch-pruning库对resnet18在cifar100数据集上进行剪枝实验
  • 【数据分享】2013-2022年我国省市县三级的逐月SO2数据(excel\shp格式\免费获取)
  • 华为1000人校园实验记录
  • Kafka Producer之ACKS应答机制
  • 泛型新理解
  • 数组算法--基本查找
  • 在vs code中用npm run serve运行项目报错
  • STM32之九:ADC模数转换器
  • 文献阅读:tidyomics 生态系统:增强组学数据分析
  • Guns v7.3.0:基于 Vue3、Antdv 和 TypeScript 打造的开箱即用型前端框架
  • Large Language Model系列之一:语言模型与表征学习(Language Models and Representation Learning)
  • Python+Django+MySQL的新闻发布管理系统【附源码,运行简单】
  • 使用 Vue3、Node.js、MySQL、Electron 和 Express 实现用户登录、文章管理和截屏功能
  • 【Git 学习笔记_14】第七章 用 git hook 钩子、别名、脚本提升日常工作效率(上)
  • httpie使用详解
  • input实现文字超出省略号功能
  • Java Agent 学习笔记
  • JAVA之继承和多态
  • js递归,无限分级树形折叠菜单
  • Nacos系列:Nacos的Java SDK使用
  • spring boot下thymeleaf全局静态变量配置
  • SQLServer之索引简介
  • 从PHP迁移至Golang - 基础篇
  • 动态规划入门(以爬楼梯为例)
  • 跨域
  • 如何进阶一名有竞争力的程序员?
  • 体验javascript之美-第五课 匿名函数自执行和闭包是一回事儿吗?
  • 详解NodeJs流之一
  • 做一名精致的JavaScripter 01:JavaScript简介
  • 阿里云服务器购买完整流程
  • 昨天1024程序员节,我故意写了个死循环~
  • ###项目技术发展史
  • #vue3 实现前端下载excel文件模板功能
  • $LayoutParams cannot be cast to android.widget.RelativeLayout$LayoutParams
  • (02)Cartographer源码无死角解析-(03) 新数据运行与地图保存、加载地图启动仅定位模式
  • (13)Hive调优——动态分区导致的小文件问题
  • (13)Latex:基于ΤΕΧ的自动排版系统——写论文必备
  • (42)STM32——LCD显示屏实验笔记
  • (done) ROC曲线 和 AUC值 分别是什么?
  • (Java数据结构)ArrayList
  • (STM32笔记)九、RCC时钟树与时钟 第一部分
  • (ZT)一个美国文科博士的YardLife
  • (二开)Flink 修改源码拓展 SQL 语法
  • (转)AS3正则:元子符,元序列,标志,数量表达符
  • (转)EOS中账户、钱包和密钥的关系
  • (转)JVM内存分配 -Xms128m -Xmx512m -XX:PermSize=128m -XX:MaxPermSize=512m
  • (转)机器学习的数学基础(1)--Dirichlet分布
  • (转)总结使用Unity 3D优化游戏运行性能的经验
  • ******IT公司面试题汇总+优秀技术博客汇总
  • ***通过什么方式***网吧
  • .NET 自定义中间件 判断是否存在 AllowAnonymousAttribute 特性 来判断是否需要身份验证
  • .NET/C# 阻止屏幕关闭,阻止系统进入睡眠状态
  • .NET程序员迈向卓越的必由之路
  • .net对接阿里云CSB服务