TL;DR
  • Structured Streaming은 스트림을 무한히 추가되는 테이블로 처리
  • 배치와 동일한 DataFrame API 사용 (readStream/writeStream만 변경)
  • Output Mode: append(새 행), complete(전체), update(변경분)
  • Watermark로 늦게 도착하는 데이터(late data) 처리

대상 독자: 실시간 데이터 처리가 필요한 데이터 엔지니어

선수 지식:

  • DataFrame과 Dataset API
  • Kafka 기본 개념 (선택 사항)
  • 이벤트 시간 vs 처리 시간 개념 (이벤트 시간=사건이 실제 발생한 시각, 처리 시간=서버가 데이터를 수신한 시각)

소요 시간: 약 25-30분


Structured Streaming은 Spark의 스트림 처리 엔진입니다. 배치 처리와 동일한 DataFrame/Dataset API를 사용하여 실시간 데이터를 처리합니다.

비유로 이해하는 Structured Streaming#

개념비유핵심 아이디어
Structured Streaming끝없이 추가되는 회계 장부새 거래가 계속 기록되지만, 장부 읽는 방법은 동일
Watermark출석부 마감 시간“10분 늦으면 지각 처리” — 너무 늦은 데이터는 무시
Output Mode회의록 작성 방식append(새 내용만), complete(전체 다시), update(수정분만)
Trigger알람 시계얼마나 자주 장부를 정산할지 결정
Window시간대별 정산“오전/오후별 매출”, “1시간 단위 방문자”
Checkpoint중간 저장 (게임 세이브)문제 발생 시 여기서부터 재시작
Stream-Stream Join두 줄 서기 합류광고 노출 기록과 클릭 기록을 시간 내 매칭

왜 테이블 모델인가? (설계 철학)#

Spark가 스트림을 무한 테이블로 모델링한 이유가 있습니다.

기존 스트리밍의 문제점

기존 스트림 처리 (Storm, Flink 초기):
├── 레코드 단위 처리 (한 건씩)
├── 배치와 완전히 다른 API
├── 동일 로직을 두 번 구현 (배치용 + 스트림용)
└── "Lambda 아키텍처" 복잡성 (배치와 스트리밍을 별도로 구축하는 전통 패턴)

테이블 모델의 해결책

문제테이블 모델 해결
API 이중화배치/스트림 동일한 DataFrame API
학습 비용배치 알면 스트리밍도 가능
코드 중복동일 로직 재사용
테스트 어려움배치 테스트로 스트림 로직 검증
// 핵심 통찰: read() → readStream(), write() → writeStream()만 바꾸면 됨
// 이것이 가능한 이유는 스트림을 "끝없이 추가되는 테이블" 추상화했기 때문

Watermark의 존재 이유

실세계 문제:
├── 이벤트 발생 시간 ≠ 도착 시간
├── 네트워크 지연, 디바이스 오프라인 등
├── 영원히 기다릴 수 없음
└── 언제 "충분히 기다렸다"고 판단할까?

Watermark 해결책:
├── "max(event_time) - threshold" 계산
├── 이보다 오래된 데이터는 무시
├── 정확성 vs 지연 트레이드오프
└── 비즈니스 요구에 맞게 threshold 조정
설계 원칙
“배치와 스트리밍은 같은 문제의 다른 표현” — Spark는 시간 축만 다르고 본질적으로 같은 데이터 처리라는 통찰을 기반으로, 하나의 API로 두 세계를 통합했습니다.

Structured Streaming이란?#

Structured Streaming은 스트림 데이터를 무한히 추가되는 테이블로 취급합니다. 새 데이터가 도착하면 증분 처리(incremental processing)를 수행합니다.

핵심 개념

입력 스트림 (무한 테이블)
    ↓
[데이터 도착] → [증분 처리] → [결과 업데이트]
    ↓
출력 (지속적으로 업데이트되는 결과 테이블)

배치 vs 스트리밍 코드 비교

// 배치 처리 (정적 데이터)
Dataset<Row> batchDf = spark.read()
        .json("input/data.json");

Dataset<Row> result = batchDf
        .groupBy("category")
        .count();

result.write().parquet("output");

// 스트림 처리 (동적 데이터) - 거의 동일!
Dataset<Row> streamDf = spark.readStream()  // readStream으로 변경
        .json("input/");

Dataset<Row> result = streamDf              // 동일한 처리 로직
        .groupBy("category")
        .count();

result.writeStream()                        // writeStream으로 변경
        .outputMode("complete")
        .format("console")
        .start()
        .awaitTermination();
핵심 포인트
  • 스트림은 무한히 추가되는 테이블로 모델링
  • read()readStream(), write()writeStream()으로 변경만 하면 됨
  • 동일한 DataFrame API로 배치와 스트리밍 코드 통합
  • 증분 처리(incremental processing)로 효율적인 스트림 처리

기본 사용법#

스트림 읽기

SparkSession spark = SparkSession.builder()
        .appName("Structured Streaming")
        .master("local[*]")
        .getOrCreate();

// 파일 소스 (디렉토리 모니터링)
Dataset<Row> fileStream = spark.readStream()
        .schema(schema)  // 스트림에서는 스키마 필수
        .json("input/");

// Kafka 소스
Dataset<Row> kafkaStream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic-name")
        .load();

// Socket 소스 (테스트용)
Dataset<Row> socketStream = spark.readStream()
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load();

// Rate 소스 (테스트용 - 초당 지정 수의 행 생성)
Dataset<Row> rateStream = spark.readStream()
        .format("rate")
        .option("rowsPerSecond", 10)
        .load();

스트림 처리

스트림 DataFrame에 일반 DataFrame과 동일한 연산 적용:

// 필터링
Dataset<Row> filtered = stream.filter(col("value").gt(100));

// 변환
Dataset<Row> transformed = stream
        .withColumn("timestamp", current_timestamp())
        .withColumn("doubled", col("value").multiply(2));

// 집계 (상태 유지)
Dataset<Row> aggregated = stream
        .groupBy("category")
        .agg(
            count("*").alias("count"),
            sum("value").alias("total")
        );

스트림 쓰기

StreamingQuery query = result.writeStream()
        .outputMode("append")           // 출력 모드
        .format("parquet")              // 출력 포맷
        .option("path", "output/")      // 출력 경로
        .option("checkpointLocation", "checkpoint/")  // 체크포인트 필수
        .trigger(Trigger.ProcessingTime("10 seconds"))  // 트리거
        .start();

// 쿼리 종료 대기
query.awaitTermination();

// 또는 백그라운드 실행
// query.isActive(), query.stop() 등으로 관리

출력 모드 (Output Mode)#

모드설명사용 시점
append새 행만 출력집계 없는 단순 변환
complete전체 결과 테이블 출력집계 쿼리
update변경된 행만 출력집계 쿼리 (일부 싱크만 지원)
// append - 새 행만 (집계 없을 때)
filtered.writeStream()
    .outputMode("append")
    .format("parquet")
    .start();

// complete - 전체 결과 (집계 시)
aggregated.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

// update - 변경된 것만 (집계 시)
aggregated.writeStream()
    .outputMode("update")
    .format("console")
    .start();

트리거 (Trigger)#

데이터 처리 빈도를 제어합니다.

import org.apache.spark.sql.streaming.Trigger;

// 기본 - 가능한 빨리 (마이크로 배치)
.trigger(Trigger.ProcessingTime(0))

// 지정 간격
.trigger(Trigger.ProcessingTime("10 seconds"))
.trigger(Trigger.ProcessingTime("1 minute"))

// 한 번만 실행 (배치처럼)
.trigger(Trigger.Once())

// 사용 가능한 데이터 모두 처리 후 종료
.trigger(Trigger.AvailableNow())

// 연속 처리 (밀리초 지연, 실험적)
.trigger(Trigger.Continuous("1 second"))

Kafka 연동#

Kafka에서 읽기

Dataset<Row> kafkaStream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "my-topic")  // 단일 토픽
        // .option("subscribePattern", "topic.*")  // 패턴
        // .option("assign", "{\"topic\":[0,1,2]}")  // 특정 파티션
        .option("startingOffsets", "earliest")  // earliest, latest, 또는 JSON
        .load();

// Kafka 메시지 구조:
// key (binary), value (binary), topic, partition, offset, timestamp, ...

// 값 파싱
Dataset<Row> parsed = kafkaStream
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .select(from_json(col("value"), schema).alias("data"))
        .select("data.*");

Kafka에 쓰기

// 결과를 Kafka로 전송
result
    .selectExpr("CAST(key AS STRING)", "to_json(struct(*)) AS value")
    .writeStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output-topic")
    .option("checkpointLocation", "checkpoint/kafka-output")
    .start();

윈도우 연산#

시간 기반 집계를 위한 윈도우 함수입니다.

Tumbling Window (텀블링 윈도우)

겹치지 않는 고정 크기 윈도우:

Dataset<Row> windowedCounts = stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "5 minutes"),  // 5분 윈도우
        col("category")
    )
    .count();

// 결과:
// +------------------------------------------+--------+-----+
// |window                                    |category|count|
// +------------------------------------------+--------+-----+
// |{2024-01-01 10:00:00, 2024-01-01 10:05:00}|A       |  150|
// |{2024-01-01 10:05:00, 2024-01-01 10:10:00}|A       |  180|
// +------------------------------------------+--------+-----+

Sliding Window (슬라이딩 윈도우)

겹치는 윈도우:

Dataset<Row> slidingCounts = stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        window(col("timestamp"), "10 minutes", "5 minutes"),  // 10분 윈도우, 5분 슬라이드
        col("category")
    )
    .count();

//  이벤트가 2개 윈도우에 포함될  있음

Session Window (세션 윈도우)

활동 기반 동적 윈도우입니다. 사용자가 이벤트를 발생시키는 동안 세션이 유지되며, 지정한 시간(아래 예시에서는 5분) 동안 활동이 없으면 세션이 종료됩니다. 웹사이트 방문 세션 분석, 앱 사용 패턴 추적 등에 적합합니다:

// Spark 3.2+
Dataset<Row> sessionCounts = stream
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        session_window(col("timestamp"), "5 minutes"),  // 5분 동안 활동 없으면 세션 종료
        col("user_id")
    )
    .count();

Watermark#

늦게 도착하는 데이터(late data)를 처리하기 위한 메커니즘입니다.

Dataset<Row> result = stream
    .withWatermark("eventTime", "10 minutes")  // 10분까지 지연 허용
    .groupBy(
        window(col("eventTime"), "5 minutes"),
        col("category")
    )
    .count();

// 워터마크 = max(eventTime) - 10 minutes
// 워터마크보다 오래된 데이터는 무시됨

Watermark 동작

이벤트 시간 순서:
10:00 → 10:05 → 10:03 (늦음) → 10:10 → 09:55 (매우 늦음)

워터마크 = 10:00 (10분 지연 허용)
- 10:03 이벤트: 워터마크(10:00) 이후 → 처리됨
- 09:55 이벤트: 워터마크(10:00) 이전 → 무시됨

상태 관리#

집계 쿼리는 상태(state)를 유지합니다.

상태 저장소 설정

// RocksDB 상태 저장소 사용 (대용량 상태에 적합)
spark.conf().set(
    "spark.sql.streaming.stateStore.providerClass",
    "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider"
);

// 상태 저장소 메모리 설정
spark.conf().set("spark.sql.streaming.stateStore.rocksdb.memory.mb", "256");

커스텀 상태 관리#

groupBy().agg()는 합계, 평균 등 표준 집계만 가능합니다. 하지만 “사용자의 연속 로그인 일수 추적"이나 “세션 내 행동 패턴 분석"처럼 복잡한 상태 로직이 필요한 경우에는 mapGroupsWithState 또는 flatMapGroupsWithState를 사용합니다.

이 API는 각 키(그룹)마다 직접 정의한 상태 객체를 유지하며, 새 데이터가 도착할 때마다 상태를 업데이트하는 함수를 작성할 수 있습니다. mapGroupsWithState는 그룹당 하나의 결과를, flatMapGroupsWithState는 여러 결과를 반환할 수 있습니다.

심화 내용
mapGroupsWithStateflatMapGroupsWithState는 Structured Streaming의 가장 유연한 API이지만, 상태 크기 관리와 타임아웃 설정에 주의가 필요합니다. 표준 집계로 해결 가능하다면 groupBy().agg()를 먼저 고려하세요.

상태 타임아웃

// 그룹 상태에 타임아웃 설정 (mapGroupsWithState/flatMapGroupsWithState)
.groupByKey(row -> row.getString(0), Encoders.STRING())
.mapGroupsWithState(
    mappingFunc,                              // 상태 업데이트 함수
    Encoders.bean(State.class),               // 상태 객체 인코더
    Encoders.bean(Output.class),              // 출력 객체 인코더
    GroupStateTimeout.ProcessingTimeTimeout()  // 또는 EventTimeTimeout
);

조인#

Stream-Static 조인

스트림과 정적 데이터 조인:

Dataset<Row> staticDf = spark.read().parquet("dimension-data");

Dataset<Row> enriched = stream.join(
    staticDf,
    stream.col("product_id").equalTo(staticDf.col("id")),
    "left"
);

Stream-Stream 조인

두 스트림 조인 (워터마크 필수). 예를 들어, 광고 노출(impressions) 스트림과 클릭(clicks) 스트림을 매칭하여 “노출 후 1시간 내 클릭만” 유효 전환으로 집계할 수 있습니다:

Dataset<Row> impressions = spark.readStream()
    .format("kafka")
    .option("subscribe", "impressions")
    .load()
    .withWatermark("timestamp", "2 hours");

Dataset<Row> clicks = spark.readStream()
    .format("kafka")
    .option("subscribe", "clicks")
    .load()
    .withWatermark("timestamp", "3 hours");

// 시간 범위 조인 조건
Dataset<Row> joined = impressions.join(
    clicks,
    expr("""
        impressionId = clickImpressionId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
    """),
    "leftOuter"
);

쿼리 모니터링#

StreamingQuery query = result.writeStream()
    .format("console")
    .start();

// 쿼리 상태 확인
System.out.println("Query ID: " + query.id());
System.out.println("Run ID: " + query.runId());
System.out.println("Is Active: " + query.isActive());
System.out.println("Status: " + query.status());

// 진행 상황
StreamingQueryProgress progress = query.lastProgress();
if (progress != null) {
    System.out.println("Input rows: " + progress.numInputRows());
    System.out.println("Processing rate: " + progress.processedRowsPerSecond());
}

// 쿼리 중지
query.stop();

쿼리 리스너

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent event) {
        System.out.println("Query started: " + event.id());
    }

    @Override
    public void onQueryProgress(QueryProgressEvent event) {
        System.out.println("Progress: " + event.progress().numInputRows() + " rows");
    }

    @Override
    public void onQueryTerminated(QueryTerminatedEvent event) {
        System.out.println("Query terminated: " + event.id());
    }
});

실전 예제: 실시간 매출 집계#

public class RealTimeSalesAggregation {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("Real-Time Sales")
                .master("local[*]")
                .getOrCreate();

        // 스키마 정의
        StructType schema = new StructType()
                .add("orderId", StringType, false)
                .add("productId", StringType, false)
                .add("category", StringType, false)
                .add("amount", DoubleType, false)
                .add("timestamp", TimestampType, false);

        // Kafka에서 주문 데이터 읽기
        Dataset<Row> orders = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "orders")
                .option("startingOffsets", "latest")
                .load()
                .selectExpr("CAST(value AS STRING)")
                .select(from_json(col("value"), schema).alias("order"))
                .select("order.*")
                .withWatermark("timestamp", "5 minutes");

        // 5분 윈도우별 카테고리 매출 집계
        Dataset<Row> salesByCategory = orders
                .groupBy(
                    window(col("timestamp"), "5 minutes"),
                    col("category")
                )
                .agg(
                    count("*").alias("orderCount"),
                    sum("amount").alias("totalSales"),
                    avg("amount").alias("avgOrderValue")
                );

        // 콘솔 출력 (디버깅용)
        StreamingQuery consoleQuery = salesByCategory.writeStream()
                .outputMode("update")
                .format("console")
                .option("truncate", false)
                .trigger(Trigger.ProcessingTime("30 seconds"))
                .start();

        // Kafka로 결과 전송
        StreamingQuery kafkaQuery = salesByCategory
                .selectExpr("to_json(struct(*)) AS value")
                .writeStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("topic", "sales-summary")
                .option("checkpointLocation", "checkpoint/sales-summary")
                .outputMode("update")
                .start();

        spark.streams().awaitAnyTermination();
    }
}

관련 문서#

다음 단계#