在第九期中,我们深入探讨了 Spring Batch 的批处理流程,剖析了 Job 和 Step 的执行机制。在企业级应用中,批处理任务可能因异常(如数据库故障、网络中断)失败,如何从失败点恢复并继续执行,是 Spring Batch 的关键特性之一。本篇将聚焦 Spring Batch 的恢复机制,深入源码分析其实现原理,并补充相关图示。
1. 恢复机制的核心概念
Spring Batch 的恢复机制依赖以下组件:
- JobRepository:持久化 Job 和 Step 的执行状态。
- JobExecution:记录 Job 的运行信息(如状态、失败原因)。
- StepExecution:记录 Step 的运行信息(如已处理条目数)。
- Restartability:支持从失败点重启。
恢复的核心在于利用持久化状态,定位失败位置并跳过已完成的数据。
2. 恢复机制的配置
一个支持恢复的 Spring Batch 配置:
@Configuration
@EnableBatchProcessing
public class BatchConfig {@Beanpublic Job job(JobBuilderFactory jobBuilderFactory, Step step) {return jobBuilderFactory.get("recoverableJob").start(step).build();}@Beanpublic Step step(StepBuilderFactory stepBuilderFactory, DataSource dataSource) {return stepBuilderFactory.get("recoverableStep").<String, String>chunk(10).reader(reader(dataSource)).processor(processor()).writer(writer()).faultTolerant().skip(IllegalArgumentException.class).skipLimit(5).build();}@Beanpublic ItemReader<String> reader(DataSource dataSource) {JdbcCursorItemReader<String> reader = new JdbcCursorItemReader<>();reader.setDataSource(dataSource);reader.setSql("SELECT name FROM items");reader.setRowMapper((rs, rowNum) -> rs.getString("name"));return reader;}@Beanpublic ItemProcessor<String, String> processor() {return item -> {if ("error".equals(item)) throw new IllegalArgumentException("Simulated error");return "Processed: " + item;};}@Beanpublic ItemWriter<String> writer() {return items -> items.forEach(System.out::println);}
}
faultTolerant()
:启用容错。skip()
:跳过指定异常。skipLimit()
:设置跳过次数上限。
3. JobRepository 的作用
JobRepository
使用数据库(如 BATCH_JOB_EXECUTION
和 BATCH_STEP_EXECUTION
表)持久化状态:
public interface JobRepository {JobExecution createJobExecution(String jobName, JobParameters jobParameters);void update(JobExecution jobExecution);void update(StepExecution stepExecution)