一、业务背景支付对账是金融系统每日核心定时任务数据源业务库订单支付流水、第三方支付渠道回调流水对账逻辑以渠道流水为准匹配本地支付订单区分三类结果平账本地订单与渠道流水金额、订单号完全匹配本地长款渠道有记录本地无对应订单本地短款本地有订单渠道无对应流水3.需求约束每日凌晨自动执行处理前一日全量支付数据百万级流水不 OOM流式分页读取数据库批量入库对账结果支持断点续跑失败分片单独重试不重复处理完整数据4.技术选型Spring Batch MySQL JdbcCursorItemReader流式游标读 JdbcBatchItemWriter批量写二、整体架构流程Job支付对账任务payReconciliationJobStep单步分片处理reconciliationStepReaderJdbcCursorItemReader读取前一日第三方渠道流水流式游标避免全量加载Processor对账核心处理器匹配本地订单生成对账结果Writer批量写入对账结果表Chunk 批量事务提交参数通过 Job 参数传入对账日期动态筛选当日流水三、数据库表设计1. 第三方渠道流水表source_channel_paysqlCREATE TABLE source_channel_pay (id BIGINT PRIMARY KEY AUTO_INCREMENT COMMENT 主键,out_trade_no VARCHAR(64) NOT NULL COMMENT 商户订单号,channel_trade_no VARCHAR(64) NOT NULL COMMENT 渠道交易号,pay_amount DECIMAL(12,2) NOT NULL COMMENT 支付金额,pay_time DATETIME NOT NULL COMMENT 支付时间,create_time DATETIME DEFAULT CURRENT_TIMESTAMP,INDEX idx_pay_time (pay_time)) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT第三方支付渠道流水;2. 本地业务支付订单表biz_pay_ordersqlCREATE TABLE biz_pay_order (id BIGINT PRIMARY KEY AUTO_INCREMENT,order_no VARCHAR(64) NOT NULL COMMENT 商户订单号,pay_amount DECIMAL(12,2) NOT NULL COMMENT 支付金额,pay_status TINYINT NOT NULL COMMENT 1待支付 2已支付,pay_time DATETIME COMMENT 本地支付完成时间,INDEX idx_order_no (order_no)) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT本地支付订单;3. 对账结果表pay_reconciliation_resultsqlCREATE TABLE pay_reconciliation_result (id BIGINT PRIMARY KEY AUTO_INCREMENT,recon_date DATE NOT NULL COMMENT 对账日期,out_trade_no VARCHAR(64) NOT NULL COMMENT 商户订单号,channel_trade_no VARCHAR(64) NOT NULL COMMENT 渠道交易号,channel_amount DECIMAL(12,2) NOT NULL COMMENT 渠道金额,local_amount DECIMAL(12,2) DEFAULT NULL COMMENT 本地订单金额,recon_type TINYINT NOT NULL COMMENT 1平账 2长款 3短款,recon_desc VARCHAR(256) COMMENT 对账说明,create_time DATETIME DEFAULT CURRENT_TIMESTAMP,INDEX idx_recon_date (recon_date)) ENGINEInnoDB DEFAULT CHARSETutf8mb4 COMMENT支付对账结果表;四、依赖引入Mavenxml!-- Spring Batch 核心 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-batch/artifactId/dependency!-- JDBC 数据库 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-jdbc/artifactId/dependency!-- MySQL驱动 --dependencygroupIdcom.mysql/groupIdartifactIdmysql-connector-j/artifactIdscoperuntime/scope/dependency!-- Lombok --dependencygroupIdorg.projectlombok/groupIdartifactIdlombok/artifactIdoptionaltrue/optional/dependency五、实体对象封装1. 渠道流水实体 ChannelPayDOjavaimport lombok.Data;import java.math.BigDecimal;import java.time.LocalDateTime;Datapublic class ChannelPayDO {private Long id;private String outTradeNo;private String channelTradeNo;private BigDecimal payAmount;private LocalDateTime payTime;}2. 本地支付订单 BizPayOrderDOjavaimport lombok.Data;import java.math.BigDecimal;import java.time.LocalDateTime;Datapublic class BizPayOrderDO {private Long id;private String orderNo;private BigDecimal payAmount;private Integer payStatus;private LocalDateTime payTime;}3. 对账结果 PayReconciliationResultDOjavaimport lombok.Data;import java.math.BigDecimal;import java.time.LocalDate;Datapublic class PayReconciliationResultDO {private LocalDate reconDate;private String outTradeNo;private String channelTradeNo;private BigDecimal channelAmount;private BigDecimal localAmount;/** 1平账 2长款 3短款 */private Integer reconType;private String reconDesc;}六、Spring Batch 完整配置类配置说明JdbcCursorItemReader流式游标读取渠道流水百万级数据不 OOMChunk 1000每 1000 条一批次读取、处理、批量写入单事务提交Job 参数reconDate动态传入对账日期筛选当日数据自动启用 Spring Batch 内置元数据表job 执行日志、断点续跑。javaimport lombok.RequiredArgsConstructor;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.job.builder.JobBuilder;import org.springframework.batch.core.step.builder.StepBuilder;import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;import org.springframework.batch.item.database.JdbcBatchItemWriter;import org.springframework.batch.item.database.JdbcCursorItemReader;import org.springframework.batch.item.database.builder.JdbcBatchItemWriterBuilder;import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.jdbc.core.BeanPropertyRowMapper;import javax.sql.DataSource;import java.time.LocalDate;ConfigurationEnableBatchProcessingRequiredArgsConstructorpublic class PayReconciliationBatchConfig {private final JobBuilderFactory jobBuilderFactory;private final StepBuilderFactory stepBuilderFactory;private final DataSource dataSource;private final PayReconciliationProcessor reconciliationProcessor;// Reader读取渠道流水 Beanpublic JdbcCursorItemReaderChannelPayDO channelPayReader() {// 从Job参数获取对账日期reconDate筛选前一日渠道流水String sql SELECT id, out_trade_no, channel_trade_no, pay_amount, pay_time FROM source_channel_pay WHERE DATE(pay_time) :reconDate;return new JdbcCursorItemReaderBuilderChannelPayDO().dataSource(dataSource).sql(sql).rowMapper(new BeanPropertyRowMapper(ChannelPayDO.class)).fetchSize(1000) // 游标批量拉取优化网络IO.name(channelPayCursorReader).build();}// Writer批量写入对账结果 Beanpublic JdbcBatchItemWriterPayReconciliationResultDO reconResultWriter() {String insertSql INSERT INTO pay_reconciliation_result (recon_date, out_trade_no, channel_trade_no, channel_amount, local_amount, recon_type, recon_desc) VALUES (:reconDate, :outTradeNo, :channelTradeNo, :channelAmount, :localAmount, :reconType, :reconDesc);return new JdbcBatchItemWriterBuilderPayReconciliationResultDO().dataSource(dataSource).sql(insertSql).itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider()).build();}// Step对账处理步骤 Beanpublic Step reconciliationStep() {return stepBuilderFactory.get(reconciliationStep)// chunk块大小1000每1000条提交一次事务.ChannelPayDO, PayReconciliationResultDOchunk(1000).reader(channelPayReader()).processor(reconciliationProcessor).writer(reconResultWriter())// 容错单条数据异常跳过记录日志不中断整个任务.skip(Exception.class).skipLimit(100).build();}// Job支付对账主任务 Beanpublic Job payReconciliationJob() {return jobBuilderFactory.get(payReconciliationJob).start(reconciliationStep()).build();}}七、对账核心 ItemProcessor业务逻辑层核心逻辑根据渠道订单号查询本地支付订单匹配判定平账 / 长款 / 短款组装对账结果实体返回交由 Writer 批量入库。javaimport lombok.RequiredArgsConstructor;import lombok.extern.slf4j.Slf4j;import org.springframework.batch.item.ItemProcessor;import org.springframework.jdbc.core.BeanPropertyRowMapper;import org.springframework.jdbc.core.JdbcTemplate;import org.springframework.stereotype.Component;import java.math.BigDecimal;import java.time.LocalDate;Slf4jComponentRequiredArgsConstructorpublic class PayReconciliationProcessor implements ItemProcessorChannelPayDO, PayReconciliationResultDO {private final JdbcTemplate jdbcTemplate;private static final String QUERY_ORDER_SQL SELECT id, order_no, pay_amount, pay_status, pay_time FROM biz_pay_order WHERE order_no ?;Overridepublic PayReconciliationResultDO process(ChannelPayDO channelPay) throws Exception {String outTradeNo channelPay.getOutTradeNo();BigDecimal channelAmount channelPay.getPayAmount();LocalDate reconDate channelPay.getPayTime().toLocalDate();// 查询本地订单BizPayOrderDO localOrder null;try {localOrder jdbcTemplate.queryForObject(QUERY_ORDER_SQL,new Object[]{outTradeNo},new BeanPropertyRowMapper(BizPayOrderDO.class));} catch (Exception e) {// 无本地订单判定长款log.warn(订单{}本地无支付记录长款, outTradeNo);}PayReconciliationResultDO result new PayReconciliationResultDO();result.setReconDate(reconDate);result.setOutTradeNo(outTradeNo);result.setChannelTradeNo(channelPay.getChannelTradeNo());result.setChannelAmount(channelAmount);if (localOrder null) {// 长款渠道有、本地无result.setReconType(2);result.setReconDesc(长款渠道存在流水本地无对应支付订单);result.setLocalAmount(null);} else {result.setLocalAmount(localOrder.getPayAmount());if (channelAmount.compareTo(localOrder.getPayAmount()) 0) {// 平账金额一致result.setReconType(1);result.setReconDesc(平账渠道与本地订单金额匹配);} else {// 短款订单存在但金额不一致result.setReconType(3);result.setReconDesc(String.format(短款渠道金额%s本地订单金额%s, channelAmount, localOrder.getPayAmount()));}}return result;}}八、启动任务测试类手动执行 / 定时调度1. 手动启动单元测试javaimport org.junit.jupiter.api.Test;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobParameters;import org.springframework.batch.core.JobParametersBuilder;import org.springframework.batch.core.launch.JobLauncher;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import java.time.LocalDate;SpringBootTestpublic class PayReconJobTest {Autowiredprivate JobLauncher jobLauncher;Autowiredprivate Job payReconciliationJob;Testvoid runReconJob() throws Exception {// 传入对账日期例如2026-06-14LocalDate reconDate LocalDate.of(2026, 6, 14);JobParameters params new JobParametersBuilder().addLocalDate(reconDate, reconDate).addLong(runTime, System.currentTimeMillis()) // 每次执行参数唯一避免任务重复跳过.toJobParameters();jobLauncher.run(payReconciliationJob, params);}}2. 定时自动执行每日凌晨 1 点对账javaimport lombok.RequiredArgsConstructor;import org.springframework.batch.core.Job;import org.springframework.batch.core.JobParameters;import org.springframework.batch.core.JobParametersBuilder;import org.springframework.batch.core.launch.JobLauncher;import org.springframework.scheduling.annotation.Scheduled;import org.springframework.stereotype.Component;import java.time.LocalDate;ComponentRequiredArgsConstructorpublic class ReconScheduleTask {private final JobLauncher jobLauncher;private final Job payReconciliationJob;/*** 每日凌晨1点执行前一日对账*/Scheduled(cron 0 0 1 * * ?)public void executeReconTask() throws Exception {// 对账日期为昨日LocalDate reconDate LocalDate.now().minusDays(1);JobParameters params new JobParametersBuilder().addLocalDate(reconDate, reconDate).addLong(runTime, System.currentTimeMillis()).toJobParameters();jobLauncher.run(payReconciliationJob, params);}}九、MySQL 性能优化配置application.ymlyamlspring:batch:jdbc:initialize-schema: always # 自动创建Batch元数据表首次运行开启datasource:url: jdbc:mysql://127.0.0.1:3306/pay_db?useUnicodetruecharacterEncodingutf8serverTimezoneAsia/Shanghai# 批量写入核心优化合并多条insert为单条SQL性能提升5~10倍rewriteBatchedStatementstrueuseServerPrepStmtstrueusername: rootpassword: 123456driver-class-name: com.mysql.cj.jdbc.Driver十、方案核心优势博客重点总结1. 大数据量内存可控使用JdbcCursorItemReader游标流式读取不会一次性加载全量渠道流水百万 / 千万级数据无 OOM 风险对比分页limit offset深度分页性能断崖下跌问题完美规避。2. 数据库写入高效JdbcBatchItemWriter底层 JDBC 批量addBatch配合rewriteBatchedStatementstrue1000 条单次事务提交网络 IO、事务日志开销大幅降低速度是单条循环写入 10 倍以上。3. 金融级可靠特性断点续跑Batch 内置元数据表记录每一个 chunk 执行状态任务中途崩溃重启后仅重跑失败分片无需全量重跑异常容错配置 skip 限制单条脏数据跳过不中断整体对账任务事务隔离每个 Chunk 独立事务成功批量入库失败整块回滚对账数据不会半残。4. 定时运维友好支持 Cron 定时每日自动执行通过 Job 参数动态指定对账日期支持手动重跑历史日期对账适配对账差错补跑场景。5. 业务解耦易扩展新增对账规则仅修改ItemProcessor读写层完全不动可扩展分区并行 Partition多线程分片读取渠道流水千万级数据同步提速可扩展文件导出、对账告警、差错推送等后置 Step。十一、生产调优建议博客拓展阅读Chunk 大小对账流水推荐 chunk1000平衡吞吐量与回滚成本数据量超大可调整至 2000~5000并行提速千万级流水接入Partitioner分区按渠道流水 ID 分片多线程并行读取索引优化渠道流水 pay_time、订单表 order_no 必须建立索引避免全表扫描资源隔离对账任务夜间执行避开业务高峰可单独配置读写分离从库读取渠道流水监控告警监听 Job 执行完成事件对账失败、长款短款数量超标推送钉钉 / 邮件告警。十二、适用场景拓展除支付对账外该架构可无缝复用资金流水核对、商户结算批处理订单数据统计、历史数据迁移第三方 API 数据落地、数据清洗批任务。