CompositeItemProcessor Spring Batch Example

In this tutorial, we are going to see a CompositeItemProcessor Spring Batch example using the MySQL database. The CompositeItemProcessor is mainly used for item processor chaining: it lets us configure multiple item processors that run one after another. If an item is modified in one item processor, the modified item is passed on to the next item processor in the chain. Let’s see what we are going to do in this tutorial.

We will read data from the database using JdbcCursorItemReader and write it to a CSV file. Before the data is written to the CSV file, we will modify some items (i.e., Student objects) on the basis of certain conditions.

High-level overview of Spring Batch CompositeItemProcessor example


CompositeItemProcessor Spring Batch Example

Define the first ItemProcessor, i.e., the StudentNameChangeItemProcessor class

/**
 * Item processor that renames a student: a student whose name is exactly
 * {@code "arya"} gets the name {@code "stark"}; every other student is
 * returned unchanged.
 *
 * <p>The processor is typed as {@code ItemProcessor<Student, Student>} so that
 * no cast from {@code Object} is needed and a wrong item type is rejected at
 * compile time instead of failing with a {@code ClassCastException} at run time.
 */
public class StudentNameChangeItemProcessor implements ItemProcessor<Student, Student> {

    /**
     * Applies the name change to the given student.
     *
     * @param student the item handed over by the reader or by the previous processor
     * @return the same {@code Student} instance, possibly with a changed name
     * @throws Exception declared by the {@code ItemProcessor} contract; not thrown by this code
     */
    @Override
    public Student process(Student student) throws Exception {
        // Constant-first comparison: a student without a name is passed through
        // unchanged instead of causing a NullPointerException.
        if ("arya".equals(student.getName())) {
            student.setName("stark");
        }
        return student;
    }
}

Define the second ItemProcessor, i.e., the StudentRollnumberChangeItemProcessor class

/**
 * Item processor that rewrites a roll number: a student whose roll number is
 * exactly {@code "0135"} gets the roll number {@code "0136"}; every other
 * student is returned unchanged.
 *
 * <p>The processor is typed as {@code ItemProcessor<Student, Student>} so that
 * no cast from {@code Object} is needed and a wrong item type is rejected at
 * compile time instead of failing with a {@code ClassCastException} at run time.
 */
public class StudentRollnumberChangeItemProcessor implements ItemProcessor<Student, Student> {

    /**
     * Applies the roll number change to the given student.
     *
     * @param student the item handed over by the reader or by the previous processor
     * @return the same {@code Student} instance, possibly with a changed roll number
     * @throws Exception declared by the {@code ItemProcessor} contract; not thrown by this code
     */
    @Override
    public Student process(Student student) throws Exception {
        // Constant-first comparison: a student without a roll number is passed
        // through unchanged instead of causing a NullPointerException.
        if ("0135".equals(student.getRollNumber())) {
            student.setRollNumber("0136");
        }
        return student;
    }
}

Now we have two item processors. Let’s configure these item processors using CompositeItemProcessor.


    /**
     * Builds the processor chain used by the step.
     *
     * <p>The delegates run in the order they are added: the name change first,
     * then the roll number change. Each delegate receives the item returned by
     * the previous one.
     *
     * @return a composite processor that takes and returns {@code Student} items
     */
    @Bean
    public CompositeItemProcessor<Student, Student> compositeProcessor() {
        // Wildcard element type matches what setDelegates accepts and avoids raw types.
        List<ItemProcessor<?, ?>> delegates = new ArrayList<>(2);
        delegates.add(new StudentNameChangeItemProcessor());
        delegates.add(new StudentRollnumberChangeItemProcessor());

        CompositeItemProcessor<Student, Student> processor = new CompositeItemProcessor<>();
        processor.setDelegates(delegates);
        return processor;
    }

And finally, we need to register this composite item processor as the processor of the step.

    /**
     * Defines the step named {@code "getDbToCsvStep"}: a chunk-oriented step with
     * a chunk size of 1 that wires together {@code reader()},
     * {@code compositeProcessor()} and {@code writer()}.
     *
     * @return the fully built step
     */
    @Bean
    public Step getDbToCsvStep() {
        return stepBuilderFactory.get("getDbToCsvStep")
                .<Student, Student>chunk(1)
                .reader(reader())
                .processor(compositeProcessor())
                .writer(writer())
                .build();
    }

Let’s see the complete CompositeItemProcessor Spring Batch example using the MySQL database from scratch.


Maven dependencies

    <!-- Parent POM: Spring Boot starter parent, version 2.3.1.RELEASE. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
    </parent>
    <!-- None of the dependencies below declares a version; presumably the
         versions are managed by the parent POM above (confirm). -->
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- NOTE(review): spring-batch-core is likely already pulled in
             transitively by spring-boot-starter-batch below; confirm before
             treating this entry as required. -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- JDBC driver for the MySQL database the example reads from. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
    </dependencies>

Package structure

CompositeItemProcessor Spring Batch
CompositeItemProcessor spring batch example

Define Student.java

package com.springbatchexample.entity;

/**
 * Mutable bean holding one student record: an id, a name and a roll number.
 *
 * <p>The accessors are written out in full because the rest of the example
 * depends on them: the row mapper fills a student through the setters, the
 * item processors read and change the name and roll number, and the CSV
 * writer extracts the properties {@code id}, {@code rollNumber} and
 * {@code name} by name.
 */
public class Student {

    private Long id;
    private String name;
    private String rollNumber;

    /** Creates an empty student; all fields start as {@code null}. */
    public Student() {
    }

    /**
     * Creates a fully populated student.
     *
     * @param id         the student id
     * @param name       the student name
     * @param rollNumber the student roll number
     */
    public Student(Long id, String name, String rollNumber) {
        this.id = id;
        this.name = name;
        this.rollNumber = rollNumber;
    }

    /** @return the student id, or {@code null} if it has not been set */
    public Long getId() {
        return id;
    }

    /** @param id the student id */
    public void setId(Long id) {
        this.id = id;
    }

    /** @return the student name, or {@code null} if it has not been set */
    public String getName() {
        return name;
    }

    /** @param name the student name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the roll number, or {@code null} if it has not been set */
    public String getRollNumber() {
        return rollNumber;
    }

    /** @param rollNumber the roll number */
    public void setRollNumber(String rollNumber) {
        this.rollNumber = rollNumber;
    }
}

Define ItemProcessors

StudentNameChangeItemProcessor.java
package com.springbatchexample.component;

import com.springbatchexample.entity.Student;
import org.springframework.batch.item.ItemProcessor;

/**
 * Item processor that renames a student: a student whose name is exactly
 * {@code "arya"} gets the name {@code "stark"}; every other student is
 * returned unchanged.
 *
 * <p>The processor is typed as {@code ItemProcessor<Student, Student>} so that
 * no cast from {@code Object} is needed and a wrong item type is rejected at
 * compile time instead of failing with a {@code ClassCastException} at run time.
 */
public class StudentNameChangeItemProcessor implements ItemProcessor<Student, Student> {

    /**
     * Applies the name change to the given student.
     *
     * @param student the item handed over by the reader or by the previous processor
     * @return the same {@code Student} instance, possibly with a changed name
     * @throws Exception declared by the {@code ItemProcessor} contract; not thrown by this code
     */
    @Override
    public Student process(Student student) throws Exception {
        // Constant-first comparison: a student without a name is passed through
        // unchanged instead of causing a NullPointerException.
        if ("arya".equals(student.getName())) {
            student.setName("stark");
        }
        return student;
    }
}
StudentRollnumberChangeItemProcessor.java
package com.springbatchexample.component;

import com.springbatchexample.entity.Student;
import org.springframework.batch.item.ItemProcessor;

/**
 * Item processor that rewrites a roll number: a student whose roll number is
 * exactly {@code "0135"} gets the roll number {@code "0136"}; every other
 * student is returned unchanged.
 *
 * <p>The processor is typed as {@code ItemProcessor<Student, Student>} so that
 * no cast from {@code Object} is needed and a wrong item type is rejected at
 * compile time instead of failing with a {@code ClassCastException} at run time.
 */
public class StudentRollnumberChangeItemProcessor implements ItemProcessor<Student, Student> {

    /**
     * Applies the roll number change to the given student.
     *
     * @param student the item handed over by the reader or by the previous processor
     * @return the same {@code Student} instance, possibly with a changed roll number
     * @throws Exception declared by the {@code ItemProcessor} contract; not thrown by this code
     */
    @Override
    public Student process(Student student) throws Exception {
        // Constant-first comparison: a student without a roll number is passed
        // through unchanged instead of causing a NullPointerException.
        if ("0135".equals(student.getRollNumber())) {
            student.setRollNumber("0136");
        }
        return student;
    }
}

Define RowMapper

package com.springbatchexample.component;

import com.springbatchexample.entity.Student;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.stereotype.Component;

import java.sql.ResultSet;
import java.sql.SQLException;

/**
 * Maps the current row of a result set to a {@link Student}.
 *
 * <p>Reads the columns {@code id}, {@code roll_number} and {@code name}, in
 * that order, and copies them into a new student through its setters.
 */
@Component
public class StudentResultRowMapper implements RowMapper<Student> {

    /**
     * Builds a student from the row the result set is currently positioned on.
     *
     * @param rs the result set to read the column values from
     * @param i  the row index passed in by the caller; not used here
     * @return a new student populated from the row
     * @throws SQLException if reading one of the columns fails
     */
    @Override
    public Student mapRow(ResultSet rs, int i) throws SQLException {
        // Column values are fetched first, in the same order as before.
        final long idValue = rs.getLong("id");
        final String rollNumberValue = rs.getString("roll_number");
        final String nameValue = rs.getString("name");

        final Student mapped = new Student();
        mapped.setId(idValue);
        mapped.setRollNumber(rollNumberValue);
        mapped.setName(nameValue);
        return mapped;
    }
}

Define SpringBatchConfig.java

package com.springbatchexample.config;

import com.springbatchexample.component.StudentNameChangeItemProcessor;
import com.springbatchexample.component.StudentResultRowMapper;
import com.springbatchexample.component.StudentRollnumberChangeItemProcessor;
import com.springbatchexample.entity.Student;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.job.builder.FlowJobBuilder;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;

/**
 * Spring Batch configuration for the job {@code "dbToCsvJob"}.
 *
 * <p>The job has a single chunk-oriented step that reads {@link Student} rows
 * from the {@code student} table, passes each one through a chain of two item
 * processors, and writes the result as comma-separated lines to a file.
 */
@EnableBatchProcessing
@Configuration
public class SpringBatchConfig {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Cursor-based reader that selects {@code id}, {@code roll_number} and
     * {@code name} from the {@code student} table and maps each row with a
     * {@link StudentResultRowMapper}.
     *
     * @return the configured reader
     */
    @Bean
    public JdbcCursorItemReader<Student> reader() {
        JdbcCursorItemReader<Student> reader = new JdbcCursorItemReader<>();
        reader.setDataSource(dataSource);
        reader.setSql("select id, roll_number, name from student");
        reader.setRowMapper(new StudentResultRowMapper());
        return reader;
    }

    /**
     * Flat file writer that writes one line per student to
     * {@code C://data/batch/data.csv}.
     *
     * @return the configured writer
     */
    @Bean
    public FlatFileItemWriter<Student> writer() {
        FlatFileItemWriter<Student> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("C://data/batch/data.csv"));
        writer.setLineAggregator(getDelimitedLineAggregator());
        return writer;
    }

    /**
     * Builds the aggregator that turns a student into one CSV line. The line
     * holds the properties {@code id}, {@code rollNumber} and {@code name},
     * in that order, separated by commas.
     *
     * @return the configured line aggregator
     */
    private DelimitedLineAggregator<Student> getDelimitedLineAggregator() {
        BeanWrapperFieldExtractor<Student> fieldExtractor = new BeanWrapperFieldExtractor<>();
        fieldExtractor.setNames(new String[]{"id", "rollNumber", "name"});

        DelimitedLineAggregator<Student> aggregator = new DelimitedLineAggregator<>();
        aggregator.setDelimiter(",");
        aggregator.setFieldExtractor(fieldExtractor);
        return aggregator;
    }

    /**
     * Defines the step named {@code "getDbToCsvStep"}: a chunk-oriented step
     * with a chunk size of 1 that wires together the reader, the composite
     * processor and the writer.
     *
     * @return the fully built step
     */
    @Bean
    public Step getDbToCsvStep() {
        return stepBuilderFactory.get("getDbToCsvStep")
                .<Student, Student>chunk(1)
                .reader(reader())
                .processor(compositeProcessor())
                .writer(writer())
                .build();
    }

    /**
     * Defines the job named {@code "dbToCsvJob"}, which runs
     * {@link #getDbToCsvStep()} and uses a {@link RunIdIncrementer}.
     *
     * @return the fully built job
     */
    @Bean
    public Job dbToCsvJob() {
        // The incrementer call is chained so its result is used rather than discarded.
        return jobBuilderFactory.get("dbToCsvJob")
                .incrementer(new RunIdIncrementer())
                .flow(getDbToCsvStep())
                .end()
                .build();
    }

    /**
     * Builds the processor chain used by the step.
     *
     * <p>The delegates run in the order they are added: the name change first,
     * then the roll number change. Each delegate receives the item returned by
     * the previous one.
     *
     * @return a composite processor that takes and returns {@code Student} items
     */
    @Bean
    public CompositeItemProcessor<Student, Student> compositeProcessor() {
        // Wildcard element type matches what setDelegates accepts and avoids raw types.
        List<ItemProcessor<?, ?>> delegates = new ArrayList<>(2);
        delegates.add(new StudentNameChangeItemProcessor());
        delegates.add(new StudentRollnumberChangeItemProcessor());

        CompositeItemProcessor<Student, Student> processor = new CompositeItemProcessor<>();
        processor.setDelegates(delegates);
        return processor;
    }
}

That’s all about CompositeItemProcessor Spring Batch Example.

Download source code from git.

Note – For more details about CompositeItemProcessor, see the Spring Batch documentation.

Other Spring Batch tutorials.

Spring Data JPA tutorial.

Hibernate example using spring boot.