TL;DR
- SparkSession Bean: @Configuration으로 싱글톤 관리, @PreDestroy로 정리
- 프로파일 분리: local, production, test 환경별 Spark 설정
- 비동기 배치: @Async + CompletableFuture로 대용량 처리
- REST API: DataFrame 결과를 JSON 응답으로 변환
대상 독자 및 선수 지식#
| 구분 | 내용 |
|---|---|
| 대상 독자 | Spring Boot 애플리케이션에 Spark를 통합하려는 백엔드 개발자 |
| 선수 지식 | Spring Boot 기초, 환경 설정 완료, Spark DataFrame API |
| 학습 목표 | Spring Boot에서 Spark를 활용한 데이터 분석 API를 구현할 수 있다 |
| 예상 소요 시간 | 약 40분 |
Java/Spring 개발자를 위한 Spark와 Spring Boot 통합 패턴입니다.
아키텍처 패턴#
flowchart TB
subgraph SpringBoot["Spring Boot Application"]
Controller[REST Controller]
Service[Business Service]
SparkService[Spark Service]
Config[Spark Configuration]
end
subgraph Spark["Spark Context"]
SparkSession[SparkSession Bean]
Jobs[Batch Jobs]
end
subgraph Storage["Data Storage"]
DB[(Database)]
HDFS[(HDFS/S3)]
Kafka[Kafka]
end
Controller --> Service
Service --> SparkService
SparkService --> SparkSession
SparkSession --> Jobs
Jobs --> HDFS
Jobs --> DB
Jobs --> Kafka
Config --> SparkSession
다이어그램 설명: Spring Boot Application 내 REST Controller가 Service를 통해 Spark Service에 요청하고, SparkSession Bean이 Batch Jobs를 실행하여 HDFS/S3, Database, Kafka 등 스토리지와 연동하는 아키텍처
Gradle 설정#
// build.gradle.kts
plugins {
    java
    id("org.springframework.boot") version "3.2.5"
    id("io.spring.dependency-management") version "1.1.4"
}
java {
    toolchain {
        languageVersion.set(JavaLanguageVersion.of(17))
    }
}
repositories {
    mavenCentral()
}
val sparkVersion = "3.5.1"
val scalaVersion = "2.13"
dependencies {
    // Spring Boot
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-actuator")
    // Spark (excluding Log4j here is required so it does not clash with Spring Boot's Logback)
    implementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") {
        exclude(group = "org.slf4j", module = "slf4j-log4j12")
        exclude(group = "log4j", module = "log4j")
        // NOTE(review): Spark 3.5 logs through Log4j2; excluding the whole
        // org.apache.logging.log4j group relies on Spring Boot's log4j-to-slf4j
        // bridge being on the classpath — confirm at runtime.
        exclude(group = "org.apache.logging.log4j")
    }
    implementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
        exclude(group = "org.slf4j", module = "slf4j-log4j12")
    }
    // Spark Hive support (optional) — NOTE: this is spark-hive, not a JDBC artifact.
    implementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion")
    // Jackson for JSON (version kept in lockstep with the forced jackson-databind below)
    implementation("com.fasterxml.jackson.module:jackson-module-scala_$scalaVersion:2.15.3")
    // Logging (Spring Boot standard: Logback via spring-boot-starter-logging)
    implementation("org.springframework.boot:spring-boot-starter-logging")
    // Test
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}
// Dependency conflict resolution: keep log4j 1.x out everywhere and pin Jackson
configurations.all {
    exclude(group = "org.slf4j", module = "slf4j-log4j12")
    exclude(group = "log4j", module = "log4j")
    resolutionStrategy {
        force("com.fasterxml.jackson.core:jackson-databind:2.15.3")
        force("com.fasterxml.jackson.core:jackson-core:2.15.3")
    }
}
SparkSession 빈 구성#
기본 설정
package com.example.spark.config;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spring configuration that exposes a singleton SparkSession bean.
 *
 * Exactly one of the two {@code @Bean} methods is active per profile; both
 * store the created session in the same field so {@code cleanup()} can stop
 * whichever one was built.
 */
@Configuration
public class SparkConfig {
    private static final Logger logger = LoggerFactory.getLogger(SparkConfig.class);

    // All values are overridable via application.yml; text after ':' is the default.
    @Value("${spark.app.name:spring-spark-app}")
    private String appName;
    @Value("${spark.master:local[*]}")
    private String masterUrl;
    @Value("${spark.driver.memory:2g}")
    private String driverMemory;
    @Value("${spark.executor.memory:2g}")
    private String executorMemory;
    @Value("${spark.sql.shuffle.partitions:200}")
    private int shufflePartitions;

    // Held so cleanup() can stop the session on application shutdown.
    private SparkSession sparkSession;

    /**
     * SparkSession for every profile except "test".
     */
    @Bean
    @Profile("!test") // tests use the lightweight bean below
    public SparkSession sparkSession() {
        logger.info("Initializing SparkSession: app={}, master={}", appName, masterUrl);
        SparkConf conf = new SparkConf()
            .setAppName(appName)
            .setMaster(masterUrl)
            // NOTE(review): spark.driver.memory set after the driver JVM has
            // already started (as here, in client/local mode) has no effect —
            // it must be supplied via spark-submit or JVM options. Confirm the
            // intended deployment mode.
            .set("spark.driver.memory", driverMemory)
            .set("spark.executor.memory", executorMemory)
            .set("spark.sql.shuffle.partitions", String.valueOf(shufflePartitions))
            // Adaptive Query Execution: runtime re-optimization of shuffles/joins
            .set("spark.sql.adaptive.enabled", "true")
            .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
            // Serialization: Kryo over default Java serialization
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            .set("spark.kryoserializer.buffer.max", "1024m")
            // Spark web UI on a fixed port (4040 must be free on the host)
            .set("spark.ui.enabled", "true")
            .set("spark.ui.port", "4040");
        this.sparkSession = SparkSession.builder()
            .config(conf)
            .getOrCreate();
        // Quiet Spark's internal logging; application logging is unaffected.
        sparkSession.sparkContext().setLogLevel("WARN");
        logger.info("SparkSession initialized successfully");
        return sparkSession;
    }

    /**
     * Minimal local session for the "test" profile: 2 cores, 2 shuffle
     * partitions, UI disabled, driver bound to loopback for CI environments.
     */
    @Bean
    @Profile("test")
    public SparkSession testSparkSession() {
        logger.info("Initializing Test SparkSession");
        this.sparkSession = SparkSession.builder()
            .appName("test-spark")
            .master("local[2]")
            .config("spark.sql.shuffle.partitions", "2")
            .config("spark.ui.enabled", "false")
            .config("spark.driver.bindAddress", "127.0.0.1")
            .getOrCreate();
        return sparkSession;
    }

    /**
     * Stops the SparkSession on application shutdown; skipped if the context
     * is already stopped, so the hook is safe to run more than once.
     */
    @PreDestroy
    public void cleanup() {
        if (sparkSession != null && !sparkSession.sparkContext().isStopped()) {
            logger.info("Stopping SparkSession");
            sparkSession.stop();
        }
    }
}
application.yml 설정
# application.yml
spring:
  application:
    name: spark-spring-app

# Spark settings (bound by the @Value placeholders in SparkConfig)
spark:
  app:
    name: ${spring.application.name}
  master: local[*]
  driver:
    memory: 2g
  executor:
    memory: 2g
  sql:
    shuffle:
      partitions: 100

# Profile-specific overrides (multi-document YAML)
---
spring:
  config:
    activate:
      on-profile: production
spark:
  master: spark://spark-master:7077
  driver:
    memory: 4g
  executor:
    memory: 8g
  sql:
    shuffle:
      partitions: 200
---
spring:
  config:
    activate:
      on-profile: test
spark:
  master: local[2]
  driver:
    memory: 512m
  sql:
    shuffle:
      partitions: 2
핵심 포인트: SparkSession 빈 구성
- 싱글톤 관리: SparkSession은 JVM당 하나만 존재해야 함
- 프로파일 분리: @Profile("!test")로 테스트 환경 별도 설정
- @PreDestroy: 애플리케이션 종료 시 SparkSession 안전하게 정리
- AQE 활성화:
spark.sql.adaptive.enabled로 쿼리 최적화 자동화
서비스 레이어 패턴#
기본 Spark 서비스
package com.example.spark.service;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// FIX: java.util.List was not imported although getCategoryRevenue returns
// List<CategoryRevenue> — the class did not compile without it.
import java.util.List;
import static org.apache.spark.sql.functions.*;

/**
 * Read-only sales analytics over a Parquet dataset.
 */
@Service
public class DataAnalysisService {
    private static final Logger logger = LoggerFactory.getLogger(DataAnalysisService.class);
    private final SparkSession spark;

    // Constructor injection keeps the service easy to wire with a test session.
    public DataAnalysisService(SparkSession spark) {
        this.spark = spark;
    }

    /**
     * Builds a sales summary report for the inclusive range
     * [startDate, endDate], compared against the sale_date column.
     *
     * @param dataPath  path of the Parquet sales dataset
     * @param startDate inclusive lower bound (string compare on sale_date)
     * @param endDate   inclusive upper bound
     * @return totals, average order value, and distinct customer count
     */
    public SalesSummary generateSalesSummary(String dataPath, String startDate, String endDate) {
        logger.info("매출 요약 생성: {} ~ {}", startDate, endDate);
        // FIX: "header"/"inferSchema" are CSV reader options; the Parquet
        // reader ignores them (Parquet is self-describing), so they were removed.
        Dataset<Row> sales = spark.read().parquet(dataPath);
        Dataset<Row> filtered = sales
            .filter(col("sale_date").between(startDate, endDate));
        Row summary = filtered.agg(
            count("*").alias("total_orders"),
            sum("amount").alias("total_revenue"),
            avg("amount").alias("avg_order_value"),
            countDistinct("customer_id").alias("unique_customers")
        ).first();
        // FIX: sum()/avg() produce NULL when no rows match the range, and
        // getDouble() would then throw; report zeros for an empty range instead.
        return new SalesSummary(
            summary.getLong(0),
            summary.isNullAt(1) ? 0.0 : summary.getDouble(1),
            summary.isNullAt(2) ? 0.0 : summary.getDouble(2),
            summary.getLong(3)
        );
    }

    /**
     * Revenue and order count per category, ordered by revenue descending.
     *
     * @param dataPath path of the Parquet sales dataset
     */
    public List<CategoryRevenue> getCategoryRevenue(String dataPath) {
        Dataset<Row> result = spark.read()
            .parquet(dataPath)
            .groupBy("category")
            .agg(
                sum("amount").alias("revenue"),
                count("*").alias("order_count")
            )
            .orderBy(col("revenue").desc());
        // collectAsList() pulls rows to the driver — acceptable here because
        // the group-by output is one row per category.
        return result.collectAsList().stream()
            .map(row -> new CategoryRevenue(
                row.getString(0),
                row.getDouble(1),
                row.getLong(2)
            ))
            .toList();
    }

    /** Aggregate figures for a reporting period. */
    public record SalesSummary(
        long totalOrders,
        double totalRevenue,
        double avgOrderValue,
        long uniqueCustomers
    ) {}

    /** Per-category revenue rollup. */
    public record CategoryRevenue(
        String category,
        double revenue,
        long orderCount
    ) {}
}
비동기 배치 작업
package com.example.spark.service;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture;
import static org.apache.spark.sql.functions.*;

/**
 * Scheduled and asynchronous Spark batch jobs.
 *
 * NOTE(review): @Scheduled and @Async only take effect when enabled on a
 * configuration class (@EnableScheduling / @EnableAsync) — neither is visible
 * in this file; confirm they are enabled elsewhere in the application.
 */
@Service
public class BatchProcessingService {
    private static final Logger logger = LoggerFactory.getLogger(BatchProcessingService.class);
    private final SparkSession spark;

    // Constructor injection for testability with a local SparkSession.
    public BatchProcessingService(SparkSession spark) {
        this.spark = spark;
    }

    /**
     * Daily ETL job: reads yesterday's raw events, cleans and aggregates them,
     * and writes the result partitioned by event type. Failures are rethrown
     * after logging so the scheduler surface records the error.
     */
    @Scheduled(cron = "0 0 2 * * *") // every day at 02:00
    public void dailyETL() {
        String yesterday = LocalDate.now().minusDays(1)
            .format(DateTimeFormatter.ISO_DATE);
        logger.info("일일 ETL 시작: {}", yesterday);
        try {
            // 1. Read the raw event data for yesterday's date partition
            Dataset<Row> rawData = spark.read()
                .parquet("s3://data-lake/raw/events/date=" + yesterday);
            // 2. Clean: drop null event types, dedupe by event_id, stamp processing time
            Dataset<Row> cleaned = rawData
                .filter(col("event_type").isNotNull())
                .dropDuplicates("event_id")
                .withColumn("processed_at", current_timestamp());
            // 3. Aggregate per (user, event type)
            Dataset<Row> aggregated = cleaned
                .groupBy("user_id", "event_type")
                .agg(
                    count("*").alias("event_count"),
                    sum("value").alias("total_value")
                );
            // 4. Persist, partitioned by event_type for downstream pruning
            aggregated.write()
                .mode(SaveMode.Overwrite)
                .partitionBy("event_type")
                .parquet("s3://data-lake/processed/daily/" + yesterday);
            // NOTE(review): cleaned.count() re-executes the cleaning lineage
            // from the source; consider cleaned.persist() or counting before
            // the write if this job's runtime matters.
            logger.info("일일 ETL 완료: {} 레코드 처리", cleaned.count());
        } catch (Exception e) {
            logger.error("일일 ETL 실패: {}", e.getMessage(), e);
            throw new RuntimeException("ETL 실패", e);
        }
    }

    /**
     * Asynchronous bulk processing: reads a Parquet dataset, applies the
     * business transform, and writes the result to outputPath.
     *
     * @return a completed future carrying success flag, record count, elapsed
     *         milliseconds, and (on failure) the error message — failures are
     *         reported inside the result rather than thrown to the caller.
     */
    @Async
    public CompletableFuture<ProcessingResult> processLargeDataset(String inputPath, String outputPath) {
        logger.info("비동기 처리 시작: {}", inputPath);
        long startTime = System.currentTimeMillis();
        try {
            Dataset<Row> data = spark.read().parquet(inputPath);
            Dataset<Row> processed = data
                .repartition(200) // parallelism tuning; TODO confirm 200 suits the cluster size
                .transform(this::applyBusinessLogic);
            processed.write()
                .mode(SaveMode.Overwrite)
                .parquet(outputPath);
            long duration = System.currentTimeMillis() - startTime;
            // NOTE(review): this count() recomputes the whole pipeline after
            // the write; persist() the dataset or count the written output if
            // that double execution is too costly.
            long recordCount = processed.count();
            logger.info("비동기 처리 완료: {} 레코드, {}ms", recordCount, duration);
            return CompletableFuture.completedFuture(
                new ProcessingResult(true, recordCount, duration, null)
            );
        } catch (Exception e) {
            logger.error("비동기 처리 실패: {}", e.getMessage(), e);
            return CompletableFuture.completedFuture(
                new ProcessingResult(false, 0, 0, e.getMessage())
            );
        }
    }

    // Keeps only ACTIVE rows and derives a weighted score column (value*0.8 + bonus).
    private Dataset<Row> applyBusinessLogic(Dataset<Row> df) {
        return df
            .filter(col("status").equalTo("ACTIVE"))
            .withColumn("score", col("value").multiply(0.8).plus(col("bonus")));
    }

    /** Outcome of an asynchronous batch run. */
    public record ProcessingResult(
        boolean success,
        long recordCount,
        long durationMs,
        String errorMessage
    ) {}
}
핵심 포인트: 서비스 레이어 패턴
- 생성자 주입: SparkSession을 생성자로 주입받아 테스트 용이성 확보
- @Async 배치: 대용량 처리는 비동기로 실행하여 API 응답 지연 방지
- @Scheduled: cron 표현식으로 일일/주간 배치 작업 스케줄링
- CompletableFuture: 비동기 작업 결과와 상태 추적
REST API 통합#
분석 API 컨트롤러
package com.example.spark.controller;
import com.example.spark.service.DataAnalysisService;
import com.example.spark.service.DataAnalysisService.SalesSummary;
import com.example.spark.service.DataAnalysisService.CategoryRevenue;
import com.example.spark.service.BatchProcessingService;
import com.example.spark.service.BatchProcessingService.ProcessingResult;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * REST endpoints exposing the Spark-backed analytics services.
 *
 * Synchronous GET endpoints run small aggregations inline; the batch POST
 * endpoint starts an @Async job and answers 202 Accepted immediately.
 */
@RestController
@RequestMapping("/api/v1/analytics")
public class AnalyticsController {
    private final DataAnalysisService analysisService;
    private final BatchProcessingService batchService;

    public AnalyticsController(
            DataAnalysisService analysisService,
            BatchProcessingService batchService) {
        this.analysisService = analysisService;
        this.batchService = batchService;
    }

    /**
     * Sales summary for a date range.
     * GET /api/v1/analytics/sales/summary?start=2024-01-01&end=2024-01-31
     */
    @GetMapping("/sales/summary")
    public ResponseEntity<SalesSummary> getSalesSummary(
            @RequestParam String start,
            @RequestParam String end) {
        SalesSummary summary = analysisService.generateSalesSummary(
            "data/sales.parquet", start, end);
        return ResponseEntity.ok(summary);
    }

    /**
     * Revenue per category, highest first.
     * GET /api/v1/analytics/category/revenue
     */
    @GetMapping("/category/revenue")
    public ResponseEntity<List<CategoryRevenue>> getCategoryRevenue() {
        List<CategoryRevenue> revenues = analysisService.getCategoryRevenue(
            "data/sales.parquet");
        return ResponseEntity.ok(revenues);
    }

    /**
     * Starts an asynchronous batch job.
     * POST /api/v1/analytics/batch/process
     *
     * FIX: responds with a Map serialized by Spring instead of a
     * hand-concatenated JSON string — the raw String body was written as
     * text/plain and was fragile against quoting/escaping mistakes. The
     * wire-level JSON payload is unchanged.
     */
    @PostMapping("/batch/process")
    public ResponseEntity<Map<String, String>> startBatchProcess(
            @RequestBody BatchRequest request) {
        // Fire-and-forget: the CompletableFuture result is intentionally not
        // awaited (the previously unused local variable was removed).
        batchService.processLargeDataset(request.inputPath(), request.outputPath());
        // Job ID placeholder (production code should use a job-tracking store).
        String jobId = "job-" + System.currentTimeMillis();
        return ResponseEntity.accepted()
            .body(Map.of("jobId", jobId, "status", "PROCESSING"));
    }

    /** Request payload for the batch endpoint. */
    public record BatchRequest(String inputPath, String outputPath) {}
}
핵심 포인트: REST API 통합
- 동기 조회: 소규모 집계는 즉시 응답 (GET 엔드포인트)
- 비동기 처리: 대용량 배치는 작업 ID 반환 후 백그라운드 실행 (POST + 202 Accepted)
- JSON 변환: collectAsList() + Stream API로 DataFrame → List 변환
- 작업 추적: 실제 운영에서는 작업 ID로 상태 조회 API 추가 필요
테스트 작성#
Spark 통합 테스트
package com.example.spark.service;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;

/**
 * Integration tests for DataAnalysisService against a small Parquet fixture
 * written once per class (PER_CLASS lifecycle enables non-static @BeforeAll).
 */
@SpringBootTest
@ActiveProfiles("test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class DataAnalysisServiceTest {
    @Autowired
    private SparkSession spark;
    @Autowired
    private DataAnalysisService analysisService;

    // Directory path of the Parquet fixture written in setupTestData().
    private String testDataPath;

    @BeforeAll
    void setupTestData() {
        // Schema for the test sales dataset
        StructType schema = new StructType(new StructField[]{
            DataTypes.createStructField("order_id", DataTypes.StringType, false),
            DataTypes.createStructField("category", DataTypes.StringType, false),
            DataTypes.createStructField("amount", DataTypes.DoubleType, false),
            DataTypes.createStructField("customer_id", DataTypes.StringType, false),
            DataTypes.createStructField("sale_date", DataTypes.StringType, false)
        });
        // Five orders, three customers, three categories — totals: 725.0 revenue
        List<Row> testData = Arrays.asList(
            RowFactory.create("O001", "Electronics", 150.0, "C001", "2024-01-15"),
            RowFactory.create("O002", "Electronics", 200.0, "C002", "2024-01-15"),
            RowFactory.create("O003", "Clothing", 50.0, "C001", "2024-01-16"),
            RowFactory.create("O004", "Books", 25.0, "C003", "2024-01-17"),
            RowFactory.create("O005", "Electronics", 300.0, "C002", "2024-01-18")
        );
        Dataset<Row> testDf = spark.createDataFrame(testData, schema);
        testDataPath = "target/test-data/sales.parquet";
        testDf.write()
            .mode("overwrite")
            .parquet(testDataPath);
    }

    @Test
    @DisplayName("매출 요약이 올바르게 계산되어야 함")
    void shouldCalculateSalesSummary() {
        // when
        var summary = analysisService.generateSalesSummary(
            testDataPath, "2024-01-01", "2024-01-31");
        // then — 5 orders, 725.0 total, 145.0 average, 3 distinct customers
        assertThat(summary.totalOrders()).isEqualTo(5);
        assertThat(summary.totalRevenue()).isEqualTo(725.0);
        assertThat(summary.avgOrderValue()).isEqualTo(145.0);
        assertThat(summary.uniqueCustomers()).isEqualTo(3);
    }

    @Test
    @DisplayName("카테고리별 매출이 내림차순으로 정렬되어야 함")
    void shouldReturnCategoryRevenuesSorted() {
        // when
        var revenues = analysisService.getCategoryRevenue(testDataPath);
        // then — Electronics (650.0) must come first
        assertThat(revenues).hasSize(3);
        assertThat(revenues.get(0).category()).isEqualTo("Electronics");
        assertThat(revenues.get(0).revenue()).isEqualTo(650.0);
    }

    @AfterAll
    void cleanup() throws IOException {
        // FIX: Spark writes Parquet output as a DIRECTORY (part files plus a
        // _SUCCESS marker); File.delete() fails silently on a non-empty
        // directory, leaking fixtures between runs. Walk the tree and delete
        // children before parents.
        Path root = Path.of(testDataPath);
        if (Files.exists(root)) {
            try (var paths = Files.walk(root)) {
                paths.sorted(Comparator.reverseOrder())
                     .forEach(p -> p.toFile().delete());
            }
        }
    }
}
핵심 포인트: 테스트 작성
- @ActiveProfiles(“test”): 테스트용 SparkSession 설정 사용
- @BeforeAll: 테스트 데이터 생성 및 Parquet 저장
- createDataFrame(): 인메모리 테스트 데이터 생성
- @AfterAll: 테스트 완료 후 임시 파일 정리
Java vs Scala 비교#
| 구분 | Java | Scala |
|---|---|---|
| 타입 선언 | Dataset<Row> | DataFrame (타입 alias) |
| 람다 표현식 | df.filter(row -> row.getInt(0) > 10) | df.filter(row => row.getInt(0) > 10) |
| 컬럼 참조 | col("name") | $"name" |
| 문자열 보간 | "value: " + value | s"value: $value" |
| Case Class | record (Java 17+) | case class |
| 패턴 매칭 | switch expression (Java 21+) | match 표현식 |
| Spring 통합 | 네이티브 지원 | 어노테이션 호환 |
| IDE 지원 | IntelliJ, VS Code | IntelliJ + Scala 플러그인 |
동일 로직 비교
Java:
Dataset<Row> result = df
.filter(col("age").gt(18))
.groupBy("city")
.agg(avg("salary").alias("avg_salary"))
.orderBy(col("avg_salary").desc());Scala:
val result = df
.filter($"age" > 18)
.groupBy("city")
.agg(avg("salary").alias("avg_salary"))
.orderBy($"avg_salary".desc)