Spring Batch JobParameters
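In Spring Batch, a JobInstance can run to successful completion only once. It is identified by the job name together with a hash of the identifying parameters.
Pass parameters after the launch command, for example foo=bar. When launching from the command line, append key=value pairs after the jar file; in IDEA, set them in the run configuration.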
java -jar chapter04-0.0.1-SNAPSHOT.jar foo=bar
The output looks like this:
2022-11-24 17:31:53.674 INFO 26296 --- [ main] o.s.b.a.b.JobLauncherApplicationRunner : Running default command line with: [foo=bar]
2022-11-24 17:31:53.975 INFO 26296 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=basicJob]] launched with the following parameters: [{foo=bar}]
2022-11-24 17:31:54.194 INFO 26296 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
Hello, World!
2022-11-24 17:31:54.322 INFO 26296 --- [ main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 127ms
2022-11-24 17:31:54.393 INFO 26296 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=basicJob]] completed with the following parameters: [{foo=bar}] and the following status: [COMPLETED] in 332ms
If you try to run the job again with the same parameters, the program throws a JobInstanceAlreadyCompleteException:
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={foo=bar}. If you want to run this job again, change the parameters.
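The message says that this job instance has already completed.
Non-identifying parameters are also supported, that is, parameters that do not take part in the hash calculation. Simply prefix the parameter with -, for example -operator=jiajia.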
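To make the identity rule concrete, here is a minimal plain-Java sketch of how a key could be derived from the job name and the identifying parameters only. It is not Spring Batch's actual implementation: the class JobKeySketch, the method jobKey, and the choice of SHA-256 are assumptions made for illustration. Parameters whose key starts with - are treated as non-identifying and skipped.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; Spring Batch's own key generation may differ.
final class JobKeySketch {

    // Builds a stable key from the job name plus the identifying parameters.
    static String jobKey(String jobName, Map<String, String> params) {
        StringBuilder sb = new StringBuilder(jobName).append('|');
        // Sort by key so that the argument order does not change the result.
        for (Map.Entry<String, String> e : new TreeMap<>(params).entrySet()) {
            if (e.getKey().startsWith("-")) {
                continue; // non-identifying: ignored for identity
            }
            sb.append(e.getKey()).append('=').append(e.getValue()).append(';');
        }
        return sha256Hex(sb.toString());
    }

    private static String sha256Hex(String text) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            StringBuilder hex = new StringBuilder();
            for (byte b : digest.digest(text.getBytes(StandardCharsets.UTF_8))) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException ex) {
            throw new IllegalStateException(ex); // SHA-256 is available on every JVM
        }
    }

    public static void main(String[] args) {
        String a = jobKey("basicJob", Map.of("foo", "bar"));
        String b = jobKey("basicJob", Map.of("foo", "bar", "-operator", "jiajia"));
        String c = jobKey("basicJob", Map.of("foo", "baz"));
        System.out.println(a.equals(b)); // prints true: only a non-identifying parameter differs
        System.out.println(a.equals(c)); // prints false: an identifying parameter differs
    }
}
```

Under this model, rerunning with foo=bar and a different -operator value would still hit the same, already completed JobInstance.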
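The log above also shows that the job was launched by JobLauncherApplicationRunner. Judging by its logger name (o.s.b.a.b, i.e. org.springframework.boot.autoconfigure.batch), this runner comes from Spring Boot's batch auto-configuration rather than from Spring Batch Core. Spring Batch Core itself provides CommandLineJobRunner and JobRegistryBackgroundJobRunner.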
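My local output differs slightly from the book. The book says the job is launched by JobLauncherCommandLineRunner, but that class no longer exists in 2.7.5; it was presumably renamed to JobLauncherApplicationRunner.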
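JobLauncherApplicationRunner takes the command-line arguments, converts them into JobParameters, and passes them on to the JobInstance. JobParameters is merely a wrapper around a Map<String,JobParameter>. The values are of type JobParameter because a parameter can carry a type. To specify the type, append a type suffix to the parameter key, e.g. executionDate(date)=2022/11/25.
Spring Batch supports four type suffixes (code taken from the DefaultJobParametersConverter class):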
public static final String DATE_TYPE = "(date)";
public static final String STRING_TYPE = "(string)";
public static final String LONG_TYPE = "(long)";
private static final String DOUBLE_TYPE = "(double)";
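As a rough illustration of what these four suffixes imply, the following plain-Java sketch converts a raw string into a typed value. It is not the DefaultJobParametersConverter source: the class TypedValueSketch and the method convert are hypothetical, and the yyyy/MM/dd date pattern is an assumption inferred from the example value 2022/11/25 above.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

// Hypothetical sketch; the real converter may use different formats and rules.
final class TypedValueSketch {

    // Converts the raw text according to the type named by the key suffix.
    static Object convert(String type, String raw) {
        switch (type) {
            case "string":
                return raw;
            case "long":
                return Long.valueOf(raw);
            case "double":
                return Double.valueOf(raw);
            case "date":
                try {
                    // Assumed pattern, matching the example value 2022/11/25.
                    SimpleDateFormat format = new SimpleDateFormat("yyyy/MM/dd");
                    format.setLenient(false);
                    return format.parse(raw);
                } catch (ParseException ex) {
                    throw new IllegalArgumentException("Not a yyyy/MM/dd date: " + raw, ex);
                }
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        System.out.println(convert("long", "42"));                          // prints 42
        System.out.println(convert("double", "1.5"));                       // prints 1.5
        System.out.println(convert("date", "2022/11/25").getClass().getName()); // prints java.util.Date
    }
}
```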
The same class also defines the prefixes for non-identifying and identifying parameters (the identifying prefix may be omitted):
private static final String NON_IDENTIFYING_FLAG = "-";
private static final String IDENTIFYING_FLAG = "+";
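Putting the prefix and suffix conventions together, the sketch below splits one command-line argument into its name, type, identifying flag, and raw value. The class ParamSyntaxSketch and its Parsed holder are hypothetical names, and the fallback to the string type when no suffix is present is an assumption; this is a simplified model, not the library's parser.

```java
// Hypothetical sketch of the key syntax; not the DefaultJobParametersConverter source.
final class ParamSyntaxSketch {

    // Simple holder for the pieces of one parsed argument.
    static final class Parsed {
        final String name;
        final String type;
        final boolean identifying;
        final String rawValue;

        Parsed(String name, String type, boolean identifying, String rawValue) {
            this.name = name;
            this.type = type;
            this.identifying = identifying;
            this.rawValue = rawValue;
        }
    }

    static Parsed parse(String arg) {
        int eq = arg.indexOf('=');
        if (eq < 0) {
            throw new IllegalArgumentException("Expected key=value but got: " + arg);
        }
        String key = arg.substring(0, eq);
        String rawValue = arg.substring(eq + 1);

        // "-" marks a non-identifying parameter; "+" (or no prefix) an identifying one.
        boolean identifying = true;
        if (key.startsWith("-")) {
            identifying = false;
            key = key.substring(1);
        } else if (key.startsWith("+")) {
            key = key.substring(1);
        }

        // A trailing "(type)" names the value type; assume "string" when absent.
        String type = "string";
        int open = key.lastIndexOf('(');
        if (open > 0 && key.endsWith(")")) {
            type = key.substring(open + 1, key.length() - 1);
            key = key.substring(0, open);
        }
        return new Parsed(key, type, identifying, rawValue);
    }

    public static void main(String[] args) {
        Parsed p = parse("-operator=jiajia");
        System.out.println(p.name + " " + p.type + " " + p.identifying); // prints operator string false
        Parsed q = parse("executionDate(date)=2022/11/25");
        System.out.println(q.name + " " + q.type + " " + q.identifying); // prints executionDate date true
    }
}
```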
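Accessing parameters
ChunkContext: read the current job's parameters from the chunkContext argument of the Step's tasklet. The chunkContext exposes the state of the job at execution time, including all information about the chunk currently being processed.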
@Bean("step1")
public Step step1() {
    return this.stepBuilderFactory.get("step1")
            .tasklet((stepContribution, chunkContext) -> {
                String name = (String) chunkContext.getStepContext()
                        .getJobParameters()
                        .get("name");
                System.out.printf("%s:Hello, %s!%n", chunkContext.getStepContext().getStepName(), name);
                return RepeatStatus.FINISHED;
            })
            .build();
}
Late binding: inject the parameter through the Spring configuration
@Bean("step2")
public Step step2(@Qualifier("helloWorldTasklet") Tasklet helloWorldTasklet) {
    return this.stepBuilderFactory.get("step2")
            .tasklet(helloWorldTasklet)
            .build();
}

@StepScope
@Bean("helloWorldTasklet")
public Tasklet helloWorldTasklet(@Value("#{jobParameters['name']}") String name) {
    return (stepContribution, chunkContext) -> {
        System.out.printf("%s:Hello, %s!%n", chunkContext.getStepContext().getStepName(), name);
        return RepeatStatus.FINISHED;
    };
}
Validating parameters
Spring Batch offers the following ways to validate parameters:
DefaultJobParametersValidator: the validator shipped with Spring Batch. It can only check whether required parameters have been supplied.
DefaultJobParametersValidator defaultJobParametersValidator = new DefaultJobParametersValidator(
        new String[]{"fileName"},
        new String[]{"name", "run.id", "currentDate"});
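DefaultJobParametersValidator has two settings:
requiredKeys: parameters that must be supplied
optionalKeys: parameters that may be supplied
If optionalKeys is not specified, any parameter may be passed; otherwise only the listed required and optional parameters are accepted.
When optionalKeys is specified, passing a parameter that is neither required nor optional produces an error like this: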
Caused by: org.springframework.batch.core.JobParametersInvalidException: The JobParameters contains keys that are not explicitly optional or required: [run.id]
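The required/optional rule described above can be modelled in a few lines of plain Java. This is a simplified sketch, not the DefaultJobParametersValidator source: the class KeyCheckSketch and the method check are hypothetical, and IllegalArgumentException stands in for JobParametersInvalidException so that the example has no dependencies.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the required/optional key check.
final class KeyCheckSketch {

    static void check(Set<String> requiredKeys, Set<String> optionalKeys, Set<String> actualKeys) {
        // Unknown keys are rejected only when optional keys were configured.
        if (!optionalKeys.isEmpty()) {
            Set<String> unknown = new TreeSet<>(actualKeys);
            unknown.removeAll(requiredKeys);
            unknown.removeAll(optionalKeys);
            if (!unknown.isEmpty()) {
                throw new IllegalArgumentException(
                        "Keys that are neither required nor optional: " + unknown);
            }
        }
        // Every required key must be present.
        Set<String> missing = new TreeSet<>(requiredKeys);
        missing.removeAll(actualKeys);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required keys: " + missing);
        }
    }

    public static void main(String[] args) {
        // No optional keys configured: extra keys are accepted.
        check(Set.of("fileName"), Set.of(), Set.of("fileName", "run.id"));
        System.out.println("accepted");
        try {
            // Optional keys configured: run.id is not listed, so it is rejected.
            check(Set.of("fileName"), Set.of("name"), Set.of("fileName", "run.id"));
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

This is why run.id and currentDate appear among the optional keys in the constructor call shown earlier: once any optional key is listed, every other parameter the job receives has to be listed as well.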
JobParametersValidator: implement this interface to write a custom parameter validator. As long as the validate() method does not throw an exception, validation is considered to have passed.
package me.liujiajia.batch.validator;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.JobParametersValidator;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

/**
 * @author 佳佳
 */
@Component
public class FileNameValidator implements JobParametersValidator {
    /**
     * Check the parameters meet whatever requirements are appropriate, and
     * throw an exception if not.
     *
     * @param parameters some {@link JobParameters} (can be {@code null})
     * @throws JobParametersInvalidException if the parameters are invalid
     */
    @Override
    public void validate(JobParameters parameters) throws JobParametersInvalidException {
        String fileName = parameters.getString("fileName");

        if (!StringUtils.hasText(fileName)) {
            throw new JobParametersInvalidException("fileName parameter is missing");
        } else if (!StringUtils.endsWithIgnoreCase(fileName, "csv")) {
            throw new JobParametersInvalidException("fileName parameter does not use the csv file extension");
        }
    }
}
When validation fails, one of the following errors is shown:
Caused by: org.springframework.batch.core.JobParametersInvalidException: fileName parameter is missing
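Caused by: org.springframework.batch.core.JobParametersInvalidException: fileName parameter does not use the csv file extension
CompositeJobParametersValidator: use a composite validator when several validation rules must be applied. The example here combines the required/optional key check above with the custom file name validation.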
@Bean("helloWorldValidator")
public CompositeJobParametersValidator helloWorldValidator(FileNameValidator fileNameValidator) {
    CompositeJobParametersValidator validator = new CompositeJobParametersValidator();

    DefaultJobParametersValidator defaultJobParametersValidator = new DefaultJobParametersValidator(
            new String[]{"fileName"},
            new String[]{"name"});
    defaultJobParametersValidator.afterPropertiesSet();

    validator.setValidators(Arrays.asList(fileNameValidator, defaultJobParametersValidator));
    return validator;
}
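The idea behind a composite validator is simply to run several delegates in order and let the first failure propagate. The dependency-free sketch below shows that pattern under stated assumptions: the Validator interface, the composite factory method, and the two sample rules are hypothetical stand-ins, and IllegalArgumentException replaces JobParametersInvalidException.

```java
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical model of the composite pattern; not the Spring Batch classes.
final class CompositeSketch {

    interface Validator {
        void validate(Map<String, String> params);
    }

    // Runs every delegate in order; the first exception aborts the chain.
    static Validator composite(List<Validator> delegates) {
        return params -> {
            for (Validator delegate : delegates) {
                delegate.validate(params);
            }
        };
    }

    // Sample rule: the fileName parameter must be present.
    static final Validator FILE_NAME_PRESENT = params -> {
        String fileName = params.get("fileName");
        if (fileName == null || fileName.trim().isEmpty()) {
            throw new IllegalArgumentException("fileName parameter is missing");
        }
    };

    // Sample rule: the fileName parameter must end with "csv" (case-insensitive).
    static final Validator FILE_NAME_IS_CSV = params -> {
        String fileName = params.get("fileName");
        if (fileName != null && !fileName.toLowerCase(Locale.ROOT).endsWith("csv")) {
            throw new IllegalArgumentException("fileName parameter does not use the csv file extension");
        }
    };

    public static void main(String[] args) {
        Validator validator = composite(List.of(FILE_NAME_PRESENT, FILE_NAME_IS_CSV));
        validator.validate(Map.of("fileName", "data.csv")); // passes silently
        try {
            validator.validate(Map.of("fileName", "data.txt"));
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage()); // prints the csv extension message
        }
    }
}
```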
Add the validator to the Job:
@Bean
public Job basicJob(@Qualifier("step1") Step step1,
                    @Qualifier("helloWorldValidator") CompositeJobParametersValidator helloWorldValidator) {
    return this.jobBuilderFactory.get("basicJob")
            .validator(helloWorldValidator)
            .start(step1)
            .build();
}
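Incrementing parameters
Some scenarios require running a job several times with the same parameters. For this, Spring Batch provides the RunIdIncrementer class and the JobParametersIncrementer interface.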
RunIdIncrementer: by default it adds a numeric parameter named run.id (of type long) and increases it by 1 on every run. Note: if optionalKeys is specified, run.id must be included in it.
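The increment behaviour can be pictured with a small plain-Java model: take the parameters of the previous run, read run.id (treating a missing value as 0), and store the value plus 1. The class RunIdSketch and the method next are hypothetical names; this is a sketch of the idea, not the RunIdIncrementer source.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a run.id style incrementer.
final class RunIdSketch {

    static final String RUN_ID_KEY = "run.id";

    // Returns a copy of the previous parameters with run.id increased by 1.
    static Map<String, Object> next(Map<String, Object> previous) {
        Map<String, Object> result = new HashMap<>(previous);
        Object last = previous.get(RUN_ID_KEY);
        long lastId = (last == null) ? 0L : ((Number) last).longValue();
        result.put(RUN_ID_KEY, lastId + 1);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> first = next(Map.of("fileName", "data.csv"));
        Map<String, Object> second = next(first);
        System.out.println(first.get(RUN_ID_KEY));  // prints 1
        System.out.println(second.get(RUN_ID_KEY)); // prints 2
    }
}
```

Because run.id changes on every launch, each run has a different set of identifying parameters and therefore maps to a new JobInstance.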
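JobParametersIncrementer: to implement custom increment logic, implement the JobParametersIncrementer interface. Below is the book's example, which adds a timestamp parameter (currentDate).
As above, if optionalKeys is specified, the custom parameter key used in the code must be included in it.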
package me.liujiajia.batch.incrementer;

import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersIncrementer;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * @author 佳佳
 */
@Component
public class DailyJobTimestampIncrementer implements JobParametersIncrementer {
    /**
     * Increment the provided parameters. If the input is empty, then this
     * should return a bootstrap or initial value to be used on the first
     * instance of a job.
     *
     * @param parameters the last value used
     * @return the next value to use (never {@code null})
     */
    @Override
    public JobParameters getNext(JobParameters parameters) {
        return new JobParametersBuilder(parameters)
                .addDate("currentDate", new Date())
                .toJobParameters();
    }
}
Add the incrementer to the Job:
@Bean
public Job basicJob(@Qualifier("step1") Step step1,
                    DailyJobTimestampIncrementer dailyJobTimestampIncrementer) {
    return this.jobBuilderFactory.get("basicJob")
            .incrementer(dailyJobTimestampIncrementer)
            .start(step1)
            .build();
}
Sample code
package me.liujiajia.batch;

import me.liujiajia.batch.incrementer.DailyJobTimestampIncrementer;
import me.liujiajia.batch.validator.FileNameValidator;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.job.CompositeJobParametersValidator;
import org.springframework.batch.core.job.DefaultJobParametersValidator;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

import java.util.Arrays;

/**
 * @author 佳佳
 */
@EnableBatchProcessing
@SpringBootApplication
public class HelloWorldJob {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean("step1")
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
                .tasklet((stepContribution, chunkContext) -> {
                    String name = (String) chunkContext.getStepContext()
                            .getJobParameters()
                            .get("name");
                    System.out.printf("%s:Hello, %s!%n", chunkContext.getStepContext().getStepName(), name);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean("step2")
    public Step step2(@Qualifier("helloWorldTasklet") Tasklet helloWorldTasklet) {
        return this.stepBuilderFactory.get("step2")
                .tasklet(helloWorldTasklet)
                .build();
    }

    @StepScope
    @Bean("helloWorldTasklet")
    public Tasklet helloWorldTasklet(@Value("#{jobParameters['name']}") String name) {
        return (stepContribution, chunkContext) -> {
            System.out.printf("%s:Hello, %s!%n", chunkContext.getStepContext().getStepName(), name);
            return RepeatStatus.FINISHED;
        };
    }

    @Bean("step3")
    public Step step3(@Qualifier("fileNameTasklet") Tasklet fileNameTasklet) {
        return this.stepBuilderFactory.get("step3")
                .tasklet(fileNameTasklet)
                .build();
    }

    @StepScope
    @Bean("fileNameTasklet")
    public Tasklet fileNameTasklet(@Value("#{jobParameters['fileName']}") String fileName) {
        return (stepContribution, chunkContext) -> {
            System.out.printf("%s:fileName = %s%n", chunkContext.getStepContext().getStepName(), fileName);
            return RepeatStatus.FINISHED;
        };
    }

    @Bean("helloWorldValidator")
    public CompositeJobParametersValidator helloWorldValidator(FileNameValidator fileNameValidator) {
        CompositeJobParametersValidator validator = new CompositeJobParametersValidator();

        DefaultJobParametersValidator defaultJobParametersValidator = new DefaultJobParametersValidator(
                new String[]{"fileName"},
                new String[]{"name", "run.id", "currentDate"});
        defaultJobParametersValidator.afterPropertiesSet();

        validator.setValidators(Arrays.asList(fileNameValidator, defaultJobParametersValidator));
        return validator;
    }

    @Bean
    public Job basicJob(@Qualifier("step1") Step step1,
                        @Qualifier("step2") Step step2,
                        @Qualifier("step3") Step step3,
                        @Qualifier("helloWorldValidator") CompositeJobParametersValidator helloWorldValidator,
                        DailyJobTimestampIncrementer dailyJobTimestampIncrementer) {
        return this.jobBuilderFactory.get("basicJob")
                .incrementer(dailyJobTimestampIncrementer)
                // .incrementer(new RunIdIncrementer())
                .validator(helloWorldValidator)
                .start(step1)
                .next(step2)
                .next(step3)
                .build();
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloWorldJob.class, args);
    }
}
References
[1]: 《Spring Batch 权威指南》 (Chinese edition of The Definitive Guide to Spring Batch), by Michael T. Minella