TL;DR
  • SparkSession bean: manage it as a singleton via @Configuration, clean it up with @PreDestroy
  • Profile separation: per-environment Spark settings for local, production, and test
  • Async batches: large-scale processing with @Async + CompletableFuture
  • REST API: convert DataFrame results into JSON responses

Target Audience and Prerequisites

| Item | Description |
|---|---|
| Target audience | Backend developers integrating Spark into a Spring Boot application |
| Prerequisites | Spring Boot basics, a working environment, the Spark DataFrame API |
| Learning goal | Build a data analysis API with Spark inside Spring Boot |
| Estimated time | About 40 minutes |

Integration patterns for Spark and Spring Boot, aimed at Java/Spring developers.

Architecture Pattern

flowchart TB
    subgraph SpringBoot["Spring Boot Application"]
        Controller[REST Controller]
        Service[Business Service]
        SparkService[Spark Service]
        Config[Spark Configuration]
    end

    subgraph Spark["Spark Context"]
        SparkSession[SparkSession Bean]
        Jobs[Batch Jobs]
    end

    subgraph Storage["Data Storage"]
        DB[(Database)]
        HDFS[(HDFS/S3)]
        Kafka[Kafka]
    end

    Controller --> Service
    Service --> SparkService
    SparkService --> SparkSession
    SparkSession --> Jobs
    Jobs --> HDFS
    Jobs --> DB
    Jobs --> Kafka
    Config --> SparkSession

Diagram description: inside the Spring Boot application, the REST controller calls a business service, which delegates to the Spark service; the SparkSession bean runs batch jobs that read from and write to HDFS/S3, a database, and Kafka.

Gradle Setup

// build.gradle.kts
plugins {
    java
    id("org.springframework.boot") version "3.2.5"
    id("io.spring.dependency-management") version "1.1.4"
}

java {
    toolchain {
        languageVersion.set(JavaLanguageVersion.of(17))
    }
}

repositories {
    mavenCentral()
}

val sparkVersion = "3.5.1"
val scalaVersion = "2.13"

dependencies {
    // Spring Boot
    implementation("org.springframework.boot:spring-boot-starter-web")
    implementation("org.springframework.boot:spring-boot-starter-actuator")

    // Spark (exclude the Log4j SLF4J bindings so Logback stays the only SLF4J backend;
    // do NOT exclude the whole org.apache.logging.log4j group — sparkContext().setLogLevel needs log4j-core)
    implementation("org.apache.spark:spark-core_$scalaVersion:$sparkVersion") {
        exclude(group = "org.slf4j", module = "slf4j-log4j12")
        exclude(group = "log4j", module = "log4j")
        exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j2-impl")
    }
    implementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
        exclude(group = "org.slf4j", module = "slf4j-log4j12")
        exclude(group = "org.apache.logging.log4j", module = "log4j-slf4j2-impl")
    }

    // Spark Hive support (optional)
    implementation("org.apache.spark:spark-hive_$scalaVersion:$sparkVersion")

    // Jackson Scala module (keep Jackson versions consistent across Spark and Spring)
    implementation("com.fasterxml.jackson.module:jackson-module-scala_$scalaVersion:2.15.3")

    // Logging (Spring Boot default, Logback-based)
    implementation("org.springframework.boot:spring-boot-starter-logging")

    // Test
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

// Resolve dependency conflicts
configurations.all {
    exclude(group = "org.slf4j", module = "slf4j-log4j12")
    exclude(group = "log4j", module = "log4j")
    resolutionStrategy {
        force("com.fasterxml.jackson.core:jackson-databind:2.15.3")
        force("com.fasterxml.jackson.core:jackson-core:2.15.3")
    }
}
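One thing the build file alone does not cover: on Java 17, Spark 3.5 reflects into JDK internals and fails at startup with `InaccessibleObjectException` unless the relevant modules are opened. A minimal sketch for the Gradle build above — the list below covers the common cases, but your workload may need more of the opens that Spark's own launcher passes:

```
// Open JDK internals that Spark reflects into (Java 17+); applies to bootRun and other JavaExec tasks
tasks.withType<JavaExec> {
    jvmArgs(
        "--add-opens=java.base/java.lang=ALL-UNNAMED",
        "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED",
        "--add-opens=java.base/java.util=ALL-UNNAMED",
        "--add-opens=java.base/java.nio=ALL-UNNAMED",
        "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED"
    )
}
```

The same flags must also be passed wherever the packaged jar runs (for example via `JAVA_TOOL_OPTIONS` in a container).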

SparkSession Bean Configuration

Basic configuration

package com.example.spark.config;

import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import jakarta.annotation.PreDestroy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Configuration
public class SparkConfig {
    private static final Logger logger = LoggerFactory.getLogger(SparkConfig.class);

    @Value("${spark.app.name:spring-spark-app}")
    private String appName;

    @Value("${spark.master:local[*]}")
    private String masterUrl;

    @Value("${spark.driver.memory:2g}")
    private String driverMemory;

    @Value("${spark.executor.memory:2g}")
    private String executorMemory;

    @Value("${spark.sql.shuffle.partitions:200}")
    private int shufflePartitions;

    private SparkSession sparkSession;

    @Bean
    @Profile("!test")  // tests use a dedicated configuration
    public SparkSession sparkSession() {
        logger.info("Initializing SparkSession: app={}, master={}", appName, masterUrl);

        SparkConf conf = new SparkConf()
                .setAppName(appName)
                .setMaster(masterUrl)
                .set("spark.driver.memory", driverMemory)  // note: only effective if set before the driver JVM starts
                .set("spark.executor.memory", executorMemory)
                .set("spark.sql.shuffle.partitions", String.valueOf(shufflePartitions))
                // Adaptive Query Execution
                .set("spark.sql.adaptive.enabled", "true")
                .set("spark.sql.adaptive.coalescePartitions.enabled", "true")
                // Serialization tuning
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .set("spark.kryoserializer.buffer.max", "1024m")
                // Spark UI
                .set("spark.ui.enabled", "true")
                .set("spark.ui.port", "4040");

        this.sparkSession = SparkSession.builder()
                .config(conf)
                .getOrCreate();

        // Reduce Spark log verbosity
        sparkSession.sparkContext().setLogLevel("WARN");

        logger.info("SparkSession initialized successfully");
        return sparkSession;
    }

    @Bean
    @Profile("test")
    public SparkSession testSparkSession() {
        logger.info("Initializing Test SparkSession");

        this.sparkSession = SparkSession.builder()
                .appName("test-spark")
                .master("local[2]")
                .config("spark.sql.shuffle.partitions", "2")
                .config("spark.ui.enabled", "false")
                .config("spark.driver.bindAddress", "127.0.0.1")
                .getOrCreate();

        return sparkSession;
    }

    @PreDestroy
    public void cleanup() {
        if (sparkSession != null && !sparkSession.sparkContext().isStopped()) {
            logger.info("Stopping SparkSession");
            sparkSession.stop();
        }
    }
}

application.yml configuration

# application.yml
spring:
  application:
    name: spark-spring-app

# Spark settings
spark:
  app:
    name: ${spring.application.name}
  master: local[*]
  driver:
    memory: 2g
  executor:
    memory: 2g
  sql:
    shuffle:
      partitions: 100

# Profile-specific settings
---
spring:
  config:
    activate:
      on-profile: production

spark:
  master: spark://spark-master:7077
  driver:
    memory: 4g
  executor:
    memory: 8g
  sql:
    shuffle:
      partitions: 200

---
spring:
  config:
    activate:
      on-profile: test

spark:
  master: local[2]
  driver:
    memory: 512m
  sql:
    shuffle:
      partitions: 2
Key points: SparkSession bean configuration
  • Singleton: only one SparkSession should exist per JVM
  • Profile separation: @Profile("!test") gives the test environment its own configuration
  • @PreDestroy: stops the SparkSession safely on application shutdown
  • AQE: spark.sql.adaptive.enabled enables automatic query optimization
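The singleton-plus-cleanup contract can be illustrated without Spark at all. Below, `Session` is a hypothetical stand-in for SparkSession: `getOrCreate()` mirrors Spark's create-once semantics, and `cleanup()` is exactly the guard the @PreDestroy hook runs (null check, then stop only if not already stopped):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class LifecycleSketch {
    // Hypothetical stand-in for SparkSession
    public static class Session {
        private final AtomicBoolean stopped = new AtomicBoolean(false);
        public boolean isStopped() { return stopped.get(); }
        public void stop() { stopped.compareAndSet(false, true); }
    }

    private static Session instance;  // one per JVM, like the Spring bean

    public static synchronized Session getOrCreate() {
        if (instance == null) instance = new Session();
        return instance;
    }

    // What the @PreDestroy method does: stop once, safely
    public static synchronized void cleanup() {
        if (instance != null && !instance.isStopped()) instance.stop();
    }
}
```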

Service Layer Pattern

Basic Spark service

package com.example.spark.service;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.spark.sql.functions.*;

@Service
public class DataAnalysisService {
    private static final Logger logger = LoggerFactory.getLogger(DataAnalysisService.class);

    private final SparkSession spark;

    public DataAnalysisService(SparkSession spark) {
        this.spark = spark;
    }

    /**
     * Generate a sales summary report
     */
    public SalesSummary generateSalesSummary(String dataPath, String startDate, String endDate) {
        logger.info("Generating sales summary: {} ~ {}", startDate, endDate);

        Dataset<Row> sales = spark.read()
                .parquet(dataPath);  // Parquet is self-describing; header/inferSchema options apply to CSV only

        Dataset<Row> filtered = sales
                .filter(col("sale_date").between(startDate, endDate));

        Row summary = filtered.agg(
                count("*").alias("total_orders"),
                sum("amount").alias("total_revenue"),
                avg("amount").alias("avg_order_value"),
                countDistinct("customer_id").alias("unique_customers")
        ).first();

        return new SalesSummary(
                summary.getLong(0),
                summary.getDouble(1),
                summary.getDouble(2),
                summary.getLong(3)
        );
    }

    /**
     * Aggregate revenue by category
     */
    public List<CategoryRevenue> getCategoryRevenue(String dataPath) {
        Dataset<Row> result = spark.read()
                .parquet(dataPath)
                .groupBy("category")
                .agg(
                    sum("amount").alias("revenue"),
                    count("*").alias("order_count")
                )
                .orderBy(col("revenue").desc());

        return result.collectAsList().stream()
                .map(row -> new CategoryRevenue(
                        row.getString(0),
                        row.getDouble(1),
                        row.getLong(2)
                ))
                .toList();
    }

    // DTOs
    public record SalesSummary(
            long totalOrders,
            double totalRevenue,
            double avgOrderValue,
            long uniqueCustomers
    ) {}

    public record CategoryRevenue(
            String category,
            double revenue,
            long orderCount
    ) {}
}
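The collectAsList()-then-map step above is plain positional extraction. Here is a Spark-free sketch of the same pattern, with Object[] standing in for Spark's Row (the indices mirror getString(0), getDouble(1), getLong(2)):

```java
import java.util.List;

public class RowMapping {
    public record CategoryRevenue(String category, double revenue, long orderCount) {}

    // Map positional "rows" to typed DTOs, as the service does with Spark Rows
    public static List<CategoryRevenue> fromRows(List<Object[]> rows) {
        return rows.stream()
                .map(r -> new CategoryRevenue((String) r[0], (Double) r[1], (Long) r[2]))
                .toList();
    }
}
```

One caution the pattern carries over: positional getters break silently if the aggregation column order changes; Spark's `row.getAs("revenue")` retrieves by name and fails louder.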

Asynchronous batch jobs

package com.example.spark.service;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.CompletableFuture;

import static org.apache.spark.sql.functions.*;

@Service
public class BatchProcessingService {
    private static final Logger logger = LoggerFactory.getLogger(BatchProcessingService.class);

    private final SparkSession spark;

    public BatchProcessingService(SparkSession spark) {
        this.spark = spark;
    }

    /**
     * Daily ETL job (scheduling requires @EnableScheduling on a configuration class)
     */
    @Scheduled(cron = "0 0 2 * * *")  // every day at 02:00
    public void dailyETL() {
        String yesterday = LocalDate.now().minusDays(1)
                .format(DateTimeFormatter.ISO_DATE);

        logger.info("Starting daily ETL for {}", yesterday);

        try {
            // 1. Read the raw data
            Dataset<Row> rawData = spark.read()
                    .parquet("s3://data-lake/raw/events/date=" + yesterday);

            // 2. Clean the data
            Dataset<Row> cleaned = rawData
                    .filter(col("event_type").isNotNull())
                    .dropDuplicates("event_id")
                    .withColumn("processed_at", current_timestamp());

            // 3. Build the aggregate table
            Dataset<Row> aggregated = cleaned
                    .groupBy("user_id", "event_type")
                    .agg(
                        count("*").alias("event_count"),
                        sum("value").alias("total_value")
                    );

            // 4. Write the results
            aggregated.write()
                    .mode(SaveMode.Overwrite)
                    .partitionBy("event_type")
                    .parquet("s3://data-lake/processed/daily/" + yesterday);

            // count() re-runs the pipeline; cache cleaned first if the extra pass matters
            logger.info("Daily ETL finished: {} records processed", cleaned.count());

        } catch (Exception e) {
            logger.error("Daily ETL failed: {}", e.getMessage(), e);
            throw new RuntimeException("Daily ETL failed", e);
        }
    }

    /**
     * Asynchronous large-dataset processing (requires @EnableAsync on a configuration class)
     */
    @Async
    public CompletableFuture<ProcessingResult> processLargeDataset(String inputPath, String outputPath) {
        logger.info("Starting async processing: {}", inputPath);
        long startTime = System.currentTimeMillis();

        try {
            Dataset<Row> data = spark.read().parquet(inputPath);

            Dataset<Row> processed = data
                    .repartition(200)  // tune parallelism
                    .transform(this::applyBusinessLogic);

            processed.write()
                    .mode(SaveMode.Overwrite)
                    .parquet(outputPath);

            long duration = System.currentTimeMillis() - startTime;
            long recordCount = processed.count();  // re-runs the pipeline; cache processed if this is costly

            logger.info("Async processing finished: {} records in {} ms", recordCount, duration);

            return CompletableFuture.completedFuture(
                    new ProcessingResult(true, recordCount, duration, null)
            );

        } catch (Exception e) {
            logger.error("Async processing failed: {}", e.getMessage(), e);
            return CompletableFuture.completedFuture(
                    new ProcessingResult(false, 0, 0, e.getMessage())
            );
        }
    }

    private Dataset<Row> applyBusinessLogic(Dataset<Row> df) {
        return df
                .filter(col("status").equalTo("ACTIVE"))
                .withColumn("score", col("value").multiply(0.8).plus(col("bonus")));
    }

    public record ProcessingResult(
            boolean success,
            long recordCount,
            long durationMs,
            String errorMessage
    ) {}
}
Key points: service layer pattern
  • Constructor injection: injecting SparkSession through the constructor keeps the service testable
  • @Async batches: run heavy jobs asynchronously so API responses are not blocked (enable with @EnableAsync)
  • @Scheduled: cron expressions drive daily/weekly batch jobs (enable with @EnableScheduling)
  • CompletableFuture: tracks the result and status of asynchronous work
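The shape of the @Async method — time the work, and return a result record instead of letting exceptions cross the async boundary — can be sketched without Spring or Spark. This stand-in uses CompletableFuture.supplyAsync on the common pool, whereas @Async runs on a Spring-managed executor; the `run` helper and `Supplier<Long>` signature are illustrative, not part of the article's service:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class AsyncPattern {
    public record ProcessingResult(boolean success, long recordCount, long durationMs, String errorMessage) {}

    // Mirrors processLargeDataset: run the work, time it, report failure as data rather than an exception
    public static CompletableFuture<ProcessingResult> run(Supplier<Long> work) {
        return CompletableFuture.supplyAsync(() -> {
            long start = System.currentTimeMillis();
            try {
                long count = work.get();
                return new ProcessingResult(true, count, System.currentTimeMillis() - start, null);
            } catch (Exception e) {
                return new ProcessingResult(false, 0, 0, e.getMessage());
            }
        });
    }
}
```

Callers can then inspect `success()` and `errorMessage()` uniformly, which is why the service returns a completed future even on failure.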

REST API Integration

Analytics API controller

package com.example.spark.controller;

import com.example.spark.service.DataAnalysisService;
import com.example.spark.service.DataAnalysisService.SalesSummary;
import com.example.spark.service.DataAnalysisService.CategoryRevenue;
import com.example.spark.service.BatchProcessingService;
import com.example.spark.service.BatchProcessingService.ProcessingResult;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import java.util.List;
import java.util.concurrent.CompletableFuture;

@RestController
@RequestMapping("/api/v1/analytics")
public class AnalyticsController {

    private final DataAnalysisService analysisService;
    private final BatchProcessingService batchService;

    public AnalyticsController(
            DataAnalysisService analysisService,
            BatchProcessingService batchService) {
        this.analysisService = analysisService;
        this.batchService = batchService;
    }

    /**
     * Retrieve the sales summary
     * GET /api/v1/analytics/sales/summary?start=2024-01-01&end=2024-01-31
     */
    @GetMapping("/sales/summary")
    public ResponseEntity<SalesSummary> getSalesSummary(
            @RequestParam String start,
            @RequestParam String end) {

        SalesSummary summary = analysisService.generateSalesSummary(
                "data/sales.parquet", start, end);

        return ResponseEntity.ok(summary);
    }

    /**
     * Revenue by category
     * GET /api/v1/analytics/category/revenue
     */
    @GetMapping("/category/revenue")
    public ResponseEntity<List<CategoryRevenue>> getCategoryRevenue() {
        List<CategoryRevenue> revenues = analysisService.getCategoryRevenue(
                "data/sales.parquet");

        return ResponseEntity.ok(revenues);
    }

    /**
     * Start an asynchronous batch job
     * POST /api/v1/analytics/batch/process
     */
    @PostMapping("/batch/process")
    public ResponseEntity<JobResponse> startBatchProcess(
            @RequestBody BatchRequest request) {

        CompletableFuture<ProcessingResult> future = batchService
                .processLargeDataset(request.inputPath(), request.outputPath());

        // Return a job ID right away; a real deployment would register the future
        // in a job-tracking store and expose a status-lookup endpoint
        String jobId = "job-" + System.currentTimeMillis();

        // A record response lets Jackson produce proper JSON instead of a hand-built string
        return ResponseEntity.accepted()
                .body(new JobResponse(jobId, "PROCESSING"));
    }

    public record BatchRequest(String inputPath, String outputPath) {}
    public record JobResponse(String jobId, String status) {}
}
Key points: REST API integration
  • Synchronous reads: small aggregations return immediately (GET endpoints)
  • Asynchronous processing: large batches return a job ID and run in the background (POST + 202 Accepted)
  • JSON conversion: collectAsList() plus the Stream API turns a DataFrame into a List
  • Job tracking: a production system needs a status-lookup API keyed by job ID
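The job-tracking point can be sketched without Spark: a hypothetical in-memory registry that maps job IDs to the futures the service returns. `JobRegistry` and its status strings are illustrative names, not part of the article's code; a real system would persist job state rather than hold it in a map:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory job registry: POST stores the future under a job ID,
// and a GET /batch/status/{jobId} endpoint would look the ID up later.
public class JobRegistry {
    private final Map<String, CompletableFuture<String>> jobs = new ConcurrentHashMap<>();

    public String submit(CompletableFuture<String> future) {
        String jobId = "job-" + UUID.randomUUID();
        jobs.put(jobId, future);
        return jobId;
    }

    public String status(String jobId) {
        CompletableFuture<String> f = jobs.get(jobId);
        if (f == null) return "UNKNOWN";
        if (!f.isDone()) return "PROCESSING";
        return f.isCompletedExceptionally() ? "FAILED" : "DONE";
    }
}
```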

Writing Tests

Spark integration test

package com.example.spark.service;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

import java.util.Arrays;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;

@SpringBootTest
@ActiveProfiles("test")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class DataAnalysisServiceTest {

    @Autowired
    private SparkSession spark;

    @Autowired
    private DataAnalysisService analysisService;

    private String testDataPath;

    @BeforeAll
    void setupTestData() {
        // Schema for the test data
        StructType schema = new StructType(new StructField[]{
                DataTypes.createStructField("order_id", DataTypes.StringType, false),
                DataTypes.createStructField("category", DataTypes.StringType, false),
                DataTypes.createStructField("amount", DataTypes.DoubleType, false),
                DataTypes.createStructField("customer_id", DataTypes.StringType, false),
                DataTypes.createStructField("sale_date", DataTypes.StringType, false)
        });

        // Create the test rows
        List<Row> testData = Arrays.asList(
                RowFactory.create("O001", "Electronics", 150.0, "C001", "2024-01-15"),
                RowFactory.create("O002", "Electronics", 200.0, "C002", "2024-01-15"),
                RowFactory.create("O003", "Clothing", 50.0, "C001", "2024-01-16"),
                RowFactory.create("O004", "Books", 25.0, "C003", "2024-01-17"),
                RowFactory.create("O005", "Electronics", 300.0, "C002", "2024-01-18")
        );

        Dataset<Row> testDf = spark.createDataFrame(testData, schema);

        testDataPath = "target/test-data/sales.parquet";
        testDf.write()
                .mode("overwrite")
                .parquet(testDataPath);
    }

    @Test
    @DisplayName("should calculate the sales summary correctly")
    void shouldCalculateSalesSummary() {
        // when
        var summary = analysisService.generateSalesSummary(
                testDataPath, "2024-01-01", "2024-01-31");

        // then
        assertThat(summary.totalOrders()).isEqualTo(5);
        assertThat(summary.totalRevenue()).isEqualTo(725.0);
        assertThat(summary.avgOrderValue()).isEqualTo(145.0);
        assertThat(summary.uniqueCustomers()).isEqualTo(3);
    }

    @Test
    @DisplayName("should return category revenues sorted in descending order")
    void shouldReturnCategoryRevenuesSorted() {
        // when
        var revenues = analysisService.getCategoryRevenue(testDataPath);

        // then
        assertThat(revenues).hasSize(3);
        assertThat(revenues.get(0).category()).isEqualTo("Electronics");
        assertThat(revenues.get(0).revenue()).isEqualTo(650.0);
    }

    @AfterAll
    void cleanup() {
        // Parquet output is a directory, so a plain File.delete() would fail; delete the whole tree
        org.springframework.util.FileSystemUtils.deleteRecursively(new java.io.File(testDataPath));
    }
}
Key points: writing tests
  • @ActiveProfiles("test"): use the test SparkSession configuration
  • @BeforeAll: create test data and write it as Parquet
  • createDataFrame(): build in-memory test data
  • @AfterAll: remove temporary files when the tests finish
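A subtlety worth making explicit: a Parquet "file" written by Spark is actually a directory (part files plus `_SUCCESS`), so cleanup needs a recursive delete. Spring's `FileSystemUtils.deleteRecursively` does this; a dependency-free equivalent with `java.nio` looks like:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class TestDataCleaner {
    // Walk the tree and delete deepest paths first, so each directory
    // is already empty by the time it is removed.
    public static void deleteRecursively(Path root) throws IOException {
        if (!Files.exists(root)) return;
        try (Stream<Path> walk = Files.walk(root)) {
            walk.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        }
    }
}
```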

Java vs Scala Comparison

| Aspect | Java | Scala |
|---|---|---|
| Type declaration | `Dataset<Row>` | `DataFrame` (type alias) |
| Lambda syntax | `df.filter(row -> row.getInt(0) > 10)` | `df.filter(row => row.getInt(0) > 10)` |
| Column reference | `col("name")` | `$"name"` |
| String interpolation | `"value: " + value` | `s"value: $value"` |
| Case class | `record` (Java 17+) | `case class` |
| Pattern matching | switch expression (Java 21+) | `match` expression |
| Spring integration | native | annotation-compatible |
| IDE support | IntelliJ, VS Code | IntelliJ + Scala plugin |

The same logic in both languages

Java:

Dataset<Row> result = df
    .filter(col("age").gt(18))
    .groupBy("city")
    .agg(avg("salary").alias("avg_salary"))
    .orderBy(col("avg_salary").desc());

Scala:

val result = df
  .filter($"age" > 18)
  .groupBy("city")
  .agg(avg("salary").alias("avg_salary"))
  .orderBy($"avg_salary".desc)
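The `case class` / `match` rows of the comparison table also have direct, runnable Java analogues. A self-contained sketch (the `Person` record and status strings are made up for illustration):

```java
public class JavaPatterns {
    // record ≈ Scala case class: immutable components with generated
    // equals/hashCode/toString and accessor methods
    public record Person(String name, int age) {}

    // switch expression ≈ a simple Scala match; exhaustive pattern matching
    // over sealed hierarchies additionally needs Java 21's pattern matching for switch
    public static String describe(String status) {
        return switch (status) {
            case "ACTIVE" -> "processing enabled";
            case "INACTIVE" -> "skipped";
            default -> "unknown status";
        };
    }
}
```

Two records with equal components compare equal, just as two instances of a Scala case class do.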

Related Documents