스프링 배치 강좌 목록: https://www.fwantastic.com/p/spring-batch.html
이전 글 다시 보기: [Spring Batch] 스프링 배치 강좌 3. JobExecutionContext를 활용하여 스텝간 데이터 공유하기
csv에는 Person ID와 사람의 full name (이름 + 스페이스 + 성) 이 저장되어 있다. 첫째줄엔 컬럼 헤더임을 잊지 말자.
input.csv
src/test/resources/com/fwantastic/example3에 create-table.sql 파일을 생성하자.
create-table.sql
Person.java
PersonProcessor.java
각 컴포넌트의 사용 이유와 자세한 설명은 주석을 참고하고 개념을 보자.
FlatFileItemReader: 파일을 읽을 수 있다. csv에 한정 된게 아니라 lineMapper 적용 방법에 따라 아무 파일이나 사용 가능하다.
DefaultLineMapper: 일반적인 단순한 라인을 읽는 경우에 사용한다. 라인을 읽어서 자바 객체에 매핑을 담당한다.
JdbcBatchItemWriter: JDBC를 사용해 디비 작업을 수행하는 ItemWriter이다.
job3.xml
MyJobTest.java
CSV 파일을 읽어서 가공 후 JDBC를 사용해 디비에 저장하는 방법을 알아보았다. 다음은 반대로 디비를 읽어서 CSV 파일로 저장하는 방법을 알아보자.
이전 글 다시 보기: [Spring Batch] 스프링 배치 강좌 3. JobExecutionContext를 활용하여 스텝간 데이터 공유하기
목적
여러 개념을 익혔으니 실전처럼 read -> process -> write 프로세스를 만들어 보자.Input
src/test/resources에 com/fwantastic/example3 폴더를 생성 후 input.csv를 만든다.csv에는 Person ID와 사람의 full name (이름 + 스페이스 + 성) 이 저장되어 있다. 첫째줄엔 컬럼 헤더임을 잊지 말자.
input.csv
PERSON_ID,FULL_NAME
1,Suji Bae
2,Jieun Lee
Output
csv 데이터를 저장 할 디비 테이블을 만들자. Person 테이블에 Person ID와 사람의 이름과 성을 따로 저장해보자.
create-table.sql
CREATE TABLE PERSON (
PERSON_ID INT
, FIRST_NAME VARCHAR2(255)
, LAST_NAME VARCHAR2(255)
)
;
Person.java
csv 데이터를 매핑 할 자바 클래스를 만든다. csv에 있는 ID와 fullName, Person 테이블에 저장 할 firstName과 lastName 필드를 추가한다. 원한다면 read 데이터 모델과 write 데이터 모델을 달리 해도 되는데 편하게 하나로 사용하자.Person.java
package com.fwantastic.example3;
public class Person {
private Long personId;
private String fullName;
private String firstName;
private String lastName;
public Long getPersonId() {
return personId;
}
public void setPersonId(Long personId) {
this.personId = personId;
}
public String getFullName() {
return fullName;
}
public void setFullName(String fullName) {
this.fullName = fullName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
public String getLastName() {
return lastName;
}
public void setLastName(String lastName) {
this.lastName = lastName;
}
@Override
public String toString() {
return "Person [personId=" + personId + ", fullName=" + fullName + ", firstName=" + firstName + ", lastName="
+ lastName + "]";
}
}
PersonProcessor.java
csv에는 풀 네임이 있고 디비에는 이름과 성을 따로 저장하기 위해선 데이터 가공이 필요하다. 프로세서를 만들어보자. com.fwantastic.example3 패키지에 아래 클래스를 추가하자.PersonProcessor.java
package com.fwantastic.example3;
import org.springframework.batch.item.ItemProcessor;
/**
* Person의 풀 네임을 이름과 성으로 쪼갬.
* 예로 풀 네임이 Suji Bae라면 firstName = Suji, lastName = Bae가 된다.
*/
public class PersonProcessor implements ItemProcessor<Person, Person> {
private static final String SPACE = " ";
public Person process(Person person) throws Exception {
String[] names = person.getFullName().split(SPACE);
person.setFirstName(names[0]);
person.setLastName(names[1]);
return person;
}
}
job3.xml
src/main/resources/com/fwantastic/example3에 아래의 job3.xml을 만든다.각 컴포넌트의 사용 이유와 자세한 설명은 주석을 참고하고 개념을 보자.
FlatFileItemReader: 파일을 읽을 수 있다. csv에 한정 된게 아니라 lineMapper 적용 방법에 따라 아무 파일이나 사용 가능하다.
DefaultLineMapper: 일반적인 단순한 라인을 읽는 경우에 사용한다. 라인을 읽어서 자바 객체에 매핑을 담당한다.
JdbcBatchItemWriter: JDBC를 사용해 디비 작업을 수행하는 ItemWriter이다.
job3.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-4.3.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd">
<import resource="classpath:/common-context.xml" />
<!-- job을 시작하기 전에 Person 테이블 생성. -->
<jdbc:initialize-database
data-source="dataSource">
<jdbc:script
location="classpath:com/fwantastic/example3/create-table.sql" />
</jdbc:initialize-database>
<job id="myJob3"
xmlns="http://www.springframework.org/schema/batch">
<step id="myStep1">
<tasklet>
<chunk reader="csvReader"
processor="personProcessor" writer="personWriter"
commit-interval="10" />
</tasklet>
</step>
</job>
<bean id="csvReader"
class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource"
value="classpath:com/fwantastic/example3/input.csv" />
<property name="lineMapper">
<bean
class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<!-- 라인을 어떻게 잘라야 하는지 설정해준다. -->
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<!-- 기본 구분자인 comma로 필드 값을 구분한다. tokenize 메소드가
FieldSet 객체를 반환 하는데 FieldSet이 names와 구분한 값들을 가지고 있다. -->
<property name="names"
value="PERSON_ID,FULL_NAME" />
</bean>
</property>
<property name="fieldSetMapper">
<!-- lineTokenizer로부터 넘겨받은 FieldSet 객체를 person 객체로 변환하여 반환한다.
매핑 할 때 FieldSet의 names 값을 camelCase로 변환 후 같은 이름의 setter로 매칭된다.
예로 PERSON_IN -> personId, FULL_NAME -> fullName -->
<bean id="fieldSetMapper"
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
scope="step">
<property name="prototypeBeanName"
value="person"></property>
</bean>
</property>
</bean>
</property>
<!-- 첫 번째 라인은 컬럼 헤더이기 때문에 두 번째 라인부터 읽기를 시작한다. -->
<property name="linesToSkip" value="1" />
</bean>
<!-- fieldSetMapper가 새로 반환 할 Person 객체. scope이 프로토파입이기 때문에 매번 새로운 객체를 반환한다. -->
<bean id="person" class="com.fwantastic.example3.Person"
scope="prototype" />
<!-- Person full name 데이터 가공 -->
<bean id="personProcessor"
class="com.fwantastic.example3.PersonProcessor" scope="step" />
<!-- JDBC writer. Insert나 Update 쿼리를 실행 할 수 있다. -->
<bean id="personWriter"
class="org.springframework.batch.item.database.JdbcBatchItemWriter"
scope="step">
<property name="dataSource" ref="dataSource" />
<property name="sql">
<value>
<![CDATA[
INSERT INTO PERSON
(PERSON_ID, FIRST_NAME, LAST_NAME) VALUES
(:personId, :firstName, :lastName)
]]>
</value>
</property>
<property name="itemSqlParameterSourceProvider">
<bean
class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider" />
</property>
</bean>
</beans>
테스트
src/test/java에 com.fwantastic.example3 패키지 만들고 아래 junit 클래스를 추가하자.MyJobTest.java
package com.fwantastic.example3;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.test.JobLauncherTestUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.BeanPropertyRowMapper;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = { "/com/fwantastic/example3/job3.xml" })
public class MyJobTest {
@Autowired
private JobLauncherTestUtils jobLauncherTestUtils;
@Autowired
private JdbcTemplate jdbcTemplate;
private static AtomicBoolean isLaunched = new AtomicBoolean(false);
private JobExecution jobExecution;
@Before
public void setUp() throws Exception {
if (!isLaunched.getAndSet(true)) {
jobExecution = jobLauncherTestUtils.launchJob();
}
}
@Test
public void testExitCode() {
Assert.assertEquals(ExitStatus.COMPLETED.getExitCode(), jobExecution.getExitStatus().getExitCode());
}
@Test
public void testPersonsCreated() {
// jdbcTemplate을 사용 해 Person 테이블의 모든 row를 가져온다.
List<Person> createdPersons = jdbcTemplate.query("SELECT * FROM PERSON",
new BeanPropertyRowMapper<Person>(Person.class));
System.out.println(createdPersons);
// input.csv 라인 갯수와 Person 테이블 row 갯수가 일치하는지 확인한다.
Assert.assertEquals(2, createdPersons.size());
}
}
실행 결과
INFO: Job: [FlowJob: [name=myJob3]] launched with the following parameters: [{random=702926}]
Dec 19, 2019 12:09:35 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [myStep1]
Dec 19, 2019 12:09:35 PM org.springframework.batch.core.step.AbstractStep execute
INFO: Step: [myStep1] executed in 53ms
Dec 19, 2019 12:09:35 PM org.springframework.batch.core.launch.support.SimpleJobLauncher$1 run
INFO: Job: [FlowJob: [name=myJob3]] completed with the following parameters: [{random=702926}] and the following status: [COMPLETED] in 85ms
[Person [personId=1, fullName=null, firstName=Suji, lastName=Bae], Person [personId=2, fullName=null, firstName=Jieun, lastName=Lee]]
CSV 파일을 읽어서 가공 후 JDBC를 사용해 디비에 저장하는 방법을 알아보았다. 다음은 반대로 디비를 읽어서 CSV 파일로 저장하는 방법을 알아보자.