8.Spring Batch의 Chunk와 ItemReader
Spring Batch

8.Spring Batch의 Chunk와 ItemReader

반응형

스프링 배치 청크 프로세스

기본개념

  • 2차원 데이터(표)로 표현된 유형의 파일을 처리하는 ItemReader
  • 일반적으로 고정 위치로 정의된 데이터 필드나 특수 문자에 의해 구별된 데이터의 행을 읽는다
  • Resource 와 LineMapper 두 가지 요소가 필요하다

구조

Resource

  • FileSystemResource – new FileSystemResource(“resource/path/config.xml”)
  • ClassPathResource – new ClassPathResource(“classpath:path/config.xml)

LineMapper

  • 파일의 라인 한줄을 Object 로 변환해서 FlatFileItemReader 로 리턴한다
  • 단순히 문자열을 받기 때문에 문자열을 토큰화해서 객체로 매핑하는 과정이 필요하다
  • LineTokenizer 와 FieldSetMapper 를 사용해서 처리한다
  • FieldSet
    • 라인을 필드로 구분해서 만든 배열 토큰을 전달하면 토큰 필드를 참조 할수 있도록 한다
    • JDBC 의 ResultSet 과 유사하다 ex) fs.readString(0), fs.readString(“name”)
  • LineTokenizer
    • 입력받은 라인을 FieldSet 으로 변환해서 리턴한다
    • 파일마다 형식이 다르기 때문에 문자열을 FieldSet 으로 변환하는 작업을 추상화시켜야 한다.
  • FieldSetMapper
    • FieldSet 객체를 받아서 원하는 객체로 매핑해서 리턴한다
    • JdbcTemplate 의 RowMapper 와 동일한 패턴을 사용한다

예제 코드

package com.example.springbatch_8_1_flatfileitemreader;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import java.util.List;

/**
 * packageName    : com.example.springbatch_8_1_flatfileitemreader
 * fileName       : FlatFilesConfiguration
 * author         : namhyeop
 * date           : 2022/08/08
 * description    :
 * FlatFilesItemReader 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/08        namhyeop       최초 생성
 */

@RequiredArgsConstructor
@Configuration
public class FlatFilesConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(5)
                .reader(itemReader())
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println("items = " + items);
                    }
                }).build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executed");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public ItemReader itemReader(){
        FlatFileItemReader<Customer> itemReader = new FlatFileItemReader<Customer>();
        itemReader.setResource(new ClassPathResource("/customer.csv"));

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
        lineMapper.setFieldSetMapper(new CustomerFieldSetMapper());

        itemReader.setLineMapper(lineMapper);
        //customer.csv 맨윗줄 한줄 생략
        itemReader.setLinesToSkip(1);
        return itemReader;
    }
}
package com.example.springbatch_8_1_flatfileitemreader;

import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.LineTokenizer;

/**
 * packageName    : com.example.springbatch_8_1_flatfileitemreader
 * fileName       : DefaultLineMapper
 * author         : namhyeop
 * date           : 2022/08/08
 * description    :
 * DefaultLineMapper 예제, customer의 정보를 읽고 token화 하기 위해서 필요
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/08        namhyeop       최초 생성
 */
public class DefaultLineMapper <T> implements LineMapper {

    private LineTokenizer tokenizer;
    private FieldSetMapper<T> fieldSetMapper;

    @Override
    public T mapLine(String line, int lineNumber) throws Exception{
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer){
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper){
        this.fieldSetMapper = fieldSetMapper;
    }
}
package com.example.springbatch_8_1_flatfileitemreader;

import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

/**
 * packageName    : com.example.springbatch_8_1_flatfileitemreader
 * fileName       : CustomerFieldSetMapper
 * author         : namhyeop
 * date           : 2022/08/08
 * description    :
 * Customer 정보를 읽는 자료형 설정
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/08        namhyeop       최초 생성
 */
public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    @Override
    public Customer mapFieldSet(FieldSet fs) throws BindException {
        if(fs == null){
            return null;
        }
        Customer customer = new Customer();
        customer.setName(fs.readString(0));
        customer.setAge(fs.readInt(1));
        customer.setYear(fs.readString(2));
        return customer;
    }
}
package com.example.springbatch_8_1_flatfileitemreader;

import lombok.Data;

/**
 * packageName    : com.example.springbatch_8_1_flatfileitemreader
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/08
 * description    :
 * Customer Dto
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/08d        namhyeop       최초 생성
 */
@Data
public class Customer {
    private String name;
    private int age;
    private String year;
}

DelimitedLineTokenizer

기본개념

  • 한 개 라인의 String을 구분자 기준으로 나누어 토큰화 하는 방식

구조

예제 코드

package com.example.springbatch_8_2_flatfileitemreader_delimetedlinetokenizer;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

import java.util.List;

/**
 * packageName    : com.example.springbatch_8_1_flatfileitemreader
 * fileName       : FlatFilesConfiguration
 * author         : namhyeop
 * date           : 2022/08/08
 * description    :
 * FlatFilesItemReader-delimetedlinetokenizer 예제
 * 이전 8-1에서는 직접 delimitedlinetokenizer를 구현해서 사용했다면 이번에는 실제 존재하는 라이브러리를 itemReadr에 적용한다.
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/08        namhyeop       최초 생성
 */

@RequiredArgsConstructor
@Configuration
public class FlatFilesConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(3)
                .reader(itemReader())
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List<? extends String> items) throws Exception {
                        System.out.println("items = " + items);
                    }
                }).build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executed");
                    return RepeatStatus.FINISHED;
                }).build();
    }

    @Bean
    public ItemReader itemReader(){
        return new FlatFileItemReaderBuilder<Customer>()
                .name("flatFile")
                .resource(new ClassPathResource("customer.csv"))
                //1.아래의 직접 만든 CustomerFieldSeMapper를 사용하지 않고 Batch의 BeanWrapperFieldSetMapper를 사용
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>())
                //2.BeanWrapperFieldSetMapper를 사용한다면 targetType을 반드시 명시해줘야한다.
                .targetType(Customer.class)
//                .fieldSetMapper(new CustomerFieldSetMapper())
                .linesToSkip(1)
                .delimited().delimiter(",")
                .names("name","age","year")
                .build();
    }
}

 

package com.example.springbatch_8_2_flatfileitemreader_delimetedlinetokenizer;

import lombok.Data;

/**
 * packageName    : com.example.springbatch_8_1_flatfileitemreader
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/08
 * description    :
 * Customer Dto
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/08d        namhyeop       최초 생성
 */
@Data
public class Customer {
    private String name;
    private int age;
    private String year;
}

FixedLengthTokenizer

기본개념

  • 한 개 라인의 String을 사용자가 설정한 고정길이 기준으로 나누어 토큰화 하는 방식
  • 범위는 문자열 형식으로 설정 할 수 있다
    • “1-4"또는 "1-3,4-6,7"또는 "1-2,4-5,7-10”
    • 마직막 범위가 열려 있으면 나머지 행이 해당 열로 읽혀진다

구조

package com.example.springbatch_8_3_fixedlengthtokenizer;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.transform.Range;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

import java.util.List;

/**
 * packageName    : com.example.springbatch_8_3_fixedlengthtokenizer
 * fileName       : FlatFilesFixedLengthConfiguration
 * author         : namhyeop
 * date           : 2022/08/11
 * description    :
 * Reader를 문자열의 길이로 읽어오는 예
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/11        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class FlatFilesFixedLengthConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<String,String>chunk(3)
                .reader(itemReader())
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List items) throws Exception {
                        items.forEach(item -> System.out.println(item));
                        System.out.println("==========");
                    }
                }).build();
    }

    public FlatFileItemReader itemReader(){
        return new FlatFileItemReaderBuilder<Customer>()
                .name("flatFile")
                .resource(new FileSystemResource("/Users/namhyeop/Desktop/git자료/Spring_Boot_Study/8.Spring_Batch/SpringBatch_8_3_fixedlengthtokenizer/src/main/resources/customer.txt"))
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>())
                .targetType(Customer.class)
                .linesToSkip(1)
                .fixedLength()
                .addColumns(new Range(1,5))
                .addColumns(new Range(6,9))
                .addColumns(new Range(10, 11))
                .names("name", "year", "age")
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executeed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
package com.example.springbatch_8_3_fixedlengthtokenizer;

import lombok.Data;

/**
 * packageName    : com.example.springbatch_8_3_fixedlengthtokenizer
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/11
 * description    :
 * Customer Dto
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/11        namhyeop       최초 생성
 */
@Data
public class Customer {
    private String name;
    private int age;
    private String year;
}

Exception Handling

기본개념

  • 라인을 읽거나 토큰화 할 때 발생하는 Parsing 예외를 처리할 수 있도록 예외 계층 제공
  • 토큰화 검증을 엄격하게 적용하지 않도록 설정하면 Parsing 예외가 발생하지 않도록 할 수 있

토큰화 검증 기준 설정

  • LineTokenizer 의 Strict 속성을 false 로 설정하게 되면 Tokenizer 가 라인 길이를 검증하지 않는다
  • Tokenizer 가 라인 길이나 컬럼명을 검증하지 않을 경우 예외가 발생하지 않는다
  • FieldSet 은 성공적으로 리턴이 되며 두번째 범위 값은 빈 토큰을 가지게 된다

예제코드

package com.example.springbatch_8_3_fixedlengthtokenizer;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.transform.Range;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.FileSystemResource;

import java.util.List;

/**
 * packageName    : com.example.springbatch_8_3_fixedlengthtokenizer
 * fileName       : FlatFilesFixedLengthConfiguration
 * author         : namhyeop
 * date           : 2022/08/11
 * description    :
 * Exception 설정을 꺼봄으로써 Exeption 예외처리의 존재를 확인하는 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/11        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class FlatFilesFixedLengthConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .next(step2())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<String,String>chunk(3)
                .reader(itemReader())
                .writer(new ItemWriter<String>() {
                    @Override
                    public void write(List items) throws Exception {
                        items.forEach(item -> System.out.println(item));
                        System.out.println("==========");
                    }
                }).build();
    }

    public FlatFileItemReader itemReader(){
        return new FlatFileItemReaderBuilder<Customer>()
                .name("flatFile")
                .resource(new FileSystemResource("/Users/namhyeop/Desktop/git자료/Spring_Boot_Study/8.Spring_Batch/SpringBatch_8_3_fixedlengthtokenizer/src/main/resources/customer.txt"))
                .fieldSetMapper(new BeanWrapperFieldSetMapper<>())
                .targetType(Customer.class)
                .linesToSkip(1)
                .fixedLength()
                .strict(false)
                .addColumns(new Range(1,5))
                .addColumns(new Range(6,9))
                .addColumns(new Range(10, 11))
                .names("name", "year", "age")
                .build();
    }

    @Bean
    public Step step2(){
        return stepBuilderFactory.get("step2")
                .tasklet((contribution, chunkContext) -> {
                    System.out.println("step2 has executeed");
                    return RepeatStatus.FINISHED;
                })
                .build();
    }
}
package com.example.springbatch_8_3_fixedlengthtokenizer;

import lombok.Data;

/**
 * packageName    : com.example.springbatch_8_3_fixedlengthtokenizer
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/11
 * description    :
 * Customer Dto
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/11        namhyeop       최초 생성
 */
@Data
public class Customer {
    private String name;
    private String year;
    private int age;
}

XML-StaxEventItemReader

1.JAVA XML API

  • DOM 방식
    • 문서 전체를 메모리에 로드한 후 Tree 형태로 만들어서 데이터를 처리하는 방식, pull 방식
    • 엘리멘트 제어는 유연하나 문서크기가 클 경우 메모리 사용이 많고 속도가 느림
  • SAX 방식
    • 문서의 항목을 읽을 때 마다 이벤트가 발생하여 데이터를 처리하는 push 방식
    • 메모리 비용이 적고 속도가 빠른 장점은 있으나 엘리멘트 제어가 어려움
  • StAX 방식 (Streaming API for XML)
    • DOM과SAX의장점과단점을보완한API모델로서push와pull을동시에제공함
    • XML문서를읽고쓸수있는양방향파서기지원
    • XML 파일의 항목에서 항목으로 직접 이동하면서 Stax 파서기를 통해 구문 분석
    • 유형
      • Iterator API 방식
        • XMLEventReader 의 nextEvent() 를 호출해서 이벤트 객체를 가지고 옴
        • 이벤트 객체는 XML 태그 유형 (요소, 텍스트, 주석 등)에 대한 정보를 제공함
      • Cursor API 방식
        • JDBC Resultset 처럼 작동하는 API 로서 XMLStreamReader 는 XML 문서의 다음 요소로 커서를 이동한다
        • 커서에서 직접 메서드를 호출하여 현재 이벤트에 대한 자세한 정보를 얻는다
  • Spring-OXM
    • 스프링의 Object XML Mapping 기술로 XML 바인딩 기술을 추상화함
      • Marshaller
        • marshall – 객체를 XML 로 직렬화하는 행위
      • Unmarchaller
        • unmarshall – XML 을 객체로 역직렬화하는 행위
      • Marshaller 와 Unmarshaller 바인딩 기능을 제공하는 오픈소스로 JaxB2, Castor, XmlBeans, Xstream 등이 있다
  • 스프링 배치는 특정한 XML 바인딩 기술을 강요하지 않고 Spring OXM 에 위임한다
    • 바인딩 기술을 제공하는 구현체를 선택해서 처리하도록 한다.
  • Spring Batch XML
    • 스프링 배치에서는 StAX 방식으로 XML 문서를 처리하는 StaxEventItemReader 를 제공한다
    • XML 을 읽어 자바 객체로 매핑하고 자바 객체를 XML 로 쓸 수 있는 트랜잭션 구조를 지원

2.StAX 아키텍처

3.API 구조


예제

기본개념

    • Stax API 방식으로 데이터를 읽어들이는 ItemReader
    • Spring-OXM 과 Xstream 의존성을 추가해야 한다

<dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-oxm</artifactId>
            <version>5.3.7</version>
        </dependency>
        <dependency>
            <groupId>com.thoughtworks.xstream</groupId>
            <artifactId>xstream</artifactId>
            <version>1.4.16</version>
        </dependency>

2.StaxEventItemReader

  • 데이터를 읽는 Reader인 StaxEventItemReader에 대해서 살펴본다.

예제코드

package com.example.springbatch_8_5_staxeventitemreader;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.xml.StaxEventItemReader;
import org.springframework.batch.item.xml.builder.StaxEventItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.oxm.Unmarshaller;
import org.springframework.oxm.xstream.XStreamMarshaller;

import java.util.HashMap;
import java.util.Map;

/**
 * packageName    : PACKAGE_NAME
 * fileName       : com.example.springbatch_8_5_staxeventitemreader.XMLConfiguration
 * author         : namhyeop
 * date           : 2022/08/11
 * description    :
 * XML데이터를 객체로 직렬화하는 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/11        namhyeop       최초 생성
 */

@RequiredArgsConstructor
@Configuration
public class XMLConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(3)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public ItemReader<? extends Customer> customItemReader(){
        return new StaxEventItemReaderBuilder<Customer>()
                .name("statXml")
                .resource(new ClassPathResource("customer.xml"))
                //xml에서 데이터를 읽으면서 나눌 속성 명시
                .addFragmentRootElements("customer")
                .unmarshaller(itemUnmarshaller())
                .build();

    }

    @Bean
    public Unmarshaller itemUnmarshaller() {
        Map<String, Class<?>> aliases = new HashMap<>();
        aliases.put("customer", Customer.class);
        aliases.put("id", Long.class);
        aliases.put("name", String.class);
        aliases.put("age", Integer.class);

        XStreamMarshaller xStreamMarshaller = new XStreamMarshaller();
        xStreamMarshaller.setAliases(aliases);
        return xStreamMarshaller;
    }

    @Bean
    public ItemWriter<Customer> customItemWriter(){
        return items->{
            for (Customer item : items) {
                System.out.println(item.toString());
            }
        };
    }
}
package com.example.springbatch_8_5_staxeventitemreader;

import lombok.Data;

/**
 * packageName    : com.example.springbatch_8_5_staxeventitemreader
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/11
 * description    :
 * Customer Dto
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/11        namhyeop       최초 생성
 */
@Data
public class Customer {
    private final long id;
    private final String name ;
    private final int age;
}

Json - JsonItemReader

기본개념

  • Json 데이터의 Parsing 과 Binding 을 JsonObjectReader 인터페이스 구현체에 위임하여 처리하는 ItemReader
    두 가지 구현체 제공
    • JacksonJsonObjectReader
    • GsonJsonObjectReader

구조

API 설정

예제코드

package com.example.springbatch_8_6_jsonitemreader;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.json.JacksonJsonObjectReader;
import org.springframework.batch.item.json.builder.JsonItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

/**
 * packageName    : com.example.springbatch_8_6_jsonitemreader
 * fileName       : JsonConfiguration
 * author         : namhyeop
 * date           : 2022/08/11
 * description    :
 * JSON 데이터를 읽어오는 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/11        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class JsonConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(3)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public ItemReader<? extends Customer> customItemReader(){
        return new JsonItemReaderBuilder<Customer>()
                .name("jsonReader")
                .resource(new ClassPathResource("customer.json"))
                .jsonObjectReader(new JacksonJsonObjectReader<>(Customer.class))
                .build();
    }

    @Bean
    public ItemWriter<Customer> customItemWriter(){
        return items -> {
            for(Customer item : items){
                System.out.println(item.toString());
            }
        };
    }
}
package com.example.springbatch_8_6_jsonitemreader;

import lombok.Data;

/**
 * packageName    : com.example.springbatch_8_6_jsonitemreader
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/11
 * description    :
 * Customer Dto
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/11        namhyeop       최초 생성
 */
@Data
public class Customer {
    private Long id;
    private String name;
    private int age;

}

Cursor Based & Paging Based

기본개념

  • 배치 어플리케이션은 실시간적 처리가 어려운 대용량 데이터를 다루며 이 때 DB I/O 의 성능문제와 메모리 자원의 효율성 문제를 해결할 수 있어야 한다.
  • 스프링 배치에서는 대용량 데이터 처리를 위한 두 가지 해결방안을 제시하고 있다

Cursor Based 처리

  • JDBC ResultSet 의 기본 메커니즘을 사용
  • 현재 행에 커서를 유지하며 다음 데이터를 호출하면 다음 행으로 커서를 이동하며 데이터 반환이 이루어지는 Streaming 방식의 I/O 이다
  • ResultSet이 open 될 때마다 next() 메소드가 호출 되어 Database의 데이터가 반환되고 객체와 매핑이 이루어진다.
  • DB Connection 이 연결되면 배치 처리가 완료될 때 까지 데이터를 읽어오기 때문에 DB와 SocketTimeout을 충분히 큰 값으로 설정 필요
  • 모든 결과를 메모리에 할당하기 때문에 메모리 사용량이 많아지는 단점이 있다
  • Connection 연결 유지 시간과 메모리 공간이 충분하다면 대량의 데이터 처리에 적합할 수 있다 (fetchSize 조절)

Paging Based 처리

  • 페이징 단위로 데이터를 조회하는 방식으로 Page Size 만큼 한번에 메모리로 가지고 온 다음 한 개씩 읽는다.
  • 한 페이지를 읽을때마다 Connection을 맺고 끊기 때문에 대량의 데이터를 처리하더라도 SocketTimeout 예외가 거의 일어나지 않는다
  • 시작 행 번호를 지정하고 페이지에 반환시키고자 하는 행의 수를 지정한 후 사용 – Offset, Limit
  • 페이징 단위의 결과만 메모리에 할당하기 때문에 메모리 사용량이 적어지는 장점이 있다
  • Connection 연결 유지 시간이 길지 않고 메모리 공간을 효율적으로 사용해야 하는 데이터 처리에 적합할 수 있다

 

JdbcCursorItemReader

기본개념

  • Cursor 기반의 JDBC 구현체로서 ResultSet 과 함께 사용되며 Datasource 에서 Connection 을 얻어와서 SQL 을 실행한다
  • Thread 안정성을 보장하지 않기 때문에 멀티 스레드 환경에서 사용할 경우 동시성 이슈가 발생하지 않도록 별도 동기화 처리가 필요하다

API 소개

API 설정

예제코드

package com.example.springbatch_8_7_jdbccursoritemreader;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcCursorItemReader;
import org.springframework.batch.item.database.builder.JdbcCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

/**
 * packageName    : com.example.springbatch_8_7_jdbccursoritemreader
 * fileName       : JdbcCursorConfiguration
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :
 * JdbcCursor 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class JdbcCursorConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private int chunkSize = 10;
    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(chunkSize)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public JdbcCursorItemReader<Customer> customItemReader() {
        return new JdbcCursorItemReaderBuilder<Customer>()
                .name("jdbcCursorItemReader")
                .fetchSize(chunkSize)
                .sql("select id, firstName, lastName, birthdate from customer where firstName like ? order by lastName, firstName")
                .beanRowMapper(Customer.class)
                .queryArguments("A%")
                //조회 시작할 데이터의 위치
                .currentItemCount(2)
                //조회를 끝낼 데이터의 위치
                .maxItemCount(3)
                .maxRows(100)
                .dataSource(dataSource)
                .build();
    }

//    @Bean
//    public JdbcCursorItemReader<Customer> customItemReader() {
//        return new JdbcCursorItemReaderBuilder()
//                .name("jdbcCursorItemReader")
//                .fetchSize(10)
//                .sql("select id, firstName, lastName, birthdate from customer where firstName like ? order by lastName, firstName")
//                .beanRowMapper(Customer.class)
//                .queryArguments("A%")
//                .maxItemCount(3)
//                .currentItemCount(2)
//                .maxRows(100)
//                .dataSource(dataSource)
//                .build();
//    }

    private ItemWriter<Customer> customItemWriter() {
        return items->{
            for (Customer item : items) {
                System.out.println(item.toString());
            }
            System.out.println("=============");
        };
    }
}
package com.example.springbatch_8_7_jdbccursoritemreader;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * packageName    : com.example.springbatch_8_7_jdbccursoritemreader
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Customer {

    private Long Id;
    private String firstName;
    private String lastName;
    private String birthdate;

}
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (1, "Reed", "Edwards", "1952-08-16 12:34:53"),
       (2, "Hoyt", "Park", "1981-02-18 08:07:58"),
       (3, "Leila", "Petty", "1972-06-11 08:43:55"),
       (4, "Denton", "Strong", "1989-03-11 18:38:31"),
       (5, "Zoe", "Romero", "1990-10-02 13:06:31"),
       (6, "Rana", "Compton", "1957-06-09 12:51:11"),
       (7, "Velma", "King", "1988-02-02 05:52:25"),
       (8, "Uriah", "Carter", "1972-08-31 07:32:05"),
       (9, "Michael", "Graves", "1958-04-13 18:47:44"),
       (10, "Leigh", "Stone", "1967-06-23 23:41:43");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (11, "Iliana", "Glenn", "1965-02-27 14:33:56"),
       (12, "Harrison", "Haley", "1956-06-28 03:15:41"),
       (13, "Leonard", "Zamora", "1956-03-28 15:03:09"),
       (14, "Hiroko", "Wyatt", "1960-08-22 23:53:50"),
       (15, "Cameron", "Carlson", "1969-05-12 11:10:09"),
       (16, "Hunter", "Avery", "1953-11-19 12:52:42"),
       (17, "Aimee", "Cox", "1976-10-15 12:56:50"),
       (18, "Yen", "Delgado", "1990-02-06 10:25:36"),
       (19, "Gemma", "Peterson", "1989-04-02 23:42:09"),
       (20, "Lani", "Faulkner", "1970-09-18 17:22:14");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (21, "Iola", "Cannon", "1954-01-12 16:56:45"),
       (22, "Whitney", "Shaffer", "1951-03-19 01:27:18"),
       (23, "Jerome", "Moran", "1968-03-16 05:26:22"),
       (24, "Quinn", "Wheeler", "1979-06-19 16:24:22"),
       (25, "Mira", "Wilder", "1961-12-27 12:11:07"),
       (26, "Tobias", "Holloway", "1968-08-13 20:36:19"),
       (27, "Shaine", "Schneider", "1958-03-08 09:47:10"),
       (28, "Harding", "Gonzales", "1952-04-11 02:06:25"),
       (29, "Calista", "Nieves", "1970-02-17 13:29:59"),
       (30, "Duncan", "Norman", "1987-09-13 00:54:49");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (31, "Fatima", "Hamilton", "1961-06-16 14:29:11"),
       (32, "Ali", "Browning", "1979-03-27 17:09:37"),
       (33, "Erin", "Sosa", "1990-08-23 10:43:58"),
       (34, "Carol", "Harmon", "1972-01-14 07:19:39"),
       (35, "Illiana", "Fitzgerald", "1970-08-19 02:33:46"),
       (36, "Stephen", "Riley", "1954-06-05 08:34:03"),
       (37, "Hermione", "Waller", "1969-09-08 01:19:07"),
       (38, "Desiree", "Flowers", "1952-06-25 13:34:45"),
       (39, "Karyn", "Blackburn", "1977-03-30 13:08:02"),
       (40, "Briar", "Carroll", "1985-03-26 01:03:34");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (41, "Chaney", "Green", "1987-04-20 18:56:53"),
       (42, "Robert", "Higgins", "1985-09-26 11:25:10"),
       (43, "Lillith", "House", "1982-12-06 02:24:23"),
       (44, "Astra", "Winters", "1952-03-13 01:13:07"),
       (45, "Cherokee", "Stephenson", "1955-10-23 16:57:33"),
       (46, "Yuri", "Shaw", "1958-07-14 15:10:07"),
       (47, "Boris", "Sparks", "1982-01-01 10:56:34"),
       (48, "Wilma", "Blake", "1963-06-07 16:32:33"),
       (49, "Brynne", "Morse", "1964-09-21 01:05:25"),
       (50, "Ila", "Conley", "1953-11-02 05:12:57");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (51, "Sharon", "Watts", "1964-01-09 16:32:37"),
       (52, "Kareem", "Vaughan", "1952-04-18 15:37:10"),
       (53, "Eden", "Barnes", "1954-07-04 01:26:44"),
       (54, "Kenyon", "Fulton", "1975-08-23 22:17:52"),
       (55, "Mona", "Ball", "1972-02-11 04:15:45"),
       (56, "Moses", "Cortez", "1979-04-24 15:26:46"),
       (57, "Macy", "Banks", "1956-12-31 00:41:15"),
       (58, "Brenna", "Mendez", "1972-10-02 07:58:27"),
       (59, "Emerald", "Ewing", "1985-11-28 21:15:20"),
       (60, "Lev", "Mcfarland", "1951-05-20 14:30:07");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (61, "Norman", "Tanner", "1959-07-29 15:41:45"),
       (62, "Alexa", "Walters", "1977-12-06 16:41:17"),
       (63, "Dara", "Hyde", "1989-08-04 14:06:43"),
       (64, "Hu", "Sampson", "1978-11-01 17:10:23"),
       (65, "Jasmine", "Cardenas", "1969-02-15 20:08:06"),
       (66, "Julian", "Bentley", "1954-07-11 03:27:51"),
       (67, "Samson", "Brown", "1967-10-15 07:03:59"),
       (68, "Gisela", "Hogan", "1985-01-19 03:16:20"),
       (69, "Jeanette", "Cummings", "1986-09-07 18:25:52"),
       (70, "Galena", "Perkins", "1984-01-13 02:15:31");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (71, "Olga", "Mays", "1981-11-20 22:39:27"),
       (72, "Ferdinand", "Austin", "1956-08-08 09:08:02"),
       (73, "Zenia", "Anthony", "1964-08-21 05:45:16"),
       (74, "Hop", "Hampton", "1982-07-22 14:11:00"),
       (75, "Shaine", "Vang", "1970-08-13 15:58:28"),
       (76, "Ariana", "Cochran", "1959-12-04 01:18:36"),
       (77, "India", "Paul", "1963-10-10 05:24:03"),
       (78, "Karina", "Doyle", "1979-12-01 00:05:21"),
       (79, "Delilah", "Johnston", "1989-03-04 23:50:01"),
       (80, "Hilel", "Hood", "1959-08-22 06:40:48");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (81, "Kennedy", "Hoffman", "1963-10-14 20:18:35"),
       (82, "Kameko", "Bell", "1976-06-08 15:35:54"),
       (83, "Lunea", "Gutierrez", "1964-06-07 16:21:24"),
       (84, "William", "Burris", "1980-05-01 17:58:23"),
       (85, "Kiara", "Walls", "1955-12-27 18:57:15"),
       (86, "Latifah", "Alexander", "1980-06-19 10:39:50"),
       (87, "Keaton", "Ward", "1964-10-12 16:03:18"),
       (88, "Jasper", "Clements", "1970-03-05 00:29:49"),
       (89, "Claire", "Brown", "1972-02-11 00:43:58"),
       (90, "Noble", "Morgan", "1955-09-05 05:35:01");
INSERT INTO `customer` (`id`, `firstName`, `lastName`, `birthdate`)
VALUES (91, "Evangeline", "Horn", "1952-12-28 14:06:27"),
       (92, "Jonah", "Harrell", "1951-06-25 17:37:35"),
       (93, "Mira", "Espinoza", "1982-03-26 06:01:16"),
       (94, "Brennan", "Oneill", "1979-04-23 08:49:02"),
       (95, "Dacey", "Howe", "1983-02-06 19:11:00"),
       (96, "Yoko", "Pittman", "1982-09-12 02:18:52"),
       (97, "Cody", "Conway", "1971-05-26 07:09:58"),
       (98, "Jordan", "Knowles", "1981-12-30 02:20:01"),
       (99, "Pearl", "Boyer", "1957-10-19 14:26:49"),
       (100, "Keely", "Montoya", "1985-03-24 01:18:09");
drop table customer;
CREATE TABLE customer(
    id mediumint(8) unsigned NOT NULL auto_increment,
    firstName varchar(255) default null,
    lastName varchar(255) default null,
    birthdate varchar(255),
    PRIMARY KEY (id)
) AUTO_INCREMENT=1;

select id, firstName, lastName, birthdate from customer where firstName like ? order by lastName, firstName
select * from customer where firstName like ? order by lastName, firstName
  • maxItemCount가 예제와 약간 다른거 같다. 사용시 유의하자

JpaCursorItemReader

기본개념

  • Spring Batch 4.3 버전부터 지원함
  • Cursor 기반의 JPA 구현체로서 EntityManagerFactory 객체가 필요하며 쿼리는 JPQL 을 사용한다

API 구조

API 설정

예제코드

package com.example.springbatch_8_8_jpacursoritemreader;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaCursorItemReader;
import org.springframework.batch.item.database.builder.JpaCursorItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;
import java.util.HashMap;
import java.util.Map;

/**
 * packageName    : com.example.springbatch_8_8_jpacursoritemreader
 * fileName       : JpaCursorConfiguration
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */

@RequiredArgsConstructor
@Configuration
public class JpaCursorConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(2)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public ItemReader<? extends Customer> customItemReader() {
        /**
         * queryString에 첨부할 변수 설정.
         * 첫 번째 매개변수는 queryString에 들어갈 변수 이름, 두 번째는 변수 안에 들어갈 내용을 의미
         */
        Map<String, Object> parameters = new HashMap<>();
        parameters.put("firstName", "A%");

        return new JpaCursorItemReaderBuilder<Customer>()
                .name("jpaCursorItemReader")
                .entityManagerFactory(entityManagerFactory)
                .queryString("select c from Customer c where firstName like :firstName")
                .parameterValues(parameters)
//                                .maxItemCount(4)
//                                .currentItemCount(2)
                .build();
    }

    @Bean
    public ItemWriter<Customer> customItemWriter() {
        return items -> {
            for (Customer item : items) {
                System.out.println(item.toString());
            }
            System.out.println("=================");
        };
    }
}
package com.example.springbatch_8_8_jpacursoritemreader;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;

/**
 * packageName    : com.example.springbatch_8_8_jpacursoritemreader
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Entity
public class Customer {
    @Id @GeneratedValue
    private Long id;
    private String firstname;
    private String lastname;
    private String birthdate;
}

JdbcPagingItemReader

기본개념

  • Paging 기반의 JDBC 구현체로서 쿼리에 시작 행 번호 (offset) 와 페이지에서 반환 할 행 수 (limit)를 지정해서 SQL 을 실행한다
  • 스프링 배치에서 offset과 limit을 PageSize에 맞게 자동으로 생성해 주며 페이징 단위로 데이터를 조회할 때 마다 새로운 쿼리가 실행한다
  • 페이지마다 새로운 쿼리를 실행하기 때문에 페이징 시 결과 데이터의 순서가 보장될 수 있도록 order by 구문이 작성되도록 한다
  • 멀티 스레드 환경에서 Thread 안정성을 보장하기 때문에 별도의 동기화를 할 필요가 없다
  • PagingQueryProvider
    • 쿼리 실행에 필요한 쿼리문을 ItemReader 에게 제공하는 클래스
    • 데이터베이스마다 페이징 전략이 다르기 때문에 각 데이터 베이스 유형마다 다른 PagingQueryProvider 를 사용한다
    • Select 절, from 절, sortKey 는 필수로 설정해야 하며 where, group by 절은 필수가 아니다

API 소개

예제코드

package com.example.springbatch_8_9_jdbcpagingitemreader;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JdbcPagingItemReader;
import org.springframework.batch.item.database.Order;
import org.springframework.batch.item.database.PagingQueryProvider;
import org.springframework.batch.item.database.builder.JdbcPagingItemReaderBuilder;
import org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.BeanPropertyRowMapper;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * packageName    : com.example.springbatch_8_9_jdbcpagingitemreader
 * fileName       : JdbcPagingConfiguration
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class JdbcPagingConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;

    @Bean
    public Job job() throws Exception{
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() throws Exception{
        return stepBuilderFactory.get("step1")
                .<Customer,Customer>chunk(2)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public JdbcPagingItemReader<Customer> customItemReader() throws Exception{
        HashMap<String, Object> parameters = new HashMap<>();
        parameters.put("firstName", "A%");

        return new JdbcPagingItemReaderBuilder<Customer>()
                .name("jdbcPagingItemReader")
                //chunksize와 pagesize는 같아야 좋다
                .pageSize(2)
                //fetchSize는 뭐지?
//                .fetchSize(10)
                .dataSource(dataSource)
                .rowMapper(new BeanPropertyRowMapper<>(Customer.class))
                .queryProvider(createQueryProvider())
                .parameterValues(parameters)
                .build();
    }

    @Bean
    public PagingQueryProvider createQueryProvider() throws Exception {
        SqlPagingQueryProviderFactoryBean queryProvider = new SqlPagingQueryProviderFactoryBean();
        queryProvider.setDataSource(dataSource);
        queryProvider.setSelectClause("id,firstName,lastName,birthDate");
        queryProvider.setFromClause("from customer");
        queryProvider.setWhereClause("where firstName like :firstName");

        Map<String, Order> sortKeys = new HashMap<>();
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys);
        return queryProvider.getObject();
    }

    @Bean
    public ItemWriter<Customer> customItemWriter(){
        return items ->{
            for (Customer item : items) {
                System.out.println(item.toString());
            }
            System.out.println("======");
        };
    }
}
package com.example.springbatch_8_9_jdbcpagingitemreader;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * packageName    : com.example.springbatch_8_9_jdbcpagingitemreader
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

JpaPagingItemReader

기본개념

  • Paging 기반의 JPA 구현체로서 EntityManagerFactory 객체가 필요하며 쿼리는 JPQL 을 사용한다

API 구조

API 설정

예제코드

package com.example.springbatch_8_10_jpapagingitemreader;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.JpaPagingItemReader;
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.persistence.EntityManagerFactory;

/**
 * packageName    : com.example.springbatch_8_10_jpapagingitemreader
 * fileName       : JpaPagingConfiguration
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :Jpa Paging 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class JpaPagingConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final EntityManagerFactory entityManagerFactory;

    @Bean
    public Job job(){
        return jobBuilderFactory.get("batchJob")
                .start(step1())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public Step step1(){
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public JpaPagingItemReader<Customer> customItemReader(){
        return new JpaPagingItemReaderBuilder<Customer>()
                .name("jpaPagingItemReader")
                .entityManagerFactory(entityManagerFactory)
                .pageSize(10)
                .queryString("select c from Customer c join fetch c.address")
                .build();
    }

    @Bean
    public ItemWriter<Customer> customItemWriter(){
        return items ->{
            for (Customer customer : items) {
                System.out.println(customer.getAddress().getLocation());
            }
        };
    }
}
package com.example.springbatch_8_10_jpapagingitemreader;

import lombok.Data;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import javax.persistence.*;

/**
 * packageName    : com.example.springbatch_8_10_jpapagingitemreader
 * fileName       : Address
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */
@Getter
@Setter
@Entity
public class Address {
    @Id @GeneratedValue
    private Long id;
    private String location;

    @OneToOne
    @JoinColumn(name = "customer_id")
    private Customer customer;
}
package com.example.springbatch_8_10_jpapagingitemreader;

import lombok.Getter;
import lombok.Setter;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.OneToOne;

/**
 * packageName    : com.example.springbatch_8_10_jpapagingitemreader
 * fileName       : Customer
 * author         : namhyeop
 * date           : 2022/08/12
 * description    :
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/12        namhyeop       최초 생성
 */
@Getter
@Setter
@Entity
public class Customer {
    @Id @GeneratedValue
    private Long id;
    private String username;
    private Long age;

    @OneToOne(mappedBy = "customer")
    private Address address;
}

ItemReaderAdapter

기본 개념

  • 배치 Job 안에서 이미 있는 DAO 나 다른 서비스를 ItemReader 안에서 사용하고자 할 때 위임 역할을 한다

  • 그림의 예시가 무슨 의미냐면 ItemReader를 통해서 다른 서비스를 호출하는 예제이다.
  • reader를 통해 TargerObject를 CustomerService를 등록하고 실행할 method를 JoinMember를 지정한것을 알 수있다.
  • 우측은 실제 Batch의 메소드가 설정되는 핵심 메서드 부분을 캡쳐해온것이다.

예제코드

package com.example.springbatch_8_11_itemreaderadapter;

import lombok.RequiredArgsConstructor;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.adapter.ItemReaderAdapter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * packageName    : com.example.springbatch_8_10_itemreaderadapter
 * fileName       : ItemReaderAdapterConfiguration
 * author         : namhyeop
 * date           : 2022/08/13
 * description    :
 * batch 작업중 별도로 등록한 Service를 사용하고 싶을때 사용하는 ItemReader 예제
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/13        namhyeop       최초 생성
 */
@RequiredArgsConstructor
@Configuration
public class ItemReaderAdapterConfiguration {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job job() {
        return jobBuilderFactory.get("batchJob")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<String, String>chunk(10)
                .reader(customItemReader())
                .writer(customItemWriter())
                .build();
    }

    @Bean
    public ItemReaderAdapter customItemReader() {
        ItemReaderAdapter reader = new ItemReaderAdapter();
        reader.setTargetObject(customService());
        reader.setTargetMethod("joinCustomer");
        return reader;
    }

    @Bean
    public ItemWriter<String> customItemWriter(){
        return items ->{
            System.out.println(items);
        };
    }

    private CustomService<String> customService(){
        return new CustomService<>();
    }
}
package com.example.springbatch_8_11_itemreaderadapter;

/**
 * packageName    : com.example.springbatch_8_10_itemreaderadapter
 * fileName       : CustomService
 * author         : namhyeop
 * date           : 2022/08/13
 * description    :
 * 예제에서 종료 메소드를 설정 안해줬기 때문에 무한으로 cnt가 증가한다.
 * ===========================================================
 * DATE              AUTHOR             NOTE
 * -----------------------------------------------------------
 * 2022/08/13        namhyeop       최초 생성
 */
public class CustomService<T> {
    private int cnt = 0;
    public T joinCustomer(){
        return(T)("item" + cnt++);
    }
}

REFERENCE.

https://www.inflearn.com/course/%EC%8A%A4%ED%94%84%EB%A7%81-%EB%B0%B0%EC%B9%98

반응형

'Spring Batch' 카테고리의 다른 글

10.Spring Batch의 Chunk와 ItemProcessor  (0) 2024.11.08
9.Spring Batch의 Chunk와 ItemWriter  (1) 2024.11.08
7.Spring Batch의 Chunk와 동작원리 살펴보기  (0) 2024.11.08
6.Spring Batch의 Flow  (1) 2024.11.07
5.Spring Batch의 Step  (0) 2024.11.07