# Spring Batch

https://github.com/spring-projects/spring-batch

Spring Batch is a comprehensive framework for developing robust batch applications in Java/Spring. It provides reusable functions essential for processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management. The framework follows a domain-driven design with core concepts such as Jobs (composed of Steps), ItemReader/ItemProcessor/ItemWriter for chunk-oriented processing, and a JobRepository for persisting batch metadata.

The framework is organized into several modules: spring-batch-core (Job, Step, repository, configuration), spring-batch-infrastructure (item readers, writers, processors), spring-batch-integration (Spring Integration support), spring-batch-test (testing utilities), and spring-batch-samples (example implementations). Spring Batch 6.x requires Java 17+ and integrates with Spring Framework 7, Spring Data 4, and Micrometer 1.16 for observability.

## Configuration Annotations

### @EnableBatchProcessing

Enables Spring Batch features and provides a base configuration for batch jobs. This annotation configures common batch infrastructure beans and should be combined with either `@EnableJdbcJobRepository` or `@EnableMongoJobRepository` for persistence configuration.

```java
@Configuration
@EnableBatchProcessing(taskExecutorRef = "batchTaskExecutor") // references a TaskExecutor bean named "batchTaskExecutor"
@EnableJdbcJobRepository(dataSourceRef = "batchDataSource")
public class BatchConfiguration {

    @Bean
    public Job myJob(JobRepository jobRepository, Step step1) {
        return new JobBuilder("myJob", jobRepository)
                .start(step1)
                .build();
    }

    @Bean
    public Step step1(JobRepository jobRepository) {
        return new StepBuilder("step1", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("Executing step1");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    @Bean
    public DataSource batchDataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-drop-h2.sql")
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .build();
    }

    @Bean
    public JdbcTransactionManager transactionManager(DataSource dataSource) {
        return new JdbcTransactionManager(dataSource);
    }
}
```

### @EnableJdbcJobRepository

Configures a JDBC-based job repository for persisting batch metadata. Supports configuring the database type, table prefix, isolation level, and connection settings.

```java
@Configuration
@EnableBatchProcessing
@EnableJdbcJobRepository(
        dataSourceRef = "batchDataSource",
        transactionManagerRef = "batchTransactionManager",
        tablePrefix = "BATCH_",
        isolationLevelForCreate = "ISOLATION_SERIALIZABLE",
        maxVarCharLength = 2500
)
public class JdbcBatchConfiguration {

    @Bean
    public DataSource batchDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl("jdbc:postgresql://localhost:5432/batch");
        dataSource.setUsername("batch_user");
        dataSource.setPassword("secret");
        return dataSource;
    }

    @Bean
    public JdbcTransactionManager batchTransactionManager(DataSource batchDataSource) {
        return new JdbcTransactionManager(batchDataSource);
    }
}
```

### @EnableMongoJobRepository

Configures a MongoDB-based job repository for persisting batch metadata. Useful for applications that prefer document-based storage over relational databases.
```java
@Configuration
@EnableBatchProcessing
@EnableMongoJobRepository(
        mongoOperationsRef = "batchMongoOperations",
        transactionManagerRef = "batchTransactionManager"
)
public class MongoBatchConfiguration {

    @Bean
    public MongoOperations batchMongoOperations(MongoDatabaseFactory factory) {
        return new MongoTemplate(factory);
    }

    @Bean
    public MongoTransactionManager batchTransactionManager(MongoDatabaseFactory factory) {
        return new MongoTransactionManager(factory);
    }
}
```

## Core Domain Classes

### Job Interface

The Job interface represents the top-level batch job configuration. A job contains one or more steps and is executed by the JobOperator.

```java
@FunctionalInterface
public interface Job {

    default String getName() {
        return this.getClass().getName();
    }

    default boolean isRestartable() {
        return true;
    }

    void execute(JobExecution execution) throws JobInterruptedException;

    default JobParametersIncrementer getJobParametersIncrementer() {
        return null;
    }

    default JobParametersValidator getJobParametersValidator() {
        return new DefaultJobParametersValidator();
    }
}

// Usage: Creating a custom Job implementation
public class CustomJob implements Job {

    private final String name;
    private final Step step;

    public CustomJob(String name, Step step) {
        this.name = name;
        this.step = step;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public void execute(JobExecution execution) throws JobInterruptedException {
        StepExecution stepExecution = execution.createStepExecution(step.getName());
        step.execute(stepExecution);
        execution.setStatus(stepExecution.getStatus());
        execution.setExitStatus(stepExecution.getExitStatus());
    }
}
```

### Step Interface

The Step interface represents a phase of a batch job. Steps can be tasklet-based (simple operations) or chunk-oriented (read-process-write pattern).

```java
@FunctionalInterface
public interface Step {

    String STEP_TYPE_KEY = "batch.stepType";

    default String getName() {
        return this.getClass().getName();
    }

    default boolean isAllowStartIfComplete() {
        return false;
    }

    default int getStartLimit() {
        return Integer.MAX_VALUE;
    }

    void execute(StepExecution stepExecution) throws JobInterruptedException;
}

// Usage: Creating a custom Step implementation
public class LoggingStep implements Step {

    private final String name;
    private final String message;

    public LoggingStep(String name, String message) {
        this.name = name;
        this.message = message;
    }

    @Override
    public String getName() {
        return name;
    }

    @Override
    public void execute(StepExecution stepExecution) {
        System.out.println(message);
        stepExecution.setStatus(BatchStatus.COMPLETED);
        stepExecution.setExitStatus(ExitStatus.COMPLETED);
    }
}
```

### JobParameters

An immutable, thread-safe record for holding job parameters. Parameters can be typed (String, Long, Double, Date, LocalDate, LocalDateTime) and marked as identifying or non-identifying.

```java
// Creating JobParameters using JobParametersBuilder
JobParameters params = new JobParametersBuilder()
        .addString("inputFile", "/data/input.csv", true)          // identifying parameter
        .addString("outputFile", "/data/output.csv", true)
        .addLong("timestamp", System.currentTimeMillis(), false)  // non-identifying
        .addLocalDate("runDate", LocalDate.now(), true)
        .addDouble("threshold", 0.95, false)
        .toJobParameters();

// Accessing parameters
String inputFile = params.getString("inputFile");
Long timestamp = params.getLong("timestamp");
LocalDate runDate = params.getLocalDate("runDate");
Double threshold = params.getDouble("threshold");

// Iterating over parameters
for (JobParameter<?> param : params) {
    System.out.printf("Parameter: %s = %s (identifying: %s)%n",
            param.getName(), param.getValue(), param.isIdentifying());
}

// Getting only identifying parameters (used for job instance identity)
Set<JobParameter<?>> identifyingParams = params.getIdentifyingParameters();
```
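Identifying parameters are what define JobInstance identity: two launches with the same identifying parameters belong to the same instance, which is why a failed run can be restarted rather than duplicated. A brief sketch using the JobOperator calls shown later in the JobRepository section (assumes a `jobOperator` in scope; the job name and path are illustrative):

```java
// Same identifying parameters -> same JobInstance.
Properties parameters = new Properties();
parameters.setProperty("inputFile", "/data/input.csv");

long executionId = jobOperator.start("myJob", parameters);

// If that execution fails, restarting resumes the same JobInstance
// with a new JobExecution instead of creating a duplicate instance.
long restartedExecutionId = jobOperator.restart(executionId);
```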
### JobExecution and StepExecution

JobExecution represents a single attempt to run a job, while StepExecution tracks the execution state and statistics of a step.

```java
// JobExecution contains execution metadata and statistics
JobExecution jobExecution = jobRepository.getJobExecution(executionId);

// Access execution details
BatchStatus status = jobExecution.getStatus(); // STARTING, STARTED, STOPPING, STOPPED, FAILED, COMPLETED, ABANDONED
ExitStatus exitStatus = jobExecution.getExitStatus();
LocalDateTime createTime = jobExecution.getCreateTime();
LocalDateTime startTime = jobExecution.getStartTime();
LocalDateTime endTime = jobExecution.getEndTime();
JobParameters parameters = jobExecution.getJobParameters();
ExecutionContext context = jobExecution.getExecutionContext();
List<Throwable> failures = jobExecution.getFailureExceptions();

// Access step executions within a job
for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
    System.out.printf("Step: %s%n", stepExecution.getStepName());
    System.out.printf("  Read count: %d%n", stepExecution.getReadCount());
    System.out.printf("  Write count: %d%n", stepExecution.getWriteCount());
    System.out.printf("  Filter count: %d%n", stepExecution.getFilterCount());
    System.out.printf("  Read skip count: %d%n", stepExecution.getReadSkipCount());
    System.out.printf("  Write skip count: %d%n", stepExecution.getWriteSkipCount());
    System.out.printf("  Process skip count: %d%n", stepExecution.getProcessSkipCount());
    System.out.printf("  Commit count: %d%n", stepExecution.getCommitCount());
    System.out.printf("  Rollback count: %d%n", stepExecution.getRollbackCount());
}
```
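The ExecutionContext shown above is also what makes restart possible: stateful readers and writers save their position into it at each chunk commit through the ItemStream contract. A minimal sketch of a restartable reader (this class and its `"current.index"` key are illustrative, not framework constants):

```java
import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;

// Minimal sketch: a list-backed reader that records its position in the
// step's ExecutionContext so a restarted job resumes where it left off.
public class RestartableListReader implements ItemStreamReader<String> {

    private static final String INDEX_KEY = "current.index"; // illustrative key name

    private final List<String> items;
    private int index = 0;

    public RestartableListReader(List<String> items) {
        this.items = items;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        if (executionContext.containsKey(INDEX_KEY)) {
            this.index = executionContext.getInt(INDEX_KEY); // resume after restart
        }
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        executionContext.putInt(INDEX_KEY, index); // persisted at each chunk commit
    }

    @Override
    public String read() {
        return index < items.size() ? items.get(index++) : null; // null ends the step
    }

    @Override
    public void close() throws ItemStreamException {
    }
}
```

Chunk-oriented steps call `open()` once at step start, `update()` before each chunk commit, and `close()` at the end, so the saved index never runs ahead of what was actually written.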
## Job and Step Builders

### JobBuilder

Entry point for building jobs using a fluent API. Supports linear sequential jobs, flow-based jobs with conditional transitions, and split jobs for parallel execution.

```java
@Configuration
@EnableBatchProcessing
@EnableJdbcJobRepository
public class JobBuilderExamples {

    // Simple linear job with sequential steps
    @Bean
    public Job linearJob(JobRepository jobRepository, Step step1, Step step2, Step step3) {
        return new JobBuilder("linearJob", jobRepository)
                .start(step1)
                .next(step2)
                .next(step3)
                .build();
    }

    // Flow-based job with conditional transitions
    @Bean
    public Job flowJob(JobRepository jobRepository, Step stepA, Step stepB,
                       Step stepC, Step errorStep) {
        return new JobBuilder("flowJob", jobRepository)
                .start(stepA)
                    .on("FAILED").to(errorStep)
                .from(stepA).on("*").to(stepB)
                .from(stepB).on("COMPLETED").to(stepC)
                .end()
                .build();
    }

    // Split job for parallel execution
    @Bean
    public Job splitJob(JobRepository jobRepository, Step step1, Step step2,
                        Step step3, Step step4) {
        Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
                .start(step1).next(step2).build();
        Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
                .start(step3).next(step4).build();

        return new JobBuilder("splitJob", jobRepository)
                .start(flow1)
                .split(new SimpleAsyncTaskExecutor())
                .add(flow2)
                .end()
                .build();
    }

    // Job with parameter validation and incrementer
    @Bean
    public Job validatedJob(JobRepository jobRepository, Step step) {
        return new JobBuilder("validatedJob", jobRepository)
                .validator(new DefaultJobParametersValidator(
                        new String[]{"inputFile"},    // required
                        new String[]{"outputFile"}))  // optional
                .incrementer(new RunIdIncrementer())
                .preventRestart()
                .start(step)
                .build();
    }
}
```

### StepBuilder

Entry point for building steps. Supports tasklet steps, chunk-oriented steps, partitioned steps, and nested job steps.

```java
@Configuration
public class StepBuilderExamples {

    // Tasklet step for simple operations
    @Bean
    public Step taskletStep(JobRepository jobRepository) {
        return new StepBuilder("taskletStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    // Access job parameters
                    JobParameters params = chunkContext.getStepContext()
                            .getStepExecution().getJobParameters();
                    String inputFile = params.getString("inputFile");
                    // Perform operation
                    System.out.println("Processing file: " + inputFile);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }

    // Chunk-oriented step with fault tolerance
    @Bean
    public Step chunkStep(JobRepository jobRepository,
                          PlatformTransactionManager transactionManager,
                          ItemReader<String> reader,
                          ItemProcessor<String, String> processor,
                          ItemWriter<String> writer) {
        return new StepBuilder("chunkStep", jobRepository)
                .<String, String>chunk(100)
                .transactionManager(transactionManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant()
                .skip(ValidationException.class)
                .skip(FlatFileParseException.class)
                .skipLimit(10)
                .retry(DeadlockLoserDataAccessException.class)
                .retryLimit(3)
                .listener(new CustomStepListener())
                .build();
    }

    // Partitioned step for parallel processing
    @Bean
    public Step partitionedStep(JobRepository jobRepository, Step workerStep,
                                Partitioner partitioner) {
        return new StepBuilder("partitionedStep", jobRepository)
                .partitioner("workerStep", partitioner)
                .step(workerStep)
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    // Nested job step
    @Bean
    public Step nestedJobStep(JobRepository jobRepository, Job childJob, JobOperator jobOperator) {
        return new StepBuilder("nestedJobStep", jobRepository)
                .job(childJob)
                .operator(jobOperator)
                .build();
    }
}
```

## Item Processing Interfaces

### ItemReader

Reads items one at a time from a data source. Returns null when all items have been read.
```java
@FunctionalInterface
public interface ItemReader<T> {

    @Nullable
    T read() throws Exception;
}

// Custom ItemReader implementation
public class ApiItemReader implements ItemReader<Customer> {

    private final RestTemplate restTemplate;
    private final String apiUrl;
    private int page = 0;
    private Iterator<Customer> currentBatch;

    public ApiItemReader(RestTemplate restTemplate, String apiUrl) {
        this.restTemplate = restTemplate;
        this.apiUrl = apiUrl;
    }

    @Override
    public Customer read() throws Exception {
        if (currentBatch == null || !currentBatch.hasNext()) {
            List<Customer> batch = fetchNextBatch();
            if (batch.isEmpty()) {
                return null; // Signal end of data
            }
            currentBatch = batch.iterator();
        }
        return currentBatch.next();
    }

    private List<Customer> fetchNextBatch() {
        String url = apiUrl + "?page=" + page++ + "&size=100";
        ResponseEntity<List<Customer>> response = restTemplate.exchange(
                url, HttpMethod.GET, null,
                new ParameterizedTypeReference<List<Customer>>() {});
        return response.getBody() != null ? response.getBody() : Collections.emptyList();
    }
}
```

### ItemProcessor

Transforms or filters items. Return null to filter out an item.

```java
@FunctionalInterface
public interface ItemProcessor<I, O> {

    @Nullable
    O process(I item) throws Exception;
}

// Transforming processor
public class CustomerEnrichmentProcessor implements ItemProcessor<Customer, EnrichedCustomer> {

    private final CreditService creditService;

    public CustomerEnrichmentProcessor(CreditService creditService) {
        this.creditService = creditService;
    }

    @Override
    public EnrichedCustomer process(Customer customer) throws Exception {
        CreditScore score = creditService.getCreditScore(customer.getSsn());
        return new EnrichedCustomer(
                customer.getId(),
                customer.getName(),
                customer.getEmail(),
                score.getValue(),
                score.getRating()
        );
    }
}

// Filtering processor
public class ActiveCustomerFilter implements ItemProcessor<Customer, Customer> {

    @Override
    public Customer process(Customer customer) {
        // Return null to filter out inactive customers
        return customer.isActive() ? customer : null;
    }
}

// Composite processor chaining multiple processors
@Bean
public ItemProcessor<Customer, EnrichedCustomer> compositeProcessor() {
    CompositeItemProcessor<Customer, EnrichedCustomer> processor = new CompositeItemProcessor<>();
    processor.setDelegates(List.of(
            new ValidationProcessor(),
            new ActiveCustomerFilter(),
            new CustomerEnrichmentProcessor(creditService)
    ));
    return processor;
}
```

### ItemWriter

Writes a chunk of items to a destination.

```java
@FunctionalInterface
public interface ItemWriter<T> {

    void write(Chunk<? extends T> chunk) throws Exception;
}

// Custom ItemWriter implementation
public class ApiItemWriter implements ItemWriter<ProcessedOrder> {

    private final RestTemplate restTemplate;
    private final String apiUrl;

    public ApiItemWriter(RestTemplate restTemplate, String apiUrl) {
        this.restTemplate = restTemplate;
        this.apiUrl = apiUrl;
    }

    @Override
    public void write(Chunk<? extends ProcessedOrder> chunk) throws Exception {
        for (ProcessedOrder order : chunk) {
            HttpEntity<ProcessedOrder> request = new HttpEntity<>(order);
            ResponseEntity<Void> response = restTemplate.postForEntity(
                    apiUrl + "/orders", request, Void.class);
            if (!response.getStatusCode().is2xxSuccessful()) {
                throw new RuntimeException("Failed to submit order: " + order.getId());
            }
        }
    }
}

// Composite writer for multiple destinations
@Bean
public ItemWriter<Customer> compositeWriter(ItemWriter<Customer> dbWriter,
                                            ItemWriter<Customer> cacheWriter,
                                            ItemWriter<Customer> auditWriter) {
    CompositeItemWriter<Customer> writer = new CompositeItemWriter<>();
    writer.setDelegates(List.of(dbWriter, cacheWriter, auditWriter));
    return writer;
}
```

Taken together, these three interfaces are driven by the framework in a chunk-oriented loop: items are read and processed one at a time until a chunk is full, then the whole chunk is written in a single transaction. The sketch below is illustrative only; the real step implementation in spring-batch-core adds transaction boundaries, listener callbacks, and the skip/retry handling shown earlier.

```java
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;

// Simplified sketch of the chunk-oriented loop (not the framework's code).
public class ChunkLoopSketch {

    public static <I, O> void processAll(ItemReader<I> reader,
                                         ItemProcessor<I, O> processor,
                                         ItemWriter<O> writer,
                                         int chunkSize) throws Exception {
        boolean exhausted = false;
        while (!exhausted) {
            Chunk<O> outputs = new Chunk<>();
            for (int i = 0; i < chunkSize; i++) {
                I item = reader.read();              // null signals end of data
                if (item == null) {
                    exhausted = true;
                    break;
                }
                O processed = processor.process(item);
                if (processed != null) {             // null filters the item out
                    outputs.add(processed);
                }
            }
            if (!outputs.isEmpty()) {
                writer.write(outputs);               // one transaction per chunk
            }
        }
    }
}
```

This is why `ItemReader.read()` returning null ends the step, and why a null from `ItemProcessor.process()` silently drops an item from the chunk.

## Built-in Item Readers

### FlatFileItemReader

Reads items from delimited or fixed-length flat files.

```java
// Delimited file reader (CSV)
@Bean
@StepScope
public FlatFileItemReader<Customer> csvReader(
        @Value("#{jobParameters['inputFile']}") Resource resource) {
    return new FlatFileItemReaderBuilder<Customer>()
            .name("customerCsvReader")
            .resource(resource)
            .linesToSkip(1) // Skip header
            .delimited()
            .delimiter(",")
            .names("id", "firstName", "lastName", "email", "phone", "balance")
            .targetType(Customer.class)
            .strict(true)
            .build();
}

// Fixed-length file reader
@Bean
@StepScope
public FlatFileItemReader<LegacyRecord> fixedLengthReader(
        @Value("#{jobParameters['inputFile']}") Resource resource) {
    return new FlatFileItemReaderBuilder<LegacyRecord>()
            .name("legacyRecordReader")
            .resource(resource)
            .fixedLength()
            .columns(new Range(1, 10), new Range(11, 30), new Range(31, 45), new Range(46, 55))
            .names("accountNumber", "customerName", "transactionDate", "amount")
            .targetType(LegacyRecord.class)
            .build();
}

// Custom field set mapper for complex mappings
@Bean
public FlatFileItemReader<Order> orderReader(Resource resource) {
    return new FlatFileItemReaderBuilder<Order>()
            .name("orderReader")
            .resource(resource)
            .delimited()
            .names("orderId", "customerId", "productId", "quantity", "unitPrice", "orderDate")
            .fieldSetMapper(fieldSet -> {
                Order order = new Order();
                order.setOrderId(fieldSet.readString("orderId"));
                order.setCustomerId(fieldSet.readLong("customerId"));
                order.setProductId(fieldSet.readString("productId"));
                order.setQuantity(fieldSet.readInt("quantity"));
                order.setUnitPrice(fieldSet.readBigDecimal("unitPrice"));
                order.setOrderDate(fieldSet.readDate("orderDate", "yyyy-MM-dd"));
                order.setTotalAmount(order.getUnitPrice()
                        .multiply(BigDecimal.valueOf(order.getQuantity())));
                return order;
            })
            .build();
}
```

### JdbcCursorItemReader

Reads items from a database using a JDBC cursor.
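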
ORDER BY id") .preparedStatementSetter(ps -> { ps.setLong(1, minId); ps.setLong(2, maxId); }) .rowMapper((rs, rowNum) -> new Customer( rs.getLong("id"), rs.getString("first_name"), rs.getString("last_name"), rs.getString("email"), rs.getTimestamp("created_date").toLocalDateTime() )) .fetchSize(1000) .queryTimeout(300) .build(); } // Using DataClassRowMapper for automatic mapping @Bean public JdbcCursorItemReader<Product> productReader(DataSource dataSource) { return new JdbcCursorItemReaderBuilder<Product>() .name("productReader") .dataSource(dataSource) .sql("SELECT product_id, name, description, price, category FROM products") .rowMapper(new DataClassRowMapper<>(Product.class)) .build(); } ``` ### JdbcPagingItemReader Reads items from a database using pagination for better memory management with large datasets. ```java @Bean @StepScope public JdbcPagingItemReader<Order> jdbcPagingReader( DataSource dataSource, @Value("#{jobParameters['orderDate']}") LocalDate orderDate) { Map<String, Object> parameterValues = new HashMap<>(); parameterValues.put("orderDate", orderDate); return new JdbcPagingItemReaderBuilder<Order>() .name("orderPagingReader") .dataSource(dataSource) .selectClause("SELECT order_id, customer_id, total_amount, status") .fromClause("FROM orders") .whereClause("WHERE order_date = :orderDate") .sortKeys(Map.of("order_id", Order.ASCENDING)) .parameterValues(parameterValues) .rowMapper(new DataClassRowMapper<>(Order.class)) .pageSize(500) .build(); } ``` ### JsonItemReader Reads items from a JSON array file. ```java @Bean @StepScope public JsonItemReader<Transaction> jsonReader( @Value("#{jobParameters['inputFile']}") Resource resource) { return new JsonItemReaderBuilder<Transaction>() .name("transactionJsonReader") .resource(resource) .jsonObjectReader(new JacksonJsonObjectReader<>(Transaction.class)) .build(); } // With custom ObjectMapper configuration @Bean public JsonItemReader<Event> eventJsonReader(Resource resource) { ObjectMapper objectMapper = new ObjectMapper() .registerModule(new JavaTimeModule()) .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); JacksonJsonObjectReader<Event> jsonObjectReader = new JacksonJsonObjectReader<>(Event.class); jsonObjectReader.setMapper(objectMapper); return new JsonItemReaderBuilder<Event>() .name("eventReader") .resource(resource) .jsonObjectReader(jsonObjectReader) .build(); } ``` ## Built-in Item Writers ### FlatFileItemWriter Writes items to a flat file. ```java @Bean @StepScope public FlatFileItemWriter<Customer> csvWriter( @Value("#{jobParameters['outputFile']}") WritableResource resource) { return new FlatFileItemWriterBuilder<Customer>() .name("customerCsvWriter") .resource(resource) .delimited() .delimiter(",") .names("id", "firstName", "lastName", "email", "status") .headerCallback(writer -> writer.write("ID,First Name,Last Name,Email,Status")) .footerCallback(writer -> writer.write("# End of file")) .append(false) .build(); } // Custom line aggregator for complex formatting @Bean public FlatFileItemWriter<Report> reportWriter(WritableResource resource) { return new FlatFileItemWriterBuilder<Report>() .name("reportWriter") .resource(resource) .lineAggregator(report -> String.format( "%-20s | %10.2f | %s | %s", report.getName(), report.getAmount(), report.getDate().format(DateTimeFormatter.ISO_DATE), report.getStatus() )) .build(); } ``` ### JdbcBatchItemWriter Writes items to a database using batch SQL operations. 
```java
@Bean
public JdbcBatchItemWriter<ProcessedCustomer> jdbcWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<ProcessedCustomer>()
            .dataSource(dataSource)
            .sql("INSERT INTO processed_customers "
                    + "(customer_id, full_name, email, credit_score, processed_date) "
                    + "VALUES (:customerId, :fullName, :email, :creditScore, :processedDate)")
            .beanMapped()
            .build();
}

// Using ItemPreparedStatementSetter for custom parameter binding
@Bean
public JdbcBatchItemWriter<Order> orderWriter(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Order>()
            .dataSource(dataSource)
            .sql("UPDATE orders SET status = ?, processed_at = ?, processor_id = ? WHERE order_id = ?")
            .itemPreparedStatementSetter((order, ps) -> {
                ps.setString(1, order.getStatus().name());
                ps.setTimestamp(2, Timestamp.valueOf(LocalDateTime.now()));
                ps.setString(3, order.getProcessorId());
                ps.setLong(4, order.getOrderId());
            })
            .build();
}
```

### JsonFileItemWriter

Writes items to a JSON file.

```java
@Bean
@StepScope
public JsonFileItemWriter<AuditRecord> jsonWriter(
        @Value("#{jobParameters['outputFile']}") WritableResource resource) {
    return new JsonFileItemWriterBuilder<AuditRecord>()
            .name("auditJsonWriter")
            .resource(resource)
            .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
            .build();
}

// With custom ObjectMapper
@Bean
public JsonFileItemWriter<Event> eventJsonWriter(WritableResource resource) {
    ObjectMapper objectMapper = new ObjectMapper()
            .registerModule(new JavaTimeModule())
            .enable(SerializationFeature.INDENT_OUTPUT);
    JacksonJsonObjectMarshaller<Event> marshaller = new JacksonJsonObjectMarshaller<>();
    marshaller.setObjectMapper(objectMapper);

    return new JsonFileItemWriterBuilder<Event>()
            .name("eventWriter")
            .resource(resource)
            .jsonObjectMarshaller(marshaller)
            .build();
}
```

## JobRepository

The central persistence interface for batch metadata. Supports querying job instances, executions, and managing execution state.

```java
@Service
public class BatchOperationsService {

    private final JobRepository jobRepository;
    private final JobOperator jobOperator;

    public BatchOperationsService(JobRepository jobRepository, JobOperator jobOperator) {
        this.jobRepository = jobRepository;
        this.jobOperator = jobOperator;
    }

    // Query job instances
    public List<JobInstance> getRecentJobInstances(String jobName, int count) {
        return jobRepository.getJobInstances(jobName, 0, count);
    }

    // Get job execution details
    public JobExecution getJobExecution(long executionId) {
        return jobRepository.getJobExecution(executionId);
    }

    // Find running executions
    public List<JobExecution> getRunningExecutions(String jobName) {
        return jobRepository.getJobInstances(jobName, 0, 100).stream()
                .flatMap(instance -> jobRepository.getJobExecutions(instance).stream())
                .filter(execution -> execution.isRunning())
                .collect(Collectors.toList());
    }

    // Launch a new job
    public long startJob(String jobName, Properties parameters) throws Exception {
        return jobOperator.start(jobName, parameters);
    }

    // Restart a failed job
    public long restartJob(long executionId) throws Exception {
        return jobOperator.restart(executionId);
    }

    // Stop a running job
    public void stopJob(long executionId) throws Exception {
        jobOperator.stop(executionId);
    }

    // Abandon a failed job (cannot be restarted)
    public void abandonJob(long executionId) throws Exception {
        jobOperator.abandon(executionId);
    }
}
```

## Partitioning

Enables parallel processing by splitting work across multiple threads or nodes.
```java
// Custom Partitioner implementation
public class ColumnRangePartitioner implements Partitioner {

    private final JdbcTemplate jdbcTemplate;
    private final String table;
    private final String column;

    public ColumnRangePartitioner(DataSource dataSource, String table, String column) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.table = table;
        this.column = column;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Long min = jdbcTemplate.queryForObject(
                "SELECT MIN(" + column + ") FROM " + table, Long.class);
        Long max = jdbcTemplate.queryForObject(
                "SELECT MAX(" + column + ") FROM " + table, Long.class);
        long targetSize = (max - min) / gridSize + 1;

        Map<String, ExecutionContext> partitions = new HashMap<>();
        long start = min;
        long end = start + targetSize - 1;
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            context.putLong("minValue", start);
            context.putLong("maxValue", Math.min(end, max));
            partitions.put("partition" + i, context);
            start = end + 1;
            end = start + targetSize - 1;
        }
        return partitions;
    }
}

// Partitioned step configuration
@Configuration
public class PartitionedJobConfiguration {

    @Bean
    public Job partitionedJob(JobRepository jobRepository, Step managerStep) {
        return new JobBuilder("partitionedJob", jobRepository)
                .start(managerStep)
                .build();
    }

    @Bean
    public Step managerStep(JobRepository jobRepository, Partitioner partitioner, Step workerStep) {
        return new StepBuilder("managerStep", jobRepository)
                .partitioner("workerStep", partitioner)
                .step(workerStep)
                .gridSize(10)
                .taskExecutor(new SimpleAsyncTaskExecutor())
                .build();
    }

    @Bean
    public Step workerStep(JobRepository jobRepository,
                           PlatformTransactionManager transactionManager,
                           ItemReader<Customer> reader,
                           ItemWriter<Customer> writer) {
        // The partition's minValue/maxValue boundaries are consumed by a
        // step-scoped reader (see the sketch below), not by the step bean itself.
        return new StepBuilder("workerStep", jobRepository)
                .<Customer, Customer>chunk(100)
                .transactionManager(transactionManager)
                .reader(reader)
                .writer(writer)
                .build();
    }

    @Bean
    public Partitioner customerPartitioner(DataSource dataSource) {
        return new ColumnRangePartitioner(dataSource, "customers", "id");
    }
}
```
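The partition boundaries written by the partitioner end up in each worker's step execution context, where a step-scoped reader can pick them up. A sketch of such a reader bound to the `minValue`/`maxValue` keys produced by `ColumnRangePartitioner` above (this bean is illustrative; the SQL and `Customer` mapping follow the earlier JdbcCursorItemReader example):

```java
@Bean
@StepScope
public JdbcCursorItemReader<Customer> partitionedCustomerReader(
        DataSource dataSource,
        @Value("#{stepExecutionContext['minValue']}") Long minValue,
        @Value("#{stepExecutionContext['maxValue']}") Long maxValue) {
    return new JdbcCursorItemReaderBuilder<Customer>()
            .name("partitionedCustomerReader")
            .dataSource(dataSource)
            .sql("SELECT id, first_name, last_name, email, created_date "
                    + "FROM customers WHERE id BETWEEN ? AND ? ORDER BY id")
            .preparedStatementSetter(ps -> {
                // Bind this partition's key range
                ps.setLong(1, minValue);
                ps.setLong(2, maxValue);
            })
            .rowMapper(new DataClassRowMapper<>(Customer.class))
            .build();
}
```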
## Testing Utilities

### @SpringBatchTest

Test annotation that provides testing utilities and scope management for batch tests.

```java
@SpringBatchTest
@SpringJUnitConfig(BatchJobConfiguration.class)
class CustomerProcessingJobTest {

    @Autowired
    private JobOperatorTestUtils jobOperatorTestUtils;

    @Autowired
    private JobRepositoryTestUtils jobRepositoryTestUtils;

    @BeforeEach
    void setUp(@Autowired Job job) {
        jobOperatorTestUtils.setJob(job);
        jobRepositoryTestUtils.removeJobExecutions();
    }

    @Test
    void testJobCompletesSuccessfully() throws Exception {
        // Given
        JobParameters params = new JobParametersBuilder()
                .addString("inputFile", "classpath:test-data/customers.csv")
                .addString("outputFile", "file:target/output/processed-customers.csv")
                .addLocalDateTime("runTime", LocalDateTime.now())
                .toJobParameters();

        // When
        JobExecution execution = jobOperatorTestUtils.startJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
        assertThat(execution.getExitStatus()).isEqualTo(ExitStatus.COMPLETED);

        StepExecution stepExecution = execution.getStepExecutions().iterator().next();
        assertThat(stepExecution.getReadCount()).isEqualTo(100);
        assertThat(stepExecution.getWriteCount()).isEqualTo(95);
        assertThat(stepExecution.getFilterCount()).isEqualTo(5);
    }

    @Test
    void testSingleStepExecution() throws Exception {
        // Given
        JobParameters params = jobOperatorTestUtils.getUniqueJobParameters();

        // When
        JobExecution execution = jobOperatorTestUtils.startStep("processCustomers", params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.COMPLETED);
    }

    @Test
    void testJobFailsOnInvalidData() throws Exception {
        // Given
        JobParameters params = new JobParametersBuilder()
                .addString("inputFile", "classpath:test-data/invalid-customers.csv")
                .toJobParameters();

        // When
        JobExecution execution = jobOperatorTestUtils.startJob(params);

        // Then
        assertThat(execution.getStatus()).isEqualTo(BatchStatus.FAILED);
        assertThat(execution.getFailureExceptions()).isNotEmpty();
    }
}
```

### Testing Step-Scoped Components

Testing components that use `@StepScope` for late binding of job/step parameters.
```java
@SpringBatchTest
@SpringJUnitConfig(BatchJobConfiguration.class)
class StepScopedComponentsTest {

    @Autowired
    private ApplicationContext applicationContext;

    @Test
    void testStepScopedReader() throws Exception {
        // Create step execution context with parameters
        StepExecution stepExecution = MetaDataInstanceFactory.createStepExecution(
                new JobParametersBuilder()
                        .addString("inputFile", "classpath:test-data/sample.csv")
                        .addLong("minId", 1L)
                        .addLong("maxId", 100L)
                        .toJobParameters()
        );

        // Execute within step scope
        StepSynchronizationManager.register(stepExecution);
        try {
            FlatFileItemReader<Customer> reader = applicationContext.getBean(
                    "customerReader", FlatFileItemReader.class);
            reader.open(stepExecution.getExecutionContext());
            Customer first = reader.read();
            assertThat(first).isNotNull();
            assertThat(first.getId()).isGreaterThanOrEqualTo(1L);
            reader.close();
        } finally {
            StepSynchronizationManager.release();
        }
    }

    @Test
    void testStepScopedProcessor() throws Exception {
        StepExecution stepExecution = MetaDataInstanceFactory.createStepExecution(
                new JobParametersBuilder()
                        .addDouble("discountRate", 0.15)
                        .toJobParameters()
        );

        StepSynchronizationManager.register(stepExecution);
        try {
            ItemProcessor<Order, Order> processor = applicationContext.getBean(
                    "discountProcessor", ItemProcessor.class);
            Order input = new Order(1L, BigDecimal.valueOf(100.00));
            Order output = processor.process(input);
            assertThat(output.getAmount()).isEqualByComparingTo(BigDecimal.valueOf(85.00));
        } finally {
            StepSynchronizationManager.release();
        }
    }
}
```

## Complete Job Example

A comprehensive example showing all major features working together.

```java
@Configuration
@EnableBatchProcessing
@EnableJdbcJobRepository(dataSourceRef = "batchDataSource")
public class CustomerMigrationJobConfiguration {

    @Bean
    public DataSource batchDataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.H2)
                .addScript("/org/springframework/batch/core/schema-h2.sql")
                .addScript("/schema/customers.sql")
                .build();
    }

    @Bean
    public JdbcTransactionManager transactionManager(DataSource batchDataSource) {
        return new JdbcTransactionManager(batchDataSource);
    }

    @Bean
    public Job customerMigrationJob(JobRepository jobRepository, Step validateStep,
                                    Step migrateStep, Step reportStep) {
        return new JobBuilder("customerMigrationJob", jobRepository)
                .validator(new DefaultJobParametersValidator(
                        new String[]{"inputFile", "outputFile"},
                        new String[]{"skipLimit"}))
                .incrementer(new RunIdIncrementer())
                .listener(new JobExecutionListener() {
                    @Override
                    public void beforeJob(JobExecution jobExecution) {
                        System.out.println("Starting job: "
                                + jobExecution.getJobInstance().getJobName());
                    }

                    @Override
                    public void afterJob(JobExecution jobExecution) {
                        System.out.printf("Job completed with status: %s%n",
                                jobExecution.getStatus());
                        jobExecution.getStepExecutions().forEach(step ->
                                System.out.printf("  Step %s: read=%d, written=%d, skipped=%d%n",
                                        step.getStepName(),
                                        step.getReadCount(),
                                        step.getWriteCount(),
                                        step.getSkipCount()));
                    }
                })
                .start(validateStep)
                .next(migrateStep)
                .next(reportStep)
                .build();
    }

    @Bean
    public Step validateStep(JobRepository jobRepository) {
        return new StepBuilder("validateStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    String inputFile = chunkContext.getStepContext()
                            .getJobParameters().get("inputFile").toString();
                    Resource resource = new FileSystemResource(inputFile);
                    if (!resource.exists()) {
                        throw new IllegalArgumentException("Input file not found: " + inputFile);
                    }
                    System.out.println("Validation passed for: " + inputFile);
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
    @Bean
    public Step migrateStep(JobRepository jobRepository,
                            PlatformTransactionManager transactionManager,
                            ItemReader<Customer> customerReader,
                            ItemProcessor<Customer, MigratedCustomer> customerProcessor,
                            ItemWriter<MigratedCustomer> customerWriter) {
        return new StepBuilder("migrateStep", jobRepository)
                .<Customer, MigratedCustomer>chunk(500)
                .transactionManager(transactionManager)
                .reader(customerReader)
                .processor(customerProcessor)
                .writer(customerWriter)
                .faultTolerant()
                .skip(ValidationException.class)
                .skip(FlatFileParseException.class)
                .skipLimit(100)
                .retry(DeadlockLoserDataAccessException.class)
                .retryLimit(3)
                .listener(new StepExecutionListener() {
                    @Override
                    public ExitStatus afterStep(StepExecution stepExecution) {
                        if (stepExecution.getSkipCount() > 0) {
                            return new ExitStatus("COMPLETED_WITH_SKIPS");
                        }
                        return stepExecution.getExitStatus();
                    }
                })
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> customerReader(
            @Value("#{jobParameters['inputFile']}") Resource resource) {
        return new FlatFileItemReaderBuilder<Customer>()
                .name("customerReader")
                .resource(resource)
                .linesToSkip(1)
                .delimited()
                .names("id", "firstName", "lastName", "email", "phone", "createdDate")
                .targetType(Customer.class)
                .build();
    }

    @Bean
    public ItemProcessor<Customer, MigratedCustomer> customerProcessor() {
        return customer -> {
            // Validation
            if (customer.getEmail() == null || !customer.getEmail().contains("@")) {
                throw new ValidationException("Invalid email: " + customer.getEmail());
            }
            // Transformation
            return new MigratedCustomer(
                    customer.getId(),
                    customer.getFirstName() + " " + customer.getLastName(),
                    customer.getEmail().toLowerCase(),
                    customer.getPhone(),
                    LocalDateTime.now(),
                    "ACTIVE"
            );
        };
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<MigratedCustomer> customerWriter(
            @Value("#{jobParameters['outputFile']}") WritableResource resource) {
        return new FlatFileItemWriterBuilder<MigratedCustomer>()
                .name("customerWriter")
                .resource(resource)
                .delimited()
                .names("id", "fullName", "email", "phone", "migratedAt", "status")
                .headerCallback(writer -> writer.write("id,fullName,email,phone,migratedAt,status"))
                .build();
    }

    @Bean
    public Step reportStep(JobRepository jobRepository) {
        return new StepBuilder("reportStep", jobRepository)
                .tasklet((contribution, chunkContext) -> {
                    JobExecution jobExecution = chunkContext.getStepContext()
                            .getStepExecution().getJobExecution();
                    StepExecution migrateStep = jobExecution.getStepExecutions().stream()
                            .filter(s -> s.getStepName().equals("migrateStep"))
                            .findFirst()
                            .orElseThrow();

                    System.out.println("\n=== Migration Report ===");
                    System.out.printf("Records read: %d%n", migrateStep.getReadCount());
                    System.out.printf("Records written: %d%n", migrateStep.getWriteCount());
                    System.out.printf("Records skipped: %d%n", migrateStep.getSkipCount());
                    System.out.printf("Duration: %d seconds%n",
                            Duration.between(migrateStep.getStartTime(),
                                    migrateStep.getEndTime()).toSeconds());
                    System.out.println("========================\n");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
```
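To round out the example, here is a minimal sketch of bootstrapping this configuration and launching the job from a plain `main` method, using the same `jobOperator.start(jobName, parameters)` call shown in the JobRepository section (the file paths are illustrative):

```java
import java.util.Properties;

import org.springframework.batch.core.launch.JobOperator;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

// Illustrative sketch: start the migration job from a standalone application.
public class CustomerMigrationMain {

    public static void main(String[] args) throws Exception {
        try (AnnotationConfigApplicationContext context =
                     new AnnotationConfigApplicationContext(CustomerMigrationJobConfiguration.class)) {
            JobOperator jobOperator = context.getBean(JobOperator.class);

            // inputFile and outputFile are required by the job's parameter validator
            Properties parameters = new Properties();
            parameters.setProperty("inputFile", "/data/customers.csv");
            parameters.setProperty("outputFile", "/data/migrated-customers.csv");

            long executionId = jobOperator.start("customerMigrationJob", parameters);
            System.out.println("Started execution " + executionId);
        }
    }
}
```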
## Summary

Spring Batch is the standard framework for batch processing in Java/Spring applications. Its main use cases include ETL (Extract-Transform-Load) operations, data migration between systems, report generation, file processing (CSV, XML, JSON), database synchronization, and scheduled data processing jobs.

The framework excels at handling large volumes of data reliably through its chunk-oriented processing model, which provides automatic transaction management, restart capabilities, and fault tolerance through skip and retry policies.

Integration patterns commonly used with Spring Batch include Spring Integration for event-driven batch triggering, Spring Cloud Task for cloud-native batch execution, Spring's scheduling support for time-based job launching, and Kubernetes/container orchestration for distributed batch processing. The framework's modular architecture allows readers and writers to be mixed and matched (file to database, database to API, API to file, and so on), while the partitioning feature enables parallel processing across multiple threads or distributed nodes.

For testing, the `@SpringBatchTest` annotation and associated utilities provide comprehensive support for unit and integration testing of batch jobs, steps, and individual components.