springbatch-使用
springbatch一共有4个重要角色:
- JobLauncher:任务启动器,通过它来启动任务,可以看作是程序的入口
- Job:一个具体的任务
- Step:一个具体的步骤,一个job可以包含多个step
- JobRepository:存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等信息
整合springboot
-
导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-test</artifactId> <scope>test</scope> </dependency> <!--持久化--> <!--<dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>--> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.23</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency>
-
修改配置文件(配置数据库访问)
spring: datasource: url: username: password: driver-class-name:
-
引导类添加注解,开启springbatch
@SpringBootApplication @EnableBatchProcessing public class BatchDemoApplication { public static void main(String[] args) { SpringApplication.run(BatchDemoApplication.class, args); } }
-
使用
@Configuration public class JobConfig { //注入创建任务对象的对象 @Autowired private JobBuilderFactory jobBuilderFactory; //注入创建step对象的对象(任务的执行由step决定) @Autowired private StepBuilderFactory stepBuilderFactory; //创建任务对象 @Bean public Job helloWorldJob(){ return jobBuilderFactory .get("helloWorld") .start(step1()) .build(); } @Bean public Step step1(){ return stepBuilderFactory .get("step1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception { System.out.println("hello world"); return RepeatStatus.FINISHED; } }).build(); } }
执行多个step
@Configuration
public class JobDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job jobDemoJob(){
return jobBuilderFactory
.get("jobDemoJob")
.start(step1())
.next(step2())
.next(step3())
.build();
}
@Bean
public Step step1(){
return stepBuilderFactory
.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step2(){
return stepBuilderFactory
.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step3(){
return stepBuilderFactory
.get("step3")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step3...");
return RepeatStatus.FINISHED;
}
}).build();
}
}
使用flow
flow可以把step进行统一管理类型,相当于一种分组。大大提高了step的复用性
@Configuration
public class FlowDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step step1(){
return stepBuilderFactory
.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step2(){
return stepBuilderFactory
.get("step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step step3(){
return stepBuilderFactory
.get("step3")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step3...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Flow flowDemoFlow(){
return new FlowBuilder<Flow>("flowDemoFlow")
.start(step1())
.next(step2())
.next(step3())
.build();
}
@Bean
public Job fowDemoJob(){
return jobBuilderFactory
.get("jobBuilderJob")
.start(flowDemoFlow())
.next(step3())
.end()
.build();
}
}
split使用
split可以使同一个job下的step并发执行
@Configuration
@EnableBatchProcessing
public class SplitDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step splitDemoStep1(){
return stepBuilderFactory
.get("splitDemoStep1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step splitDemoStep2(){
return stepBuilderFactory
.get("splitDemoStep2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step splitDemoStep3(){
return stepBuilderFactory
.get("splitDemoStep3")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step3...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Flow splitDemoFlow1(){
return new FlowBuilder<Flow>("splitDemoFlow1")
.start(splitDemoStep1())
.build();
}
@Bean
public Flow splitDemoFlow2(){
return new FlowBuilder<Flow>("splitDemoFlow2")
.start(splitDemoStep2())
.next(splitDemoStep3())
.build();
}
@Bean
public Job splitDemoJob(){
return jobBuilderFactory
.get("splitDemoJob")
.start(splitDemoFlow1())
.split(new SimpleAsyncTaskExecutor())//并发执行
.add(splitDemoFlow2())
.end()
.build();
}
}
自定义决策器
决策器
public class MyDecider implements JobExecutionDecider {
private int count;
@Override
public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
count++;
if (count%2==0)
return new FlowExecutionStatus("even");
else
return new FlowExecutionStatus("odd");
}
}
使用
@Configuration
@EnableBatchProcessing
public class DeciderDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step deciderDemoStep1(){
return stepBuilderFactory
.get("deciderDemoStep1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step deciderDemoStep2(){
return stepBuilderFactory
.get("deciderDemoStep2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step deciderDemoStep3(){
return stepBuilderFactory
.get("deciderDemoStep3")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step3...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public JobExecutionDecider myDecider(){
return new MyDecider();
}
@Bean
public Job deciderDemoJob(){
return jobBuilderFactory
.get("deciderDemoJob")
.start(deciderDemoStep1())
.next(myDecider())
.from(myDecider()).on("even").to(deciderDemoStep2())
.from(myDecider()).on("odd").to(deciderDemoStep3())
.from(deciderDemoStep3()).on("*").to(myDecider())//*匹配所有
.end()
.build();
}
}
job嵌套
子job1
@Configuration
public class ChildJob1 {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step childJob1Step1(){
return stepBuilderFactory
.get("childJob1Step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step1...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Job childJobOne(){
return jobBuilderFactory
.get("childJobOne")
.start(childJob1Step1())
.build();
}
}
子job2
@Configuration
public class ChildJob2 {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Step childJob1Stet1(){
return stepBuilderFactory
.get("childJob2Step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2-1...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Step childJob1Stet2(){
return stepBuilderFactory
.get("childJob2Step2")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("step2-2...");
return RepeatStatus.FINISHED;
}
}).build();
}
@Bean
public Job childJobTwo(){
return jobBuilderFactory
.get("childJobOne")
.start(childJob1Stet1())
.next(childJob1Stet2())
.build();
}
}
父job
@Configuration
public class NestedDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private Job childJobOne;
@Autowired
private Job childJobTwo;
@Autowired
private JobRepository jobRepository;
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Autowired
private JobLauncher jobLauncher;//启动对象
@Bean
public Job parentJob(){
return jobBuilderFactory
.get("parentJob1")
.start(childJob1())
.next(childJob2())
.build();
}
//返回job类型的step(特殊的step)
private Step childJob1(){
return new JobStepBuilder(new StepBuilder("childJob1"))
.job(childJobOne)
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(platformTransactionManager)
.build();
}
private Step childJob2(){
return new JobStepBuilder(new StepBuilder("childJob2"))
.job(childJobTwo)
.launcher(jobLauncher)
.repository(jobRepository)
.transactionManager(platformTransactionManager)
.build();
}
}
为防止子job执行可以在配置文件中指定执行的job
spring:
batch:
job:
names: parentJob1
监听器
接口形式监听器
public class MyJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println("job---before:"+jobExecution.getJobInstance().getJobName());
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("job---after:"+jobExecution.getJobInstance().getJobName());
}
}
注解形式监听器
public class MyChunkListener {
@BeforeChunk
public void beforeChunk(ChunkContext context){
System.out.println(context.getStepContext().getStepName()+"before...");
}
@AfterChunk
public void afterChunk(ChunkContext context){
System.out.println(context.getStepContext().getStepName()+"after...");
}
}
配置监听器
@Configuration
public class ListenerDemo {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job listenerJob(){
return jobBuilderFactory
.get("listenerJob")
.start(step1())
.listener(new MyJobListener())
.build();
}
@Bean
public Step step1(){
return stepBuilderFactory
.get("step1")
.<String,String>chunk(2)//每操作多少数据进行一次处理
.listener(new MyChunkListener())
.reader(reader())//读数据
.writer(writer())//写数据
.build();
}
@Bean
public ItemReader<String> reader(){
return new ListItemReader<>(Arrays.asList("a","b","c","d","e"));
}
@Bean
public ItemWriter<String> writer(){
return new ItemWriter<String>() {
@Override
public void write(List<? extends String> list) throws Exception {
System.out.println(list);
}
};
}
}
job参数
job执行的是step,job使用的数据肯定是在step中使用。所以所以我们只需要给step传递数据
使用step级别的监听来传递数据
@Configuration
public class ParametersDemo implements StepExecutionListener {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
private Map<String,JobParameter> parameterMap;
@Bean
public Job parameterJob(){
return jobBuilderFactory
.get("parameterJob")
.start(parameterStep())
.build();
}
@Bean
public Step parameterStep(){
return stepBuilderFactory
.get("parameterStep")
.listener(this)
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println("接收参数:"+parameterMap.get("info"));
return RepeatStatus.FINISHED;
}
}).build();
}
@Override
public void beforeStep(StepExecution stepExecution) {
parameterMap=stepExecution.getJobParameters().getParameters();
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
return null;
}
}
在启动时添加参数info=xxx