Environment Setup#
Configure the environment for using Spark in Java/Spring Boot projects.
Pure Java Project#
The simplest configuration: a plain Gradle project with no framework on top.
build.gradle#
plugins {
id 'java'
id 'application'
}
group = 'com.example'
version = '1.0.0'
java {
sourceCompatibility = '17'
}
repositories {
mavenCentral()
}
dependencies {
// Spark Core
implementation 'org.apache.spark:spark-core_2.13:3.5.1'
// Spark SQL (DataFrame API)
implementation 'org.apache.spark:spark-sql_2.13:3.5.1'
// Kafka integration (optional)
implementation 'org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.1'
// MLlib (optional)
implementation 'org.apache.spark:spark-mllib_2.13:3.5.1'
// Simple SLF4J provider (optional; Spark already ships a Log4j2 binding,
// so including this can itself trigger a "multiple providers" warning)
implementation 'org.slf4j:slf4j-simple:2.0.9'
}
// Logging conflict prevention
configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j', module: 'log4j'
}
application {
mainClass = 'com.example.SparkApp'
}
// Memory settings for execution
tasks.named('run') {
    jvmArgs = ['-Xmx2g']
}
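On Java 17, launching Spark directly (rather than through spark-submit) can fail with java.lang.reflect.InaccessibleObjectException, because the JDK module system blocks Spark's internal reflection. Below is a sketch of the run task with the commonly needed module flags; the full list ships in Spark's org.apache.spark.launcher.JavaModuleOptions class, so extend as errors indicate:

// Java 17 module access for embedded Spark
tasks.named('run') {
    jvmArgs = [
        '-Xmx2g',
        '--add-opens=java.base/java.lang=ALL-UNNAMED',
        '--add-opens=java.base/java.nio=ALL-UNNAMED',
        '--add-opens=java.base/java.util=ALL-UNNAMED',
        '--add-opens=java.base/sun.nio.ch=ALL-UNNAMED'
    ]
}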
Basic Structure#
src/
└── main/
    ├── java/
    │   └── com/example/
    │       └── SparkApp.java
    └── resources/
        └── data/
            └── sample.csv

SparkApp.java#
package com.example;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class SparkApp {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("My Spark App")
.master("local[*]")
.config("spark.driver.memory", "2g") // note: in local mode the heap is fixed at JVM start; prefer -Xmx
.getOrCreate();
spark.sparkContext().setLogLevel("WARN");
// Data processing
Dataset<Row> df = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv("src/main/resources/data/sample.csv");
df.show();
spark.stop();
}
}
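To smoke-test the setup, save a small CSV as src/main/resources/data/sample.csv; the columns below are just placeholders:

name,category,price
Apple,Fruit,150
Banana,Fruit,80
Carrot,Vegetable,60

Then run the app:

./gradlew run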
Spring Boot Integration#
Configuration for using Spark with Spring Boot.
build.gradle#
plugins {
id 'java'
id 'org.springframework.boot' version '3.2.0'
id 'io.spring.dependency-management' version '1.1.4'
}
group = 'com.example'
version = '1.0.0'
java {
sourceCompatibility = '17'
}
repositories {
mavenCentral()
}
dependencies {
// Spring Boot
implementation 'org.springframework.boot:spring-boot-starter-web'
// Spark: exclude its logging transitives so they don't clash with Spring's
implementation('org.apache.spark:spark-core_2.13:3.5.1') {
exclude group: 'org.slf4j'
exclude group: 'log4j'
}
implementation('org.apache.spark:spark-sql_2.13:3.5.1') {
exclude group: 'org.slf4j'
exclude group: 'log4j'
}
// Test
testImplementation 'org.springframework.boot:spring-boot-starter-test'
}
// Prevent logging conflicts between Spark and Spring
configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j', module: 'log4j'
exclude group: 'ch.qos.logback', module: 'logback-classic'
}
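With logback-classic excluded globally, Spring Boot is left without its default logging backend. One option (an assumption, not a requirement of this setup) is to route Spring's logging through the same Log4j2 that Spark ships:

implementation 'org.springframework.boot:spring-boot-starter-log4j2'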
SparkConfig.java#
package com.example.config;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import jakarta.annotation.PreDestroy;
@Configuration
public class SparkConfig {
@Value("${spark.app.name:SpringSparkApp}")
private String appName;
@Value("${spark.master:local[*]}")
private String master;
@Value("${spark.driver.memory:2g}")
private String driverMemory;
private SparkSession sparkSession;
@Bean
public SparkConf sparkConf() {
return new SparkConf()
.setAppName(appName)
.setMaster(master)
.set("spark.driver.memory", driverMemory)
.set("spark.sql.adaptive.enabled", "true");
}
@Bean
public SparkSession sparkSession(SparkConf sparkConf) {
sparkSession = SparkSession.builder()
.config(sparkConf)
.getOrCreate();
sparkSession.sparkContext().setLogLevel("WARN");
return sparkSession;
}
@PreDestroy
public void closeSparkSession() {
if (sparkSession != null) {
sparkSession.stop();
}
}
}
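An equivalent, slightly shorter alternative to the @PreDestroy hook is to let Spring call stop() itself when the application context closes:

// Spring invokes stop() on the bean at context shutdown
@Bean(destroyMethod = "stop")
public SparkSession sparkSession(SparkConf sparkConf) {
    return SparkSession.builder()
            .config(sparkConf)
            .getOrCreate();
}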
application.yml#
spring:
  application:
    name: spark-spring-app

spark:
  app:
    name: ${spring.application.name}
  master: local[*]
  driver:
    memory: 2g

server:
  port: 8080
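Since these are ordinary Spring properties, they can be overridden at launch without editing the file (the jar name here is just what this build would produce):

java -jar build/libs/spark-spring-app-1.0.0.jar --spark.master=local[2] --spark.driver.memory=4g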
DataService.java#
package com.example.service;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;
import static org.apache.spark.sql.functions.*;
@Service
public class DataService {
private final SparkSession spark;
public DataService(SparkSession spark) {
this.spark = spark;
}
public Dataset<Row> loadData(String path) {
return spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv(path);
}
public Dataset<Row> aggregateByCategory(Dataset<Row> df, String categoryCol, String valueCol) {
return df.groupBy(categoryCol)
.agg(
count("*").alias("count"),
sum(valueCol).alias("total"),
avg(valueCol).alias("average")
)
.orderBy(col("total").desc());
}
}
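Used from another component, the service composes like this (the path and column names are illustrative):

Dataset<Row> df = dataService.loadData("data/sales.csv");
Dataset<Row> summary = dataService.aggregateByCategory(df, "category", "amount");
summary.show();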
DataController.java#
package com.example.controller;
import com.example.service.DataService;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@RestController
@RequestMapping("/api/data")
public class DataController {
private final DataService dataService;
public DataController(DataService dataService) {
this.dataService = dataService;
}
@GetMapping("/summary")
public List<Map<String, Object>> getSummary(
@RequestParam String path,
@RequestParam String category,
@RequestParam String value) {
Dataset<Row> df = dataService.loadData(path);
Dataset<Row> result = dataService.aggregateByCategory(df, category, value);
// Convert DataFrame to JSON response
return result.collectAsList().stream()
.map(row -> {
return Map.of(
"category", row.getAs(category),
"count", row.getAs("count"),
"total", row.getAs("total"),
"average", row.getAs("average")
);
})
.collect(Collectors.toList());
}
}
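One caution about this endpoint: collectAsList() pulls every result row into the driver's heap, so an unbounded aggregation can take the API down. A defensive variant caps the result before collecting (the cap of 1000 is arbitrary):

// Cap rows returned to the driver before collecting
List<Row> rows = result.limit(1000).collectAsList();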
Logging Settings#
log4j2.properties#
Spark uses Log4j2. Create src/main/resources/log4j2.properties:
# Root logger
rootLogger.level = WARN
rootLogger.appenderRef.console.ref = console
# Console output
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
# Spark logs
logger.spark.name = org.apache.spark
logger.spark.level = WARN
# Kafka logs
logger.kafka.name = org.apache.kafka
logger.kafka.level = WARN
# Application logs
logger.app.name = com.example
logger.app.level = INFO

Windows Environment Setup#
Even in local mode, Spark on Windows needs the Hadoop native launcher (winutils.exe).
winutils Setup#
# 1. Download winutils
# Download from https://github.com/steveloughran/winutils
# 2. Create directory
mkdir C:\hadoop\bin
# 3. Copy winutils.exe to C:\hadoop\bin
# 4. Set environment variables
#    (caution: setx expands %PATH% and %HADOOP_HOME% immediately, and the
#    changes only take effect in newly opened terminals)
setx HADOOP_HOME C:\hadoop
setx PATH "%PATH%;%HADOOP_HOME%\bin"

Or Set in Code#
// Set Hadoop home on Windows, before the first SparkSession is created
System.setProperty("hadoop.home.dir", "C:\\hadoop");

Troubleshooting#
Logging Conflicts#
SLF4J: Class path contains multiple SLF4J bindings

Solution: Exclude conflicting logging libraries in build.gradle:
configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j', module: 'log4j'
exclude group: 'ch.qos.logback', module: 'logback-classic'
}
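If the warning persists, Gradle's dependency report shows exactly which module still pulls in a second binding:

./gradlew dependencies --configuration runtimeClasspath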
Duplicate SparkSession Creation#
Only one SparkContext may be running in this JVM

Solution: Manage the SparkSession as a singleton, or use getOrCreate():
// getOrCreate() reuses the existing session instead of creating a second one
SparkSession spark = SparkSession.builder()
    .appName("App")
    .getOrCreate();

Out of Memory#
java.lang.OutOfMemoryError: Java heap space

Solution: Increase memory settings:
// build.gradle
tasks.named('run') {
jvmArgs = ['-Xmx4g']
}

Or set it on the SparkSession:
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "4g")

Note that in local mode the driver heap is fixed when the JVM starts, so spark.driver.memory set in code has no effect there; use the JVM -Xmx flag instead.

Java Version Compatibility#
Unsupported class file major version

Solution: Run on a supported JDK. Spark 3.5 supports Java 8, 11, and 17; Java 21 is not supported.
java -version  # Check version

Next Steps#
After environment setup is complete:
- Basic Examples - Data processing examples