# Spring Batch Service

A Spring Batch application demonstrating CSV processing with database persistence and Kafka event publishing using Java 21 and Spring Batch 5.x.
## Features

- CSV Processing: Read and parse CSV files with configurable delimiters and headers
- Database Integration: Write processed data to various database systems (MySQL, PostgreSQL, etc.)
- Kafka Integration: Publish events to Kafka topics for real-time processing
- Batch Processing: Efficient processing of large datasets with chunk-based processing
- Error Handling: Comprehensive error handling with retry mechanisms
- Monitoring: Job execution monitoring and metrics
- Configurable: Flexible configuration for different data formats and destinations
## Prerequisites

- Java 21 or higher
- Maven 3.8+ or Gradle 8+
- Database: MySQL 8.0+, PostgreSQL 13+, or H2 (for testing)
- Apache Kafka 3.0+ (optional, for event publishing)
- IDE: IntelliJ IDEA, Eclipse, or VS Code
## Tech Stack

- Java 21 - Latest LTS version with modern features
- Spring Boot 3.x - Application framework
- Spring Batch 5.x - Batch processing framework
- Spring Data JPA - Database operations
- Apache Kafka - Event streaming
- Maven/Gradle - Build tools
- Docker - Containerization (optional)
## Architecture

The application follows a layered architecture pattern with clear separation of concerns:
Data Flow:
- CSV Input → `FlatFileItemReader` reads CSV files
- Processing → `ItemProcessor` applies business logic
- Output → `ItemWriter` persists to database or publishes to Kafka
- Metadata → `JobRepository` tracks execution status
- Monitoring → Actuator endpoints provide metrics
Key Components:
- Job: Defines the overall batch process
- Step: Individual processing unit within a job
- Chunk: Configurable batch size for processing
- Reader: Extracts data from CSV source
- Processor: Transforms and validates data
- Writer: Persists data to target systems
- Repository: Stores job execution metadata
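In Spring Batch 5, these components are wired together explicitly through `JobBuilder` and `StepBuilder` (the old builder factories were removed). A minimal configuration sketch — the bean names, chunk size, and `DataRecord` generics are illustrative, not this project's actual code:

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class BatchConfig {

    @Bean
    public Step csvStep(JobRepository jobRepository,
                        PlatformTransactionManager txManager,
                        ItemReader<DataRecord> reader,
                        ItemProcessor<DataRecord, DataRecord> processor,
                        ItemWriter<DataRecord> writer) {
        return new StepBuilder("csvStep", jobRepository)
                // chunk size = records read/processed before one transactional write
                .<DataRecord, DataRecord>chunk(1000, txManager)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public Job csvToDatabaseJob(JobRepository jobRepository, Step csvStep) {
        return new JobBuilder("csvToDatabaseJob", jobRepository)
                .start(csvStep)
                .build();
    }
}
```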
Error Handling:
- Retry mechanism for transient failures
- Skip policy for invalid records
- Comprehensive logging and monitoring
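Spring Batch exposes these policies on the step builder (`faultTolerant()`, `retryLimit()`, `skip()`). Stripped of the framework, retry is just a bounded re-attempt loop; a minimal framework-free sketch (the helper and names are illustrative, not part of this project):

```java
import java.util.concurrent.Callable;

public class RetryDemo {

    // Re-attempts the task up to maxAttempts times; rethrows the last failure.
    static <T> T withRetry(Callable<T> task, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return task.call();
            } catch (Exception e) {
                last = e; // transient failure: try again
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Simulate a writer that fails twice before succeeding.
        String result = withRetry(() -> {
            if (++calls[0] < 3) throw new IllegalStateException("transient");
            return "written";
        }, 3);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```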
## Installation
```bash
git clone https://github.com/yourusername/spring-batch-service.git
cd spring-batch-service
```

Update `application.yml` with your database configuration:
```yaml
spring:
  datasource:
    url: jdbc:mysql://localhost:3306/batch_service
    username: your_username
    password: your_password
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: create-drop
    show-sql: true
```

If using Kafka, update the configuration:
```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```

Build the project:

```bash
# Using Maven
mvn clean install

# Using Gradle
./gradlew build
```

Run the application:

```bash
# Run with Maven
mvn spring-boot:run

# Run with Gradle
./gradlew bootRun

# Run the JAR file
java -jar target/spring-batch-service-1.0.0.jar
```

Run with Docker:

```bash
# Build Docker image
docker build -t spring-batch-service .

# Run container
docker run -p 8080:8080 spring-batch-service
```

The project follows a standard Spring Boot structure with batch-specific components:
```
src/
├── main/
│   ├── java/
│   │   └── com/example/spring/batch/service/
│   │       ├── config/
│   │       │   ├── BatchConfig.java                 # Batch job configuration
│   │       │   ├── DatabaseConfig.java              # Database configuration
│   │       │   └── KafkaConfig.java                 # Kafka configuration
│   │       ├── model/
│   │       │   └── DataRecord.java                  # Data model/entity
│   │       ├── processor/
│   │       │   └── DataProcessor.java               # Business logic processor
│   │       ├── reader/
│   │       │   └── CustomCsvReader.java             # CSV file reader
│   │       ├── writer/
│   │       │   ├── DatabaseWriter.java              # Database writer
│   │       │   └── KafkaWriter.java                 # Kafka event writer
│   │       └── SpringBatchServiceApplication.java
│   └── resources/
│       ├── application.yml                          # Application configuration
│       ├── data/
│       │   └── sample.csv                           # Sample input data
│       └── schema.sql                               # Database schema
└── test/
    └── java/
        └── com/example/batch/
            └── BatchJobTest.java                    # Integration tests
```
Key Components:
- Config: Contains all configuration classes for batch, database, and Kafka
- Model: Data entities and DTOs
- Processor: Business logic for data transformation
- Reader: Custom CSV reading implementations
- Writer: Database and Kafka writing implementations
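For the Kafka path, Spring Batch ships a `KafkaItemWriter` that delegates to a `KafkaTemplate`, so the writer class may be little more than bean wiring. A sketch — the topic name and key mapping are illustrative assumptions, including the `DataRecord.getId()` accessor:

```java
import org.springframework.batch.item.kafka.KafkaItemWriter;
import org.springframework.batch.item.kafka.builder.KafkaItemWriterBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class KafkaWriterConfig {

    @Bean
    public KafkaItemWriter<String, DataRecord> kafkaWriter(
            KafkaTemplate<String, DataRecord> template) {
        // KafkaItemWriter sends to the template's default topic
        template.setDefaultTopic("batch-events");
        return new KafkaItemWriterBuilder<String, DataRecord>()
                .kafkaTemplate(template)
                .itemKeyMapper(record -> String.valueOf(record.getId())) // key = record id
                .build();
    }
}
```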
```yaml
# Batch Configuration
spring:
  batch:
    job:
      enabled: false # Disable auto-start
    jdbc:
      initialize-schema: always

# Job Parameters
job:
  input-file: classpath:data/input.csv
  chunk-size: 1000
  retry-limit: 3

# Kafka Configuration
kafka:
  topic:
    name: batch-events
    partitions: 3
    replicas: 1
```

| Parameter | Description | Default |
|---|---|---|
| `input.file` | Input CSV file path | `classpath:data/input.csv` |
| `chunk.size` | Processing chunk size | 1000 |
| `retry.limit` | Number of retry attempts | 3 |
| `output.destination` | Output destination (`db`/`kafka`) | `db` |
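These parameters reach batch components through late binding: a step-scoped bean can read them with SpEL. A hedged sketch of how the reader might pick up `input.file` (field-mapping setup omitted; names are illustrative):

```java
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.DefaultResourceLoader;

@Configuration
public class ReaderConfig {

    @Bean
    @StepScope // re-created per step execution, so jobParameters are resolvable
    public FlatFileItemReader<DataRecord> reader(
            @Value("#{jobParameters['input.file']}") String inputFile) {
        FlatFileItemReader<DataRecord> reader = new FlatFileItemReader<>();
        reader.setResource(new DefaultResourceLoader().getResource(inputFile));
        // delimiter/field mapping configuration omitted for brevity
        return reader;
    }
}
```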
```bash
# Run job with parameters
java -jar spring-batch-service.jar \
  --spring.batch.job.names=csvToDatabaseJob \
  --input.file=classpath:data/input.csv \
  --chunk.size=500

# Run job with Kafka output
java -jar spring-batch-service.jar \
  --spring.batch.job.names=csvToKafkaJob \
  --input.file=classpath:data/input.csv \
  --output.destination=kafka
```

Jobs can also be launched programmatically:

```java
@Autowired
private JobLauncher jobLauncher;

@Autowired
private Job csvToDatabaseJob;

public void runJob() throws Exception {
    JobParameters params = new JobParametersBuilder()
            .addString("input.file", "classpath:data/input.csv")
            .addLong("timestamp", System.currentTimeMillis())
            .toJobParameters();
    jobLauncher.run(csvToDatabaseJob, params);
}
```

Access job execution details via REST endpoints:
```bash
# Get all job executions
curl http://localhost:8080/actuator/batch/jobs

# Get specific job execution
curl http://localhost:8080/actuator/batch/jobs/{executionId}

# Get job execution metrics
curl http://localhost:8080/actuator/metrics/batch.jobs
```

Check application health:

```bash
# Application health
curl http://localhost:8080/actuator/health

# Database health
curl http://localhost:8080/actuator/health/db

# Kafka health
curl http://localhost:8080/actuator/health/kafka
```

Run the test suites:

```bash
# Run unit tests
mvn test

# Run with coverage
mvn test jacoco:report

# Run integration tests
mvn verify -P integration-test
```

Sample CSV format for testing:
```csv
id,name,email,age,city
1,John Doe,[email protected],30,New York
2,Jane Smith,[email protected],25,Los Angeles
3,Bob Johnson,[email protected],35,Chicago
```

Define a custom entity:

```java
@Entity
public class CustomDataRecord {

    @Id
    private Long id;
    private String name;
    private String email;
    private Integer age;
    private String city;

    // Getters, setters, constructors
}
```

Implement a custom processor:

```java
@Component
public class CustomDataProcessor implements ItemProcessor<InputRecord, OutputRecord> {

    @Override
    public OutputRecord process(InputRecord item) throws Exception {
        // Custom processing logic
        return new OutputRecord(item);
    }
}
```

Implement a custom reader:

```java
@Component
public class CustomCsvReader extends FlatFileItemReader<DataRecord> {
    // Custom reading logic
}
```
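Subclassing `FlatFileItemReader` is often unnecessary: Spring Batch's builder can assemble a CSV reader directly. A sketch assuming the `DataRecord` fields match the sample CSV header (bean name and resource path are illustrative):

```java
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.builder.FlatFileItemReaderBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;

@Configuration
public class CsvReaderConfig {

    @Bean
    public FlatFileItemReader<DataRecord> csvReader() {
        return new FlatFileItemReaderBuilder<DataRecord>()
                .name("csvReader")               // required for restartability
                .resource(new ClassPathResource("data/input.csv"))
                .linesToSkip(1)                  // skip the header row
                .delimited()
                .delimiter(",")
                .names("id", "name", "email", "age", "city")
                .targetType(DataRecord.class)    // maps fields by property name
                .build();
    }
}
```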
Implement a custom writer:

```java
@Component
public class CustomDatabaseWriter extends JpaItemWriter<DataRecord> {
    // Custom writing logic
}
```

Job not starting:
- Check whether `spring.batch.job.enabled=false` is set in `application.yml`
- Verify job parameters are correct
- Check database connectivity
CSV parsing errors:
- Verify CSV format and delimiter
- Check for encoding issues (UTF-8 recommended)
- Validate header mapping
Database connection issues:
- Verify database credentials
- Check database server is running
- Ensure proper JDBC driver
Kafka connection issues:
- Verify Kafka broker is running
- Check topic exists and has proper permissions
- Validate serializer configuration
Enable debug logging:

```yaml
logging:
  level:
    com.example.batch: DEBUG
    org.springframework.batch: DEBUG
```

Tune the chunk size:

```yaml
job:
  chunk-size: 1000 # Adjust based on memory and performance
```

Enable JDBC batching for better write throughput:

```yaml
spring:
  jpa:
    properties:
      hibernate:
        jdbc:
          batch_size: 50
        order_inserts: true
        order_updates: true
```

```bash
# JVM options for large datasets
java -Xmx4g -Xms2g -jar spring-batch-service.jar
```

Database security:
- Use connection pooling
- Implement proper authentication
- Use encrypted connections (SSL/TLS)

Kafka security:
- Enable SASL authentication
- Use SSL/TLS encryption
- Implement proper ACLs
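The Kafka hardening items can be supplied through Spring Boot's Kafka properties. A sketch — the truststore path, passwords, and SASL mechanism are placeholders to adapt:

```yaml
spring:
  kafka:
    security:
      protocol: SASL_SSL
    ssl:
      trust-store-location: file:/path/to/truststore.jks
      trust-store-password: changeit
    properties:
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: >-
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="batch" password="changeit";
```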
| Endpoint | Method | Description |
|---|---|---|
| `/api/jobs` | GET | List all jobs |
| `/api/jobs/{jobName}/executions` | GET | Get job executions |
| `/api/jobs/{jobName}/start` | POST | Start a job |
| `/api/jobs/{executionId}/stop` | POST | Stop a job execution |

```bash
# Start a job
curl -X POST http://localhost:8080/api/jobs/csvToDatabaseJob/start \
  -H "Content-Type: application/json" \
  -d '{"inputFile": "classpath:data/input.csv"}'

# Get job status
curl http://localhost:8080/api/jobs/csvToDatabaseJob/executions
```

To contribute:
- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
```bash
# Clone and setup
git clone https://github.com/yourusername/spring-batch-service.git
cd spring-batch-service

# Install dependencies
mvn clean install

# Run tests
mvn test

# Start development server
mvn spring-boot:run
```

This project is licensed under the MIT License - see the LICENSE file for details.
- Issues: GitHub Issues
- Documentation: Wiki
- Discussions: GitHub Discussions
- Spring Batch - The batch processing framework
- Spring Boot - The application framework
- Apache Kafka - The event streaming platform
- Initial release
- CSV to database processing
- CSV to Kafka processing
- Basic monitoring and metrics
- Comprehensive error handling
⭐ If this project helps you, please consider giving it a star!
For more information about Spring Batch, visit the official documentation.