[Spring Batch] 스프링 배치 강좌 4. CSV 파일 읽어서 JDBC로 디비에 저장하기

스프링 배치 강좌 목록: https://www.fwantastic.com/p/spring-batch.html


이전 글 다시 보기: [Spring Batch] 스프링 배치 강좌 3. JobExecutionContext를 활용하여 스텝간 데이터 공유하기



목적

여러 개념을 익혔으니 실전처럼  read -> process -> write  프로세스를 만들어 보자.




Input

src/test/resourcescom/fwantastic/example3 폴더를 생성 후 input.csv를 만든다.

csv에는 Person ID와 사람의 full name (이름 + 스페이스 + 성) 이 저장되어 있다. 첫째줄엔 컬럼 헤더임을 잊지 말자.

input.csv
PERSON_ID,FULL_NAME
1,Suji Bae
2,Jieun Lee




Output

csv 데이터를 저장 할 디비 테이블을 만들자. Person 테이블에 Person ID와 사람의 이름과 성을 따로 저장해보자.

src/test/resources/com/fwantastic/example3create-table.sql 파일을 생성하자.

create-table.sql
CREATE TABLE PERSON (
    PERSON_ID       INT
    , FIRST_NAME    VARCHAR2(255)
    , LAST_NAME     VARCHAR2(255)
)
;






Person.java

csv 데이터를 매핑 자바 클래스를 만든다. csv에 있는 ID와 fullName, Person 테이블에 저장 할 firstNamelastName 필드를 추가한다. 원한다면 read 데이터 모델과 write 데이터 모델을 달리 해도 되는데 편하게 하나로 사용하자.

Person.java
package com.fwantastic.example3;

public class Person {
    private Long personId;
    private String fullName;
    private String firstName;
    private String lastName;

    public Long getPersonId() {
        return personId;
    }

    public void setPersonId(Long personId) {
        this.personId = personId;
    }

    public String getFullName() {
        return fullName;
    }

    public void setFullName(String fullName) {
        this.fullName = fullName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    @Override
    public String toString() {
        return "Person [personId=" + personId + ", fullName=" + fullName + ", firstName=" + firstName + ", lastName="
                + lastName + "]";
    }
}




PersonProcessor.java

csv에는 풀 네임이 있고 디비에는 이름과 성을 따로 저장하기 위해선 데이터 가공이 필요하다. 프로세서를 만들어보자. com.fwantastic.example3 패키지에 아래 클래스를 추가하자.

PersonProcessor.java
package com.fwantastic.example3;

import org.springframework.batch.item.ItemProcessor;

/**
 * Person의 풀 네임을 이름과 성으로 쪼갬. 
 * 예로 풀 네임이 Suji Bae라면 firstName = Suji, lastName = Bae가 된다.
 */
public class PersonProcessor implements ItemProcessor<Person, Person> {

    private static final String SPACE = " ";

    public Person process(Person person) throws Exception {
        String[] names = person.getFullName().split(SPACE);
        person.setFirstName(names[0]);
        person.setLastName(names[1]);
        return person;
    }

}




job3.xml

src/main/resources/com/fwantastic/example3에 아래의 job3.xml을 만든다.

각 컴포넌트의 사용 이유와 자세한 설명은 주석을 참고하고 개념을 보자.

FlatFileItemReader: 파일을 읽을 수 있다. csv에 한정 된게 아니라 lineMapper 적용 방법에 따라 아무 파일이나 사용 가능하다.

DefaultLineMapper: 일반적인 단순한 라인을 읽는 경우에 사용한다. 라인을 읽어서 자바 객체에 매핑을 담당한다.

JdbcBatchItemWriter: JDBC를 사용해 디비 작업을 수행하는 ItemWriter이다.


job3.xml
<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:jdbc="http://www.springframework.org/schema/jdbc"
    xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
  http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.3.xsd
  http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
  http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd">

    <import resource="classpath:/common-context.xml" />

    <!-- job을 시작하기 전에 Person 테이블 생성. -->
    <jdbc:initialize-database
        data-source="dataSource">
        <jdbc:script
            location="classpath:com/fwantastic/example3/create-table.sql" />
    </jdbc:initialize-database>

    <job id="myJob3"
        xmlns="http://www.springframework.org/schema/batch">

        <step id="myStep1">
            <tasklet>
                <chunk reader="csvReader"
                    processor="personProcessor" writer="personWriter"
                    commit-interval="10" />
            </tasklet>
        </step>
    </job>

    <bean id="csvReader"
        class="org.springframework.batch.item.file.FlatFileItemReader">
        <property name="resource"
            value="classpath:com/fwantastic/example3/input.csv" />

        <property name="lineMapper">
            <bean
                class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <!-- 라인을 어떻게 잘라야 하는지 설정해준다. -->
                    <bean
                        class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                        <!-- 기본 구분자인 comma로 필드 값을 구분한다. tokenize 메소드가 
                        FieldSet 객체를 반환 하는데 FieldSet이 names와 구분한 값들을 가지고 있다. -->
                        <property name="names"
                            value="PERSON_ID,FULL_NAME" />
                    </bean>
                </property>

                <property name="fieldSetMapper">
                    <!-- lineTokenizer로부터 넘겨받은 FieldSet 객체를 person 객체로 변환하여 반환한다.
                                        매핑 할 때 FieldSet의 names 값을 camelCase로 변환 후 같은 이름의 setter로 매칭된다. 
                                        예로 PERSON_IN -> personId, FULL_NAME -> fullName -->
                    <bean id="fieldSetMapper"
                        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
                        scope="step">

                        <property name="prototypeBeanName"
                            value="person"></property>
                    </bean>
                </property>
            </bean>
        </property>

        <!-- 첫 번째 라인은 컬럼 헤더이기 때문에 두 번째 라인부터 읽기를 시작한다. -->
        <property name="linesToSkip" value="1" />
    </bean>

    <!-- fieldSetMapper가 새로 반환 할 Person 객체. scope이 프로토파입이기 때문에 매번 새로운 객체를 반환한다. -->
    <bean id="person" class="com.fwantastic.example3.Person"
        scope="prototype" />

    <!-- Person full name 데이터 가공 -->
    <bean id="personProcessor"
        class="com.fwantastic.example3.PersonProcessor" scope="step" />

    <!-- JDBC writer. Insert나 Update 쿼리를 실행 할 수 있다. -->
    <bean id="personWriter"
        class="org.springframework.batch.item.database.JdbcBatchItemWriter"
        scope="step">
        <property name="dataSource" ref="dataSource" />

        <property name="sql">
            <value>
                <![CDATA[
                INSERT INTO PERSON
                (PERSON_ID, FIRST_NAME, LAST_NAME) VALUES
                (:personId, :firstName, :lastName)
                ]]>
            </value>
        </property>

        <property name="itemSqlParameterSourceProvider">
            <bean
                class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
        </property>
    </bean>

</beans>





테스트

src/test/java에 com.fwantastic.example3 패키지 만들고 아래 junit 클래스를 추가하자.

MyJobTest.java
package com.fwantastic.example3;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/com/fwantastic/example3/job3.xml" })
public class MyJobTest {

    @Autowired
    private JobLauncherTestUtils jobLauncherTestUtils;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    private static AtomicBoolean isLaunched = new AtomicBoolean(false);

    private JobExecution jobExecution;

    @Before
    public void setUp() throws Exception {
        if (!isLaunched.getAndSet(true)) {
            jobExecution = jobLauncherTestUtils.launchJob();
        }
    }

    @Test
    public void testExitCode() {
        Assert.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
    }

    @Test
    public void testPersonsCreated() {
        // jdbcTemplate을 사용 해 Person 테이블의 모든 row를 가져온다.
        List<Person> createdPersons = jdbcTemplate.query("SELECT * FROM PERSON",
                new BeanPropertyRowMapper<Person>(Person.class));

        System.out.println(createdPersons);

        // input.csv 라인 갯수와 Person 테이블 row 갯수가 일치하는지 확인한다.
        Assert.assertEquals(2, createdPersons.size());
    }
}





실행 결과

INFO: Job: [FlowJob: [name=myJob3]] launched with the following parameters: [{random=702926}]
Dec 19, 2019 12:09:35 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [myStep1]
Dec 19, 2019 12:09:35 PM org.springframework.batch.core.step.AbstractStep execute
INFO: Step: [myStep1] executed in 53ms
Dec 19, 2019 12:09:35 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=myJob3]] completed with the following parameters: [{random=702926}] and the following status: [COMPLETED] in 85ms
[Person [personId=1, fullName=null, firstName=Suji, lastName=Bae], Person [personId=2, fullName=null, firstName=Jieun, lastName=Lee]]



CSV 파일을 읽어서 가공 후 JDBC를 사용해 디비에 저장하는 방법을 알아보았다. 다음은 반대로 디비를 읽어서 CSV 파일로 저장하는 방법을 알아보자.



다음 글

[Spring Batch] 스프링 배치 강좌 5. JDBC 로 디비 읽어서 CSV 파일에 쓰기