AsyncItemProcessor Spring Batch Example

AsyncItemProcessor Spring Batch Example

In this tutorial, we will walk through an AsyncItemProcessor Spring Batch example using MySQL. We will read data from a MySQL database and write it to a CSV file, processing the items asynchronously with AsyncItemProcessor.

Suppose we have a few records in the database, as shown below.

AsyncItemProcessor Spring Batch Example
Record in Db

In this AsyncItemProcessor Spring Batch example, we will read data using JdbcPagingItemReader, sort it by id, and then write it to a CSV file.


AsyncItemProcessor Spring Batch Example

Before going ahead, let’s create a student table.

-- Source table for the batch job: one row per student, keyed by an auto-generated id.
CREATE TABLE student (
    id          INT          NOT NULL AUTO_INCREMENT,
    name        VARCHAR(260),
    roll_number VARCHAR(260),
    PRIMARY KEY (id)
);

Note – We need to add the below dependency in order to use AsyncItemProcessor.

        <!-- Required for AsyncItemProcessor; no <version> is given here, so it must be supplied by the parent POM / dependency management. -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>

The directory structure of the example


Spring Batch AsyncItemProcessor Example

Define pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.netsurfingzone.com</groupId>
    <artifactId>springbatchasyncitemprocessor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- The Spring Boot parent manages the versions of the dependencies below. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Brings in spring-batch-core transitively, so it is not declared separately. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-batch</artifactId>
        </dependency>
        <!-- JDBC driver for the MySQL database the job reads from. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!-- Provides AsyncItemProcessor and AsyncItemWriter. -->
        <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-integration</artifactId>
        </dependency>
    </dependencies>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

Define Student.java class


package com.springbatchexample.entity;

/**
 * Plain data holder for one row of the {@code student} table.
 *
 * <p>The accessors follow JavaBean naming so that property-based tools
 * (for example a field extractor configured with the names {@code id},
 * {@code rollNumber} and {@code name}) can read the values.
 */
public class Student {

    private int id;
    private String name;
    private String rollNumber;

    /** Creates an empty student; populate it through the setters. */
    public Student() {
    }

    /**
     * Creates a fully populated student.
     *
     * @param id         primary key of the row
     * @param name       student name
     * @param rollNumber student roll number
     */
    public Student(int id, String name, String rollNumber) {
        this.id = id;
        this.name = name;
        this.rollNumber = rollNumber;
    }

    /** @return the primary key of the row */
    public int getId() {
        return id;
    }

    /** @param id the primary key of the row */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the student name */
    public String getName() {
        return name;
    }

    /** @param name the student name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the student roll number */
    public String getRollNumber() {
        return rollNumber;
    }

    /** @param rollNumber the student roll number */
    public void setRollNumber(String rollNumber) {
        this.rollNumber = rollNumber;
    }

}

Create async reader


    /**
     * Builds the paging JDBC reader that feeds the step.
     *
     * <p>Selects {@code id}, {@code roll_number} and {@code name} from the
     * {@code student} table, ordered by {@code id} ascending, and maps each row
     * with {@link StudentResultRowMapper}. No page size is configured here, so
     * the builder's default applies.
     *
     * @return the configured reader
     */
    @Bean
    public ItemReader<Student> asyncreader() {
        // Parameterising the builder avoids the raw-type / unchecked conversion
        // that an untyped JdbcPagingItemReaderBuilder would introduce.
        return new JdbcPagingItemReaderBuilder<Student>()
                .name("reader")
                .dataSource(dataSource)
                .selectClause("select id, roll_number, name ")
                .fromClause("FROM student ")
                .sortKeys(Collections.singletonMap("id", Order.ASCENDING))
                .rowMapper(new StudentResultRowMapper())
                .build();
    }

Create asyncItemProcessor

    /**
     * Exposes the synchronous {@code StudentItemProcessor} as a bean.
     *
     * @return a new {@code StudentItemProcessor}
     */
    @Bean
    public ItemProcessor<Student, Student> studentItemProcessor() {
        final StudentItemProcessor delegate = new StudentItemProcessor();
        return delegate;
    }

    /**
     * Wraps {@link #studentItemProcessor()} in an {@code AsyncItemProcessor}
     * that runs on the executor from {@link #getAsyncExecutor()}.
     *
     * @return a processor that yields a {@code Future} per input item
     */
    @Bean
    public ItemProcessor<Student, Future<Student>> asyncItemProcessor() {
        final ItemProcessor<Student, Student> delegate = studentItemProcessor();
        final TaskExecutor executor = getAsyncExecutor();

        final AsyncItemProcessor<Student, Student> processor = new AsyncItemProcessor<>();
        processor.setDelegate(delegate);
        processor.setTaskExecutor(executor);
        return processor;
    }

Create asyncWriter

    /**
     * Wraps the flat-file {@link #writer()} in an {@code AsyncItemWriter} so the
     * step can accept the {@code Future} items produced by the async processor.
     *
     * @return the writer used by the step
     */
    @Bean
    public ItemWriter<Future<Student>> asyncItemWriter() {
        final FlatFileItemWriter<Student> delegate = writer();

        final AsyncItemWriter<Student> futureWriter = new AsyncItemWriter<>();
        futureWriter.setDelegate(delegate);
        return futureWriter;
    }

    /**
     * Creates the flat-file writer that emits one delimited line per student
     * to {@code C://data/batch/data.csv}.
     *
     * @return the configured flat-file writer
     */
    @Bean
    public FlatFileItemWriter<Student> writer() {
        final FlatFileItemWriter<Student> csvWriter = new FlatFileItemWriter<>();
        final FileSystemResource target = new FileSystemResource("C://data/batch/data.csv");
        csvWriter.setResource(target);
        final DelimitedLineAggregator<Student> lineAggregator = getDelimitedLineAggregator();
        csvWriter.setLineAggregator(lineAggregator);
        return csvWriter;
    }

    /**
     * Builds the aggregator that turns a {@code Student} into a comma-separated
     * line with the columns {@code id}, {@code rollNumber}, {@code name}.
     *
     * @return the configured line aggregator
     */
    private DelimitedLineAggregator<Student> getDelimitedLineAggregator() {
        final String[] columns = {"id", "rollNumber", "name"};
        final BeanWrapperFieldExtractor<Student> extractor = new BeanWrapperFieldExtractor<>();
        extractor.setNames(columns);

        final DelimitedLineAggregator<Student> lineAggregator = new DelimitedLineAggregator<>();
        lineAggregator.setDelimiter(",");
        lineAggregator.setFieldExtractor(extractor);
        return lineAggregator;
    }

Create task executor

    /**
     * Defines the thread pool used by the async item processor: 100 core
     * threads, 100 max threads and a queue of 100, with
     * {@code CallerRunsPolicy} as the rejection handler.
     *
     * @return the executor registered under the bean name {@code asyncExecutor}
     */
    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setThreadNamePrefix("netsurfingzone 1-");
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        pool.setQueueCapacity(100);
        pool.setMaxPoolSize(100);
        pool.setCorePoolSize(100);
        return pool;
    }

Create Step and Job

    /**
     * Assembles the chunk-oriented step: chunks of 10 items flowing from
     * {@link #asyncreader()} through {@link #asyncItemProcessor()} into
     * {@link #asyncItemWriter()}.
     *
     * @return the step named {@code asyncStep1}
     */
    @Bean
    public Step asyncStep1() {
        return stepBuilderFactory.get("asyncStep1")
                .<Student, Future<Student>>chunk(10)
                .reader(asyncreader())
                .processor(asyncItemProcessor())
                .writer(asyncItemWriter())
                .build();
    }

    /**
     * Defines the job {@code asyncJob}: a single-step flow running
     * {@link #asyncStep1()}, with a {@code RunIdIncrementer} attached.
     *
     * @return the assembled job
     */
    @Bean
    public Job asyncJob() {
        return jobBuilderFactory.get("asyncJob")
                .incrementer(new RunIdIncrementer())
                .flow(asyncStep1())
                .end()
                .build();
    }

We also need a RowMapper

/**
 * Maps one row of the {@code student} query result to a {@link Student}.
 */
@Component
public class StudentResultRowMapper implements RowMapper<Student> {

    /**
     * Reads the {@code id}, {@code roll_number} and {@code name} columns of the
     * current row and returns them as a {@link Student}.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (unused)
     * @return the populated student
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Student mapRow(ResultSet rs, int rowNum) throws SQLException {
        // Columns are read in the same order as before; only the way the
        // Student is populated differs (constructor instead of setters).
        final int id = rs.getInt("id");
        final String rollNumber = rs.getString("roll_number");
        final String name = rs.getString("name");
        return new Student(id, name, rollNumber);
    }
}

Let’s keep all the configuration code together in the SpringBatchConfig class.

/**
 * Spring Batch configuration for the async example.
 *
 * <p>Defines a single job ({@code asyncJob}) with a single chunk-oriented step
 * ({@code asyncStep1}) that reads {@link Student} rows over JDBC, processes them
 * through an {@code AsyncItemProcessor}, and writes them to a delimited file
 * through an {@code AsyncItemWriter}.
 */
@EnableBatchProcessing
@Configuration
public class SpringBatchConfig {

    @Autowired
    private DataSource dataSource;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Builds the paging JDBC reader that feeds the step.
     *
     * <p>Selects {@code id}, {@code roll_number} and {@code name} from the
     * {@code student} table, ordered by {@code id} ascending, and maps each row
     * with {@link StudentResultRowMapper}. No page size is configured here, so
     * the builder's default applies.
     *
     * @return the configured reader
     */
    @Bean
    public ItemReader<Student> asyncreader() {
        // Parameterising the builder avoids the raw-type / unchecked conversion
        // that an untyped JdbcPagingItemReaderBuilder would introduce.
        return new JdbcPagingItemReaderBuilder<Student>()
                .name("reader")
                .dataSource(dataSource)
                .selectClause("select id, roll_number, name ")
                .fromClause("FROM student ")
                .sortKeys(Collections.singletonMap("id", Order.ASCENDING))
                .rowMapper(new StudentResultRowMapper())
                .build();
    }

    /**
     * Exposes the synchronous {@code StudentItemProcessor} as a bean.
     *
     * @return a new {@code StudentItemProcessor}
     */
    @Bean
    public ItemProcessor<Student, Student> studentItemProcessor() {
        return new StudentItemProcessor();
    }

    /**
     * Wraps {@link #studentItemProcessor()} in an {@code AsyncItemProcessor}
     * that runs on the executor from {@link #getAsyncExecutor()}.
     *
     * @return a processor that yields a {@code Future} per input item
     */
    @Bean
    public ItemProcessor<Student, Future<Student>> asyncItemProcessor() {
        AsyncItemProcessor<Student, Student> asyncItemProcessor = new AsyncItemProcessor<>();
        asyncItemProcessor.setDelegate(studentItemProcessor());
        asyncItemProcessor.setTaskExecutor(getAsyncExecutor());
        return asyncItemProcessor;
    }

    /**
     * Wraps the flat-file {@link #writer()} in an {@code AsyncItemWriter} so the
     * step can accept the {@code Future} items produced by the async processor.
     *
     * @return the writer used by the step
     */
    @Bean
    public ItemWriter<Future<Student>> asyncItemWriter() {
        AsyncItemWriter<Student> asyncItemWriter = new AsyncItemWriter<>();
        asyncItemWriter.setDelegate(writer());
        return asyncItemWriter;
    }

    /**
     * Creates the flat-file writer that emits one delimited line per student
     * to {@code C://data/batch/data.csv}.
     *
     * @return the configured flat-file writer
     */
    @Bean
    public FlatFileItemWriter<Student> writer() {
        FlatFileItemWriter<Student> writer = new FlatFileItemWriter<>();
        writer.setResource(new FileSystemResource("C://data/batch/data.csv"));
        writer.setLineAggregator(getDelimitedLineAggregator());
        return writer;
    }

    /**
     * Builds the aggregator that turns a {@code Student} into a comma-separated
     * line with the columns {@code id}, {@code rollNumber}, {@code name}.
     *
     * @return the configured line aggregator
     */
    private DelimitedLineAggregator<Student> getDelimitedLineAggregator() {
        BeanWrapperFieldExtractor<Student> beanWrapperFieldExtractor = new BeanWrapperFieldExtractor<>();
        beanWrapperFieldExtractor.setNames(new String[]{"id", "rollNumber", "name"});

        DelimitedLineAggregator<Student> aggregator = new DelimitedLineAggregator<>();
        aggregator.setDelimiter(",");
        aggregator.setFieldExtractor(beanWrapperFieldExtractor);
        return aggregator;
    }

    /**
     * Defines the thread pool used by the async item processor: 100 core
     * threads, 100 max threads and a queue of 100, with
     * {@code CallerRunsPolicy} as the rejection handler.
     *
     * @return the executor registered under the bean name {@code asyncExecutor}
     */
    @Bean(name = "asyncExecutor")
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);
        executor.setMaxPoolSize(100);
        executor.setQueueCapacity(100);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setThreadNamePrefix("netsurfingzone 1-");
        return executor;
    }

    /**
     * Assembles the chunk-oriented step: chunks of 10 items flowing from
     * {@link #asyncreader()} through {@link #asyncItemProcessor()} into
     * {@link #asyncItemWriter()}.
     *
     * @return the step named {@code asyncStep1}
     */
    @Bean
    public Step asyncStep1() {
        StepBuilder stepBuilder = stepBuilderFactory.get("asyncStep1");
        SimpleStepBuilder<Student, Future<Student>> simpleStepBuilder = stepBuilder.chunk(10);
        return simpleStepBuilder.reader(asyncreader()).processor(asyncItemProcessor()).writer(asyncItemWriter()).build();
    }

    /**
     * Defines the job {@code asyncJob}: a single-step flow running
     * {@link #asyncStep1()}, with a {@code RunIdIncrementer} attached.
     *
     * @return the assembled job
     */
    @Bean
    public Job asyncJob() {
        JobBuilder jobBuilder = jobBuilderFactory.get("asyncJob");
        jobBuilder.incrementer(new RunIdIncrementer());
        FlowJobBuilder flowJobBuilder = jobBuilder.flow(asyncStep1()).end();
        return flowJobBuilder.build();
    }
}

Define SpringMain.java

package com.springbatchexample.main;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

/**
 * Application entry point: boots the Spring context, scanning the packages
 * matched by {@code com.springbatchexample.*}.
 */
@SpringBootApplication
@ComponentScan(basePackages = "com.springbatchexample.*")
public class SpringMain {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringMain.class);
        application.run(args);
    }
}

application.properties

# MySQL connection settings for the application's DataSource.
spring.datasource.url=jdbc:mysql://localhost:3306/springbootcrudexample
spring.datasource.username=root
spring.datasource.password=root

# NOTE(review): the pom shown in this article declares no JPA starter, so these
# spring.jpa.* settings look unused here - confirm before relying on them.
spring.jpa.hibernate.ddl-auto=create
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.format_sql=true

server.port = 9091

# Trailing whitespace is part of a .properties value, so keep this line free of it.
spring.batch.initialize-schema=always

That’s all about AsyncItemProcessor Spring Batch Example.

Download code from github.

See Spring batch AsyncItemProcessor example docs.

Other Spring Batch Examples.