Spring Batch- MultiResourceItemReader & HibernateItemWriter example

In this post we will learn about how to use Spring Batch to read multiple flat files using MultiResourceItemReader and write to database with Hibernate (using HibernateItemWriter). We will also witness the usage of JobExecutionListener and itemProcessor. Let’s get going.


Following technologies being used:

  • Spring Batch 3.0.1.RELEASE
  • Spring core 4.0.6.RELEASE
  • Hibernate 4.3.6.Final
  • MySQL Server 5.6
  • Joda Time 2.3
  • JDK 1.6
  • Eclipse JUNO Service Release 2

Let’s begin.

Step 1: Create project directory structure

Following will be the final project structure:

SpringBatchMultiReaderHibernateWriter_img1

What we will do here is read multiple files from src/main/resources/csv/*.txt and write in MySQL database using Hibernate.

Step 2: Create Database Table

Create a fairly simple table in MySQL database which maps to our domain model.

create table EXAM_RESULT (
   id INT NOT NULL auto_increment PRIMARY KEY,   
   student_name VARCHAR(30) NOT NULL,
   dob DATE NOT NULL,
   percentage  double NOT NULL
);

Please visit MySQL installation on Local PC in case you are finding difficulties in setting up MySQL locally.

Step 2: Prepare Input files

Below is the content of flat files /src/main/resources/csv/*.txt which will be inserted into database using hibernate.

ExamResult-Year2001.txt

Brian Burlet   |   01/02/1985  |   76
Jimmy Snuka    |   01/02/1983  |   39
Renard konig   |   01/02/1970  |   61
Kevin Richard  |   01/02/2002  |   59

ExamResult-Year2002.txt

Sam Disilva    |   01/05/1992  |   76
Bob corbet     |   10/07/1990  |   29
Rick Ricky     |   01/02/1973  |   54

ExamResult-Year2003.txt

Igor Watson    |   01/02/1986  |   34
Peet Sampras   |   01/02/1978  |   97
Rita Paul      |   01/02/1993  |   92
Han Yenn       |   01/02/1965  |   83

Now let’s add all contents mentioned in project structure in step 1.

Step 4: Update pom.xml to include required dependencies

Following is the updated minimalistic pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.websystique.springbatch</groupId>
  <artifactId>SpringBatchMultiReaderHibernateWriter</artifactId>
  <version>1.0.0</version>
  <packaging>jar</packaging>

  <name>SpringBatchMultiReaderHibernateWriter</name>

  	<properties>
		<springframework.version>4.0.6.RELEASE</springframework.version>
		<springbatch.version>3.0.1.RELEASE</springbatch.version>
		<hibernate.version>4.3.6.Final</hibernate.version>
		<javassist.version>3.18.1-GA</javassist.version>
		<mysql.connector.version>5.1.31</mysql.connector.version>
		<joda-time.version>2.3</joda-time.version>
		<c3p0.version>0.9.5-pre8</c3p0.version>
	</properties>

	<dependencies>
	
		<!-- Spring Core -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${springframework.version}</version>
		</dependency>

		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-tx</artifactId>
			<version>${springframework.version}</version>
		</dependency>

		<!-- Spring ORM support -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-orm</artifactId>
			<version>${springframework.version}</version>
		</dependency>
	
		<!-- Spring Batch -->
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
			<version>${springbatch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-infrastructure</artifactId>
			<version>${springbatch.version}</version>
		</dependency>
	
		<!-- Hibernate related dependencies -->
		<dependency>
			<groupId>org.hibernate</groupId>
			<artifactId>hibernate-core</artifactId>
			<version>${hibernate.version}</version>
		</dependency>
		<dependency>
			<groupId>org.javassist</groupId>
			<artifactId>javassist</artifactId>
			<version>${javassist.version}</version>
		</dependency>
      			
		<!-- Joda-Time -->		
		<dependency>
  			<groupId>joda-time</groupId>
  			<artifactId>joda-time</artifactId>
  			<version>${joda-time.version}</version>
  		</dependency>
		
		<!-- To map JodaTime with database type -->  		
  		<dependency>
  			<groupId>org.jadira.usertype</groupId>
  			<artifactId>usertype.core</artifactId>
  			<version>3.0.0.CR1</version>
		</dependency>

		<!-- MySQL -->
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>${mysql.connector.version}</version>
		</dependency>
		
		<!-- ComboPooledDataSource -->
		<dependency>
			<groupId>com.mchange</groupId>
			<artifactId>c3p0</artifactId>
			<version>${c3p0.version}</version>
		</dependency>		
	</dependencies>
	<build>
		<pluginManagement>
			<plugins>
				<plugin>
					<groupId>org.apache.maven.plugins</groupId>
					<artifactId>maven-compiler-plugin</artifactId>
					<version>3.2</version>
					<configuration>
						<source>1.6</source>
						<target>1.6</target>
					</configuration>
				</plugin>
			</plugins>
		</pluginManagement>
	</build>

</project>

Since we are using Hibernate this time, we’ve included hibernate-core dependency. We will be using a pooled data source ComboPooledDataSource. We also need mysql-connector-java to connect to MySQL database, we will also need usertype.core to handle the conversion between Jodatime and MySQL date. Spring dependency are pretty obvious as in earlier tutorials.

Step 5: Create Entity Class

Below is the Entity class for our example with Standard JPA annotations.

com.websystique.springbatch.model.ExamResult

package com.websystique.springbatch.model;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;
import javax.persistence.Table;

import org.hibernate.annotations.Type;
import org.joda.time.LocalDate;


@Entity
@Table(name = "EXAM_RESULT")
public class ExamResult {

	@Id
	@GeneratedValue(strategy = GenerationType.IDENTITY)
	private long id;
	
	@Column(name = "STUDENT_NAME", nullable = false)
	private String studentName;
	
	@Column(name = "DOB", nullable = false)
	@Type(type="org.jadira.usertype.dateandtime.joda.PersistentLocalDate")
	private LocalDate dob;
	
	@Column(name = "PERCENTAGE", nullable = false)
	private double percentage;

	public long getId() {
		return id;
	}

	public void setId(long id) {
		this.id = id;
	}

	public String getStudentName() {
		return studentName;
	}

	public void setStudentName(String studentName) {
		this.studentName = studentName;
	}

	public LocalDate getDob() {
		return dob;
	}

	public void setDob(LocalDate dob) {
		this.dob = dob;
	}

	public double getPercentage() {
		return percentage;
	}

	public void setPercentage(double percentage) {
		this.percentage = percentage;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + (int) (id ^ (id >>> 32));
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (!(obj instanceof ExamResult))
			return false;
		ExamResult other = (ExamResult) obj;
		if (id != other.id)
			return false;
		return true;
	}

	@Override
	public String toString() {
		return "ExamResult [id=" + id + ", studentName=" + studentName
				+ ", dob=" + dob + ", percentage=" + percentage + "]";
	}
	
}

Only special thing here is declaration of @Type which will help Hibernate easily map between jodatime LocalDate and database specific Date.

Step 6: Create Mapper to map the File fields to Entity Class

com.websystique.springbatch.ExamResultFieldSetMapper

package com.websystique.springbatch;

import org.joda.time.LocalDate;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.validation.BindException;

import com.websystique.springbatch.model.ExamResult;

public class ExamResultFieldSetMapper implements FieldSetMapper<ExamResult>{

	public ExamResult mapFieldSet(FieldSet fieldSet) throws BindException {
		ExamResult result = new ExamResult();
		result.setStudentName(fieldSet.readString(0));
		result.setDob(new LocalDate(fieldSet.readDate(1,"dd/MM/yyyy")));
		result.setPercentage(fieldSet.readDouble(2));
		return result;
	}

}

Step 7: Create an ItemProcessor

ItemProcessor is Optional, and called after item read but before item write. It gives us the opportunity to perform a business logic on each item. In our case, for example, we will filter out all the items whose percentage is less than 60. So final result will only have records with percentage >= 60.

com.websystique.springbatch.ExamResultItemProcessor

package com.websystique.springbatch;

import org.springframework.batch.item.ItemProcessor;

import com.websystique.springbatch.model.ExamResult;

public class ExamResultItemProcessor implements ItemProcessor<ExamResult, ExamResult> {

	@Override
	public ExamResult process(ExamResult result) throws Exception {
		System.out.println("Processing result :"+result);
		
		/*
		 * Only return results which are more than 60%
		 * 
		 */
		if(result.getPercentage() < 60){
			return null;
		}
		
		return result;
	}
}

Step 8: Add a Job listener(JobExecutionListener)

Job listener is Optional and provide the opportunity to execute some business logic before job start and after job completed.For example setting up environment can be done before job and cleanup can be done after job completed.

com.websystique.springbatch.ExamResultJobListener

package com.websystique.springbatch;

import java.util.List;

import org.joda.time.DateTime;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

public class ExamResultJobListener implements JobExecutionListener{

	private DateTime startTime, stopTime;
	
	@Override
	public void beforeJob(JobExecution jobExecution) {
		startTime = new DateTime();
		System.out.println("ExamResult Job starts at :"+startTime);
	}
	

	@Override
	public void afterJob(JobExecution jobExecution) {
		stopTime = new DateTime();
		System.out.println("ExamResult Job stops at :"+stopTime);
		System.out.println("Total time take in millis :"+getTimeInMillis(startTime , stopTime));
		
		if(jobExecution.getStatus() == BatchStatus.COMPLETED){
			System.out.println("ExamResult job completed successfully");
			//Here you can perform some other business logic like cleanup
		}else if(jobExecution.getStatus() == BatchStatus.FAILED){
			System.out.println("ExamResult job failed with following exceptions ");
			List<Throwable> exceptionList = jobExecution.getAllFailureExceptions();
			for(Throwable th : exceptionList){
				System.err.println("exception :" +th.getLocalizedMessage());
			}
		}
	}
	
	private long getTimeInMillis(DateTime start, DateTime stop){
		return stop.getMillis() - start.getMillis();
	}
	
}

Step 9: Create Spring Context with Hibernate SessionFactory & Spring Batch job configuration

Create a Pooled datasource [context-datasource.xml]

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/beans 
		http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

	<bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource"
    	destroy-method="close">
    	<property name="driverClass" value="com.mysql.jdbc.Driver" />
    	<property name="jdbcUrl" value="jdbc:mysql://localhost:3306/websystique" />
    	<property name="user" value="myuser" />
    	<property name="password" value="mypassword" />
	</bean>

</beans>			

Create Hibernate SessionFactory [context-model.xml]

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans	http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
        		http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        		http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
        		default-autowire="byName"  default-init-method="init">

	<import resource="classpath:context-datasource.xml"/>

	<bean id="sessionFactory" class="org.springframework.orm.hibernate4.LocalSessionFactoryBean" >
		<property name="dataSource" ref="dataSource"/>
		<property name="packagesToScan">
			<list>
				<value>com.websystique.springbatch.model</value>
			</list>
		</property>
		<property name="hibernateProperties">
			<props>
 				<prop key="hibernate.dialect">org.hibernate.dialect.MySQLDialect</prop>
				<!--  	<prop key="hibernate.show_sql">true</prop> -->
				<!-- 	<prop key="hibernate.format_sql">true</prop> -->
			</props>
		
		</property>		
	</bean>
	
	<bean id="transactionManager" class="org.springframework.orm.hibernate4.HibernateTransactionManager" />

	<tx:annotation-driven transaction-manager="transactionManager"/>
	
</beans>

Create Spring-batch context putting everything together [spring-batch-context.xml]

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-3.0.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd">

	<import resource="classpath:context-model.xml"/>


	<!-- JobRepository and JobLauncher are configuration/setup classes -->
	<bean id="jobRepository" class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean" />

	<bean id="jobLauncher" 	class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>


	<bean id="multiResourceItemReader" class="org.springframework.batch.item.file.MultiResourceItemReader">
		<property name="resources" value="classpath:csv/ExamResult*.txt" />
		<property name="delegate" ref="flatFileItemReader" />
  	</bean>

	<!-- ItemReader reads a complete line one by one from input file -->
	<bean id="flatFileItemReader" class="org.springframework.batch.item.file.FlatFileItemReader"  scope="step">

		<property name="lineMapper">

			<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

				<property name="fieldSetMapper">
					<!-- Mapper which maps each individual items in a record to properties in POJO -->
					<bean class="com.websystique.springbatch.ExamResultFieldSetMapper" />
				</property>

				<property name="lineTokenizer">
					<!-- A tokenizer class to be used when items in input record are separated by specific characters -->
					<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
						<property name="delimiter" value="|" />
					</bean>
				</property>

			</bean>

		</property>

	</bean>


	<!-- ItemWriter which writes data to database -->
  	<bean id="databaseItemWriter" class="org.springframework.batch.item.database.HibernateItemWriter">
		<property name="sessionFactory" ref="sessionFactory" />
	</bean>


	<!-- Optional ItemProcessor to perform business logic/filtering on the input records -->
	<bean id="itemProcessor" class="com.websystique.springbatch.ExamResultItemProcessor" />

	<!-- Optional JobExecutionListener to perform business logic before and after the job -->
	<bean id="jobListener" class="com.websystique.springbatch.ExamResultJobListener" />


	<!-- Actual Job -->
	<batch:job id="examResultJob">
		<batch:step id="step1">
			<batch:tasklet transaction-manager="transactionManager">
				<batch:chunk reader="multiResourceItemReader" writer="databaseItemWriter"
					processor="itemProcessor" commit-interval="10" />
			</batch:tasklet>
		</batch:step>
		<batch:listeners>
			<batch:listener ref="jobListener" />
		</batch:listeners>
	</batch:job>

</beans>			

As you can see, we have setup a job with only one step. Step uses MultiResourceItemReader to read all the files matching specific pattern in a defined directory. MultiResourceItemReader then delegates individual file reading to FlatFileItemReader which reads ‘|’ seperated fields from input file. On the write side, we have used HibernateItemWriter which needs sessionFactory as property.

With ItemProcessor we are filtering out the records which does not meet certain criteria(percentage > 60).jobExecutionListener can contain any arbitrary logic you might need to run before and after the job.

Step 10: Create Main application to finally run the job

Create a Java application to run the job.

com.websystique.springbatch.Main

package com.websystique.springbatch;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Main {

	@SuppressWarnings("resource")
	public static void main(String areg[]){
		
		ApplicationContext context = new ClassPathXmlApplicationContext("spring-batch-context.xml");
		
		JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
		Job job = (Job) context.getBean("examResultJob");
	 
		try {
			JobExecution execution = jobLauncher.run(job, new JobParameters());
			System.out.println("Job Exit Status : "+ execution.getStatus());
	 
		} catch (JobExecutionException e) {
			System.out.println("Job ExamResult failed");
			e.printStackTrace();
		}
	}

}

Below is the output

ExamResult Job starts at :2014-08-06T19:57:00.871+02:00
Aug 6, 2014 7:57:00 PM org.springframework.batch.core.job.SimpleStepHandler handleStep
INFO: Executing step: [step1]
Processing result :ExamResult [id=0, studentName=Brian Burlet, dob=1985-02-01, percentage=76.0]
Processing result :ExamResult [id=0, studentName=Jimmy Snuka, dob=1983-02-01, percentage=39.0]
Processing result :ExamResult [id=0, studentName=Renard konig, dob=1970-02-01, percentage=61.0]
Processing result :ExamResult [id=0, studentName=Kevin Richard, dob=2002-02-01, percentage=59.0]
Processing result :ExamResult [id=0, studentName=Sam Disilva, dob=1992-05-01, percentage=76.0]
Processing result :ExamResult [id=0, studentName=Bob corbet, dob=1990-07-10, percentage=29.0]
Processing result :ExamResult [id=0, studentName=Rick Ricky, dob=1973-02-01, percentage=54.0]
Processing result :ExamResult [id=0, studentName=Igor Watson, dob=1986-02-01, percentage=34.0]
Processing result :ExamResult [id=0, studentName=Peet Sampras, dob=1978-02-01, percentage=97.0]
Processing result :ExamResult [id=0, studentName=Rita Paul, dob=1993-02-01, percentage=92.0]
Processing result :ExamResult [id=0, studentName=Han Yenn, dob=1965-02-01, percentage=83.0]
ExamResult Job stops at :2014-08-06T19:57:01.384+02:00
Total time take in millis :513
ExamResult job completed successfully
Job Exit Status : COMPLETED
Aug 6, 2014 7:57:01 PM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=examResultJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

As you can see, we have read all records from all files. Finally, check the EXAM_RESULT table in database. Below is the snapshot from database after this job.

SpringBatchMultiReaderHibernateWriter_img2

All Records from input files who satisfied the criteria ( percentage >=60 )defined in ItemProcessor are saved successfully in database.

That’s it.

Download Source Code


References