TL;DR
  • CSV, JSON, Parquet, JDBC 등 다양한 데이터 소스 읽기/쓰기
  • DataFrame API로 필터, 변환, 집계, 조인 수행
  • SQL 쿼리와 DataFrame API 혼용 가능
  • 실무 예제: NYC 택시 데이터, Kaggle 데이터셋 분석

대상 독자 및 선수 지식#

구분내용
대상 독자Spark DataFrame API를 처음 사용하는 Java 개발자
선수 지식Java 기본 문법, SQL 기초, 환경 설정 완료
학습 목표데이터 로딩, 변환, 집계, 조인, 저장을 수행할 수 있다
예상 소요 시간약 30분

Spark의 핵심 기능을 활용하는 예제 코드입니다.

데이터 로딩#

CSV 파일 읽기

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class CsvExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("CSV Example")
                .master("local[*]")
                .getOrCreate();

        // 기본 읽기
        Dataset<Row> df = spark.read()
                .option("header", "true")       // 첫 줄을 헤더로
                .option("inferSchema", "true")  // 타입 자동 추론
                .csv("data/employees.csv");

        // 상세 옵션
        Dataset<Row> dfDetailed = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .option("sep", ",")             // 구분자
                .option("quote", "\"")          // 인용 문자
                .option("escape", "\\")         // 이스케이프 문자
                .option("nullValue", "NA")      // null 표현
                .option("dateFormat", "yyyy-MM-dd")
                .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
                .csv("data/employees.csv");

        df.show();
        df.printSchema();

        spark.stop();
    }
}

JSON 파일 읽기

// 단일 파일
Dataset<Row> df = spark.read().json("data/users.json");

// 멀티라인 JSON
Dataset<Row> dfMulti = spark.read()
        .option("multiLine", "true")
        .json("data/users_multiline.json");

// JSON Lines (한 줄에 하나의 JSON 객체)
Dataset<Row> dfLines = spark.read().json("data/users.jsonl");

df.show();

Parquet 파일 읽기

// Parquet (권장 포맷)
Dataset<Row> df = spark.read().parquet("data/users.parquet");

// 특정 컬럼만 읽기 (컬럼 프루닝)
Dataset<Row> selected = spark.read()
        .parquet("data/users.parquet")
        .select("id", "name");
핵심 포인트: 데이터 포맷 선택
  • Parquet: 컬럼 지향, 압축 효율, 스키마 내장 - 프로덕션 권장
  • CSV/JSON: 사람이 읽기 쉬움 - 소규모 데이터나 디버깅용
  • JDBC: 데이터베이스 직접 연동 - 증분 추출이나 lookup 테이블용

JDBC로 데이터베이스 읽기

Dataset<Row> df = spark.read()
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/mydb")
        .option("dbtable", "employees")
        .option("user", "user")
        .option("password", "password")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .load();

// 쿼리 사용
Dataset<Row> dfQuery = spark.read()
        .format("jdbc")
        .option("url", "jdbc:mysql://localhost:3306/mydb")
        .option("query", "SELECT * FROM employees WHERE age > 30")
        .option("user", "user")
        .option("password", "password")
        .load();

데이터 변환#

컬럼 연산

import static org.apache.spark.sql.functions.*;

Dataset<Row> employees = spark.read()
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("data/employees.csv");

// 새 컬럼 추가
Dataset<Row> withBonus = employees.withColumn(
    "bonus",
    col("salary").multiply(0.1)
);

// 여러 컬럼 추가
Dataset<Row> enhanced = employees
    .withColumn("bonus", col("salary").multiply(0.1))
    .withColumn("total_compensation", col("salary").plus(col("bonus")))
    .withColumn("hire_year", year(col("hire_date")))
    .withColumn("name_upper", upper(col("name")));

// 컬럼 이름 변경
Dataset<Row> renamed = employees.withColumnRenamed("name", "employee_name");

// 컬럼 삭제
Dataset<Row> dropped = employees.drop("middle_name", "suffix");

// 컬럼 타입 변환
Dataset<Row> casted = employees.withColumn(
    "salary",
    col("salary").cast("double")
);

enhanced.show();

필터링

// 단일 조건
Dataset<Row> highEarners = employees.filter(col("salary").gt(60000));

// 복합 조건
Dataset<Row> filtered = employees.filter(
    col("age").geq(30)
    .and(col("department").equalTo("Engineering"))
    .and(col("salary").between(50000, 80000))
);

// 문자열 조건
Dataset<Row> kimFamily = employees.filter(col("name").startsWith("Kim"));
Dataset<Row> hasEmail = employees.filter(col("email").contains("@company.com"));
Dataset<Row> pattern = employees.filter(col("name").rlike("^[A-Z][a-z]+$"));

// NULL 처리
Dataset<Row> withManager = employees.filter(col("manager_id").isNotNull());
Dataset<Row> noManager = employees.filter(col("manager_id").isNull());

// IN 조건
Dataset<Row> depts = employees.filter(
    col("department").isin("Engineering", "Marketing", "Sales")
);

filtered.show();
핵심 포인트: 데이터 변환
  • withColumn(): 새 컬럼 추가 또는 기존 컬럼 변환
  • filter(): 조건에 맞는 행만 선택 (SQL WHERE와 동일)
  • NULL 처리: isNull(), isNotNull()로 명시적 처리 필수
  • 체이닝: 여러 변환을 연결하여 파이프라인 구성

정렬

// 단일 컬럼
Dataset<Row> sorted = employees.orderBy("salary");
Dataset<Row> sortedDesc = employees.orderBy(col("salary").desc());

// 다중 컬럼
Dataset<Row> multiSort = employees.orderBy(
    col("department").asc(),
    col("salary").desc()
);

// NULL 처리
Dataset<Row> nullsFirst = employees.orderBy(col("manager_id").asc_nulls_first());
Dataset<Row> nullsLast = employees.orderBy(col("salary").desc_nulls_last());

multiSort.show();

집계#

기본 집계

// 전체 집계
Dataset<Row> stats = employees.agg(
    count("*").alias("total_count"),
    countDistinct("department").alias("dept_count"),
    sum("salary").alias("total_salary"),
    avg("salary").alias("avg_salary"),
    max("salary").alias("max_salary"),
    min("salary").alias("min_salary"),
    stddev("salary").alias("stddev_salary")
);

stats.show();

그룹별 집계

// 단일 그룹
Dataset<Row> byDept = employees
    .groupBy("department")
    .agg(
        count("*").alias("employee_count"),
        avg("salary").alias("avg_salary"),
        sum("salary").alias("total_salary")
    )
    .orderBy(col("total_salary").desc());

// 다중 그룹
Dataset<Row> byDeptLevel = employees
    .groupBy("department", "level")
    .agg(
        count("*").alias("count"),
        avg("salary").alias("avg_salary")
    );

byDept.show();

피벗

// 피벗 테이블
Dataset<Row> pivoted = employees
    .groupBy("department")
    .pivot("level", Arrays.asList("Junior", "Senior", "Lead"))
    .agg(avg("salary"));

pivoted.show();
// +----------+-------+-------+------+
// |department| Junior| Senior|  Lead|
// +----------+-------+-------+------+
// |     Sales| 45000 | 65000 | 85000|
// |     Eng  | 55000 | 75000 | 95000|
// +----------+-------+-------+------+
핵심 포인트: 집계
  • groupBy() + agg(): SQL GROUP BY와 동일한 그룹별 집계
  • pivot(): 행을 열로 변환하는 피벗 테이블 생성
  • alias(): 집계 결과 컬럼에 명확한 이름 부여
  • orderBy(): 집계 후 정렬로 가독성 향상

조인#

기본 조인

Dataset<Row> employees = spark.read().parquet("employees.parquet");
Dataset<Row> departments = spark.read().parquet("departments.parquet");

// Inner Join
Dataset<Row> joined = employees.join(
    departments,
    employees.col("department_id").equalTo(departments.col("id"))
);

// 컬럼명이 같은 경우
Dataset<Row> simpleJoin = employees.join(departments, "department_id");

// Left Join
Dataset<Row> leftJoined = employees.join(
    departments,
    employees.col("department_id").equalTo(departments.col("id")),
    "left"
);

// 조인 유형: inner, left, right, full, left_semi, left_anti, cross

다중 조건 조인

Dataset<Row> multiJoin = orders.join(
    products,
    orders.col("product_id").equalTo(products.col("id"))
        .and(orders.col("region").equalTo(products.col("region")))
);

브로드캐스트 조인

import static org.apache.spark.sql.functions.broadcast;

// 작은 테이블 브로드캐스트
Dataset<Row> optimized = employees.join(
    broadcast(departments),
    "department_id"
);
핵심 포인트: 조인
  • 조인 유형: inner, left, right, full, left_semi, left_anti, cross
  • broadcast(): 작은 테이블(수 MB 이하)을 모든 노드에 복제하여 성능 향상
  • 동일 컬럼명: .join(other, "key") 간단 문법 사용 가능
  • 다중 조건: .and() 연산자로 복합 조인 조건 구성

SQL 사용#

// 임시 뷰 등록
employees.createOrReplaceTempView("employees");
departments.createOrReplaceTempView("departments");

// SQL 쿼리 실행
Dataset<Row> result = spark.sql("""
    SELECT
        e.name,
        e.salary,
        d.department_name,
        AVG(e.salary) OVER (PARTITION BY e.department_id) as dept_avg_salary
    FROM employees e
    JOIN departments d ON e.department_id = d.id
    WHERE e.age > 25
    ORDER BY e.salary DESC
    LIMIT 100
    """);

result.show();

// CTE 사용
Dataset<Row> cteResult = spark.sql("""
    WITH dept_stats AS (
        SELECT
            department_id,
            AVG(salary) as avg_salary,
            COUNT(*) as emp_count
        FROM employees
        GROUP BY department_id
    )
    SELECT
        d.department_name,
        ds.avg_salary,
        ds.emp_count
    FROM dept_stats ds
    JOIN departments d ON ds.department_id = d.id
    ORDER BY ds.avg_salary DESC
    """);

cteResult.show();
핵심 포인트: SQL 사용
  • createOrReplaceTempView(): DataFrame을 SQL 테이블처럼 사용
  • spark.sql(): 표준 SQL 쿼리 실행 (Window 함수, CTE 포함)
  • 혼용 가능: DataFrame API와 SQL을 필요에 따라 조합
  • 복잡한 분석: Window 함수, CTE 등은 SQL이 더 직관적

데이터 저장#

파일 저장

// Parquet (권장)
result.write()
    .mode("overwrite")
    .parquet("output/result.parquet");

// 파티셔닝
result.write()
    .mode("overwrite")
    .partitionBy("year", "month")
    .parquet("output/partitioned");

// CSV
result.write()
    .mode("overwrite")
    .option("header", "true")
    .csv("output/result.csv");

// 단일 파일로 저장
result.coalesce(1)
    .write()
    .mode("overwrite")
    .option("header", "true")
    .csv("output/single_file");

저장 모드

모드설명
overwrite기존 데이터 덮어쓰기
append기존 데이터에 추가
ignore이미 존재하면 무시
error (기본)이미 존재하면 오류
핵심 포인트: 데이터 저장
  • Parquet + 파티셔닝: 대용량 데이터의 표준 저장 방식
  • partitionBy(): 자주 필터링하는 컬럼 기준 디렉토리 분할
  • coalesce(1): 단일 파일 출력 (소규모 결과 공유 시 유용)
  • 저장 모드: 프로덕션에서는 append 또는 명시적 overwrite 사용

종합 예제: 매출 분석#

public class SalesAnalysisExample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Sales Analysis")
                .master("local[*]")
                .getOrCreate();

        spark.sparkContext().setLogLevel("WARN");

        // 데이터 로드
        Dataset<Row> orders = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("data/orders.csv");

        Dataset<Row> products = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("data/products.csv");

        Dataset<Row> customers = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv("data/customers.csv");

        // 1. 주문-제품 조인
        Dataset<Row> ordersWithProducts = orders
                .join(broadcast(products), "product_id")
                .withColumn("revenue", col("quantity").multiply(col("price")));

        // 2. 월별 매출 집계
        Dataset<Row> monthlyRevenue = ordersWithProducts
                .withColumn("month", date_format(col("order_date"), "yyyy-MM"))
                .groupBy("month")
                .agg(
                    sum("revenue").alias("total_revenue"),
                    count("*").alias("order_count"),
                    countDistinct("customer_id").alias("unique_customers")
                )
                .orderBy("month");

        System.out.println("=== 월별 매출 ===");
        monthlyRevenue.show();

        // 3. 카테고리별 Top 5 제품
        WindowSpec windowSpec = Window
                .partitionBy("category")
                .orderBy(col("total_revenue").desc());

        Dataset<Row> topProducts = ordersWithProducts
                .groupBy("category", "product_name")
                .agg(sum("revenue").alias("total_revenue"))
                .withColumn("rank", rank().over(windowSpec))
                .filter(col("rank").leq(5));

        System.out.println("=== 카테고리별 Top 5 제품 ===");
        topProducts.show(20);

        // 4. 고객 세그먼트 분석
        Dataset<Row> customerStats = ordersWithProducts
                .join(customers, "customer_id")
                .groupBy("customer_id", "customer_name", "segment")
                .agg(
                    sum("revenue").alias("total_spent"),
                    count("*").alias("order_count"),
                    avg("revenue").alias("avg_order_value")
                );

        Dataset<Row> segmentAnalysis = customerStats
                .groupBy("segment")
                .agg(
                    count("*").alias("customer_count"),
                    avg("total_spent").alias("avg_customer_value"),
                    sum("total_spent").alias("segment_revenue")
                )
                .orderBy(col("segment_revenue").desc());

        System.out.println("=== 고객 세그먼트 분석 ===");
        segmentAnalysis.show();

        // 5. SQL로 복합 분석
        ordersWithProducts.createOrReplaceTempView("orders_enriched");

        Dataset<Row> sqlAnalysis = spark.sql("""
            WITH monthly_category AS (
                SELECT
                    DATE_FORMAT(order_date, 'yyyy-MM') as month,
                    category,
                    SUM(revenue) as revenue
                FROM orders_enriched
                GROUP BY DATE_FORMAT(order_date, 'yyyy-MM'), category
            ),
            category_growth AS (
                SELECT
                    month,
                    category,
                    revenue,
                    LAG(revenue) OVER (PARTITION BY category ORDER BY month) as prev_revenue
                FROM monthly_category
            )
            SELECT
                month,
                category,
                revenue,
                prev_revenue,
                ROUND((revenue - prev_revenue) / prev_revenue * 100, 2) as growth_rate
            FROM category_growth
            WHERE prev_revenue IS NOT NULL
            ORDER BY month, category
            """);

        System.out.println("=== 카테고리별 월간 성장률 ===");
        sqlAnalysis.show(20);

        // 6. 결과 저장
        monthlyRevenue.write()
                .mode("overwrite")
                .parquet("output/monthly_revenue");

        segmentAnalysis.write()
                .mode("overwrite")
                .parquet("output/segment_analysis");

        spark.stop();
    }
}

실제 공개 데이터셋 예제#

실무에서 자주 활용하는 공개 데이터셋을 사용한 예제입니다.

NYC 택시 데이터 분석

뉴욕시 택시 데이터(TLC Trip Record Data)는 빅데이터 분석 학습에 가장 많이 사용되는 공개 데이터셋입니다.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.functions.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * NYC 택시 데이터 분석 예제
 * 데이터 출처: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
 */
public class NYCTaxiAnalysis {
    private static final Logger logger = LoggerFactory.getLogger(NYCTaxiAnalysis.class);

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("NYC Taxi Analysis")
                .config("spark.sql.adaptive.enabled", "true")
                .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
                .getOrCreate();

        try {
            // 스키마 명시 (inferSchema보다 성능 우수)
            StructType taxiSchema = new StructType()
                .add("VendorID", DataTypes.IntegerType)
                .add("tpep_pickup_datetime", DataTypes.TimestampType)
                .add("tpep_dropoff_datetime", DataTypes.TimestampType)
                .add("passenger_count", DataTypes.IntegerType)
                .add("trip_distance", DataTypes.DoubleType)
                .add("PULocationID", DataTypes.IntegerType)
                .add("DOLocationID", DataTypes.IntegerType)
                .add("payment_type", DataTypes.IntegerType)
                .add("fare_amount", DataTypes.DoubleType)
                .add("tip_amount", DataTypes.DoubleType)
                .add("total_amount", DataTypes.DoubleType);

            // Parquet 포맷 권장 (NYC TLC에서 제공)
            // 샘플: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
            Dataset<Row> taxiData = spark.read()
                    .schema(taxiSchema)
                    .parquet("data/yellow_tripdata_2024-01.parquet");

            logger.info("총 레코드 수: {}", taxiData.count());

            // 1. 시간대별 운행 패턴 분석
            Dataset<Row> hourlyPattern = taxiData
                .withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
                .withColumn("pickup_dayofweek", dayofweek(col("tpep_pickup_datetime")))
                .groupBy("pickup_dayofweek", "pickup_hour")
                .agg(
                    count("*").alias("trip_count"),
                    avg("trip_distance").alias("avg_distance"),
                    avg("total_amount").alias("avg_fare"),
                    avg("tip_amount").alias("avg_tip")
                )
                .orderBy("pickup_dayofweek", "pickup_hour");

            logger.info("=== 시간대별 운행 패턴 ===");
            hourlyPattern.show(24);

            // 2. 인기 출발/도착 지역 분석
            Dataset<Row> popularRoutes = taxiData
                .filter(col("trip_distance").gt(0))
                .groupBy("PULocationID", "DOLocationID")
                .agg(
                    count("*").alias("trip_count"),
                    avg("total_amount").alias("avg_fare"),
                    percentile_approx(col("trip_distance"), lit(0.5)).alias("median_distance")
                )
                .orderBy(col("trip_count").desc())
                .limit(20);

            logger.info("=== 인기 경로 Top 20 ===");
            popularRoutes.show();

            // 3. 요금 이상치 탐지
            Dataset<Row> fareStats = taxiData.agg(
                avg("total_amount").alias("mean"),
                stddev("total_amount").alias("stddev")
            ).first();

            double mean = fareStats.getDouble(0);
            double stddev = fareStats.getDouble(1);
            double threshold = mean + (3 * stddev);  // 3-시그마 규칙

            Dataset<Row> outliers = taxiData
                .filter(col("total_amount").gt(threshold)
                    .or(col("total_amount").lt(0)))
                .select("tpep_pickup_datetime", "trip_distance",
                        "fare_amount", "tip_amount", "total_amount");

            logger.info("=== 요금 이상치 (3σ 초과) ===");
            logger.info("임계값: ${}", String.format("%.2f", threshold));
            outliers.show(10);

            // 4. 결과 저장
            hourlyPattern.write()
                .mode("overwrite")
                .partitionBy("pickup_dayofweek")
                .parquet("output/nyc_taxi/hourly_pattern");

            logger.info("분석 완료");

        } catch (Exception e) {
            logger.error("분석 중 오류 발생: {}", e.getMessage(), e);
            throw new RuntimeException(e);
        } finally {
            spark.stop();
        }
    }
}

데이터 다운로드 방법#

# NYC TLC 공식 데이터 (Parquet 형식, ~50MB/월)
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet

# Zone Lookup 테이블 (지역 코드 → 지역명 매핑)
wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

Kaggle 데이터셋 활용 예제

Kaggle은 다양한 실무 데이터셋을 제공합니다.

/**
 * Kaggle Credit Card Fraud Detection 데이터셋 분석
 * 데이터: https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud
 */
public class FraudDetectionAnalysis {
    private static final Logger logger = LoggerFactory.getLogger(FraudDetectionAnalysis.class);

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("Fraud Detection Analysis")
                .config("spark.sql.shuffle.partitions", "8")  // 소규모 데이터용
                .getOrCreate();

        try {
            // 신용카드 거래 데이터 (284,807 거래, 492건 사기)
            Dataset<Row> transactions = spark.read()
                    .option("header", "true")
                    .option("inferSchema", "true")
                    .csv("data/creditcard.csv");

            // 클래스 불균형 확인
            logger.info("=== 클래스 분포 ===");
            transactions.groupBy("Class")
                .agg(
                    count("*").alias("count"),
                    round(count("*").multiply(100.0).divide(transactions.count()), 2)
                        .alias("percentage")
                )
                .show();

            // 사기 거래 특성 분석
            Dataset<Row> fraudStats = transactions
                .groupBy("Class")
                .agg(
                    avg("Amount").alias("avg_amount"),
                    max("Amount").alias("max_amount"),
                    min("Amount").alias("min_amount"),
                    stddev("Amount").alias("stddev_amount"),
                    avg("Time").alias("avg_time_seconds")
                );

            logger.info("=== 정상 vs 사기 거래 통계 ===");
            fraudStats.show();

            // 시간대별 사기 발생 패턴
            Dataset<Row> hourlyFraud = transactions
                .withColumn("hour", floor(col("Time").divide(3600)).mod(24))
                .groupBy("hour", "Class")
                .count()
                .orderBy("hour");

            logger.info("=== 시간대별 거래 패턴 ===");
            hourlyFraud.show(48);

            // 금액 구간별 사기 비율
            Dataset<Row> amountBuckets = transactions
                .withColumn("amount_bucket",
                    when(col("Amount").lt(100), "0-100")
                    .when(col("Amount").lt(500), "100-500")
                    .when(col("Amount").lt(1000), "500-1000")
                    .when(col("Amount").lt(5000), "1000-5000")
                    .otherwise("5000+"))
                .groupBy("amount_bucket")
                .agg(
                    count("*").alias("total"),
                    sum(when(col("Class").equalTo(1), 1).otherwise(0)).alias("fraud_count")
                )
                .withColumn("fraud_rate",
                    round(col("fraud_count").multiply(100.0).divide(col("total")), 4));

            logger.info("=== 금액 구간별 사기 비율 ===");
            amountBuckets.show();

        } finally {
            spark.stop();
        }
    }
}

공개 데이터셋 목록

데이터셋크기용도다운로드
NYC Taxi~50MB/월시계열, 집계, 조인TLC
Credit Card Fraud150MB불균형 분류, 이상 탐지Kaggle
Amazon Reviews수GB텍스트 분석, 감성 분류AWS Registry
Common Crawl수TB대규모 웹 분석commoncrawl.org
Wikipedia Dumps수십GBNLP, 지식 그래프dumps.wikimedia.org

다음 단계#

예제를 완료했다면: