TL;DR
- CSV, JSON, Parquet, JDBC 등 다양한 데이터 소스 읽기/쓰기
- DataFrame API로 필터, 변환, 집계, 조인 수행
- SQL 쿼리와 DataFrame API 혼용 가능
- 실무 예제: NYC 택시 데이터, Kaggle 데이터셋 분석
대상 독자 및 선수 지식#
| 구분 | 내용 |
|---|---|
| 대상 독자 | Spark DataFrame API를 처음 사용하는 Java 개발자 |
| 선수 지식 | Java 기본 문법, SQL 기초, 환경 설정 완료 |
| 학습 목표 | 데이터 로딩, 변환, 집계, 조인, 저장을 수행할 수 있다 |
| 예상 소요 시간 | 약 30분 |
Spark의 핵심 기능을 활용하는 예제 코드입니다.
데이터 로딩#
CSV 파일 읽기
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
public class CsvExample {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("CSV Example")
.master("local[*]")
.getOrCreate();
// 기본 읽기
Dataset<Row> df = spark.read()
.option("header", "true") // 첫 줄을 헤더로
.option("inferSchema", "true") // 타입 자동 추론
.csv("data/employees.csv");
// 상세 옵션
Dataset<Row> dfDetailed = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.option("sep", ",") // 구분자
.option("quote", "\"") // 인용 문자
.option("escape", "\\") // 이스케이프 문자
.option("nullValue", "NA") // null 표현
.option("dateFormat", "yyyy-MM-dd")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.csv("data/employees.csv");
df.show();
df.printSchema();
spark.stop();
}
}JSON 파일 읽기
// 단일 파일
Dataset<Row> df = spark.read().json("data/users.json");
// 멀티라인 JSON
Dataset<Row> dfMulti = spark.read()
.option("multiLine", "true")
.json("data/users_multiline.json");
// JSON Lines (한 줄에 하나의 JSON 객체)
Dataset<Row> dfLines = spark.read().json("data/users.jsonl");
df.show();Parquet 파일 읽기
// Parquet (권장 포맷)
Dataset<Row> df = spark.read().parquet("data/users.parquet");
// 특정 컬럼만 읽기 (컬럼 프루닝)
Dataset<Row> selected = spark.read()
.parquet("data/users.parquet")
.select("id", "name");핵심 포인트: 데이터 포맷 선택
- Parquet: 컬럼 지향, 압축 효율, 스키마 내장 - 프로덕션 권장
- CSV/JSON: 사람이 읽기 쉬움 - 소규모 데이터나 디버깅용
- JDBC: 데이터베이스 직접 연동 - 증분 추출이나 lookup 테이블용
JDBC로 데이터베이스 읽기
Dataset<Row> df = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mydb")
.option("dbtable", "employees")
.option("user", "user")
.option("password", "password")
.option("driver", "com.mysql.cj.jdbc.Driver")
.load();
// 쿼리 사용
Dataset<Row> dfQuery = spark.read()
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/mydb")
.option("query", "SELECT * FROM employees WHERE age > 30")
.option("user", "user")
.option("password", "password")
.load();데이터 변환#
컬럼 연산
import static org.apache.spark.sql.functions.*;
Dataset<Row> employees = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv("data/employees.csv");
// 새 컬럼 추가
Dataset<Row> withBonus = employees.withColumn(
"bonus",
col("salary").multiply(0.1)
);
// 여러 컬럼 추가
Dataset<Row> enhanced = employees
.withColumn("bonus", col("salary").multiply(0.1))
.withColumn("total_compensation", col("salary").plus(col("bonus")))
.withColumn("hire_year", year(col("hire_date")))
.withColumn("name_upper", upper(col("name")));
// 컬럼 이름 변경
Dataset<Row> renamed = employees.withColumnRenamed("name", "employee_name");
// 컬럼 삭제
Dataset<Row> dropped = employees.drop("middle_name", "suffix");
// 컬럼 타입 변환
Dataset<Row> casted = employees.withColumn(
"salary",
col("salary").cast("double")
);
enhanced.show();필터링
// 단일 조건
Dataset<Row> highEarners = employees.filter(col("salary").gt(60000));
// 복합 조건
Dataset<Row> filtered = employees.filter(
col("age").geq(30)
.and(col("department").equalTo("Engineering"))
.and(col("salary").between(50000, 80000))
);
// 문자열 조건
Dataset<Row> kimFamily = employees.filter(col("name").startsWith("Kim"));
Dataset<Row> hasEmail = employees.filter(col("email").contains("@company.com"));
Dataset<Row> pattern = employees.filter(col("name").rlike("^[A-Z][a-z]+$"));
// NULL 처리
Dataset<Row> withManager = employees.filter(col("manager_id").isNotNull());
Dataset<Row> noManager = employees.filter(col("manager_id").isNull());
// IN 조건
Dataset<Row> depts = employees.filter(
col("department").isin("Engineering", "Marketing", "Sales")
);
filtered.show();핵심 포인트: 데이터 변환
- withColumn(): 새 컬럼 추가 또는 기존 컬럼 변환
- filter(): 조건에 맞는 행만 선택 (SQL WHERE와 동일)
- NULL 처리:
isNull(),isNotNull()로 명시적 처리 필수- 체이닝: 여러 변환을 연결하여 파이프라인 구성
정렬
// 단일 컬럼
Dataset<Row> sorted = employees.orderBy("salary");
Dataset<Row> sortedDesc = employees.orderBy(col("salary").desc());
// 다중 컬럼
Dataset<Row> multiSort = employees.orderBy(
col("department").asc(),
col("salary").desc()
);
// NULL 처리
Dataset<Row> nullsFirst = employees.orderBy(col("manager_id").asc_nulls_first());
Dataset<Row> nullsLast = employees.orderBy(col("salary").desc_nulls_last());
multiSort.show();집계#
기본 집계
// 전체 집계
Dataset<Row> stats = employees.agg(
count("*").alias("total_count"),
countDistinct("department").alias("dept_count"),
sum("salary").alias("total_salary"),
avg("salary").alias("avg_salary"),
max("salary").alias("max_salary"),
min("salary").alias("min_salary"),
stddev("salary").alias("stddev_salary")
);
stats.show();그룹별 집계
// 단일 그룹
Dataset<Row> byDept = employees
.groupBy("department")
.agg(
count("*").alias("employee_count"),
avg("salary").alias("avg_salary"),
sum("salary").alias("total_salary")
)
.orderBy(col("total_salary").desc());
// 다중 그룹
Dataset<Row> byDeptLevel = employees
.groupBy("department", "level")
.agg(
count("*").alias("count"),
avg("salary").alias("avg_salary")
);
byDept.show();피벗
// 피벗 테이블
Dataset<Row> pivoted = employees
.groupBy("department")
.pivot("level", Arrays.asList("Junior", "Senior", "Lead"))
.agg(avg("salary"));
pivoted.show();
// +----------+-------+-------+------+
// |department| Junior| Senior| Lead|
// +----------+-------+-------+------+
// | Sales| 45000 | 65000 | 85000|
// | Eng | 55000 | 75000 | 95000|
// +----------+-------+-------+------+핵심 포인트: 집계
- groupBy() + agg(): SQL GROUP BY와 동일한 그룹별 집계
- pivot(): 행을 열로 변환하는 피벗 테이블 생성
- alias(): 집계 결과 컬럼에 명확한 이름 부여
- orderBy(): 집계 후 정렬로 가독성 향상
조인#
기본 조인
Dataset<Row> employees = spark.read().parquet("employees.parquet");
Dataset<Row> departments = spark.read().parquet("departments.parquet");
// Inner Join
Dataset<Row> joined = employees.join(
departments,
employees.col("department_id").equalTo(departments.col("id"))
);
// 컬럼명이 같은 경우
Dataset<Row> simpleJoin = employees.join(departments, "department_id");
// Left Join
Dataset<Row> leftJoined = employees.join(
departments,
employees.col("department_id").equalTo(departments.col("id")),
"left"
);
// 조인 유형: inner, left, right, full, left_semi, left_anti, cross다중 조건 조인
Dataset<Row> multiJoin = orders.join(
products,
orders.col("product_id").equalTo(products.col("id"))
.and(orders.col("region").equalTo(products.col("region")))
);브로드캐스트 조인
import static org.apache.spark.sql.functions.broadcast;
// 작은 테이블 브로드캐스트
Dataset<Row> optimized = employees.join(
broadcast(departments),
"department_id"
);핵심 포인트: 조인
- 조인 유형: inner, left, right, full, left_semi, left_anti, cross
- broadcast(): 작은 테이블(수 MB 이하)을 모든 노드에 복제하여 성능 향상
- 동일 컬럼명:
.join(other, "key")간단 문법 사용 가능- 다중 조건:
.and()연산자로 복합 조인 조건 구성
SQL 사용#
// 임시 뷰 등록
employees.createOrReplaceTempView("employees");
departments.createOrReplaceTempView("departments");
// SQL 쿼리 실행
Dataset<Row> result = spark.sql("""
SELECT
e.name,
e.salary,
d.department_name,
AVG(e.salary) OVER (PARTITION BY e.department_id) as dept_avg_salary
FROM employees e
JOIN departments d ON e.department_id = d.id
WHERE e.age > 25
ORDER BY e.salary DESC
LIMIT 100
""");
result.show();
// CTE 사용
Dataset<Row> cteResult = spark.sql("""
WITH dept_stats AS (
SELECT
department_id,
AVG(salary) as avg_salary,
COUNT(*) as emp_count
FROM employees
GROUP BY department_id
)
SELECT
d.department_name,
ds.avg_salary,
ds.emp_count
FROM dept_stats ds
JOIN departments d ON ds.department_id = d.id
ORDER BY ds.avg_salary DESC
""");
cteResult.show();핵심 포인트: SQL 사용
- createOrReplaceTempView(): DataFrame을 SQL 테이블처럼 사용
- spark.sql(): 표준 SQL 쿼리 실행 (Window 함수, CTE 포함)
- 혼용 가능: DataFrame API와 SQL을 필요에 따라 조합
- 복잡한 분석: Window 함수, CTE 등은 SQL이 더 직관적
데이터 저장#
파일 저장
// Parquet (권장)
result.write()
.mode("overwrite")
.parquet("output/result.parquet");
// 파티셔닝
result.write()
.mode("overwrite")
.partitionBy("year", "month")
.parquet("output/partitioned");
// CSV
result.write()
.mode("overwrite")
.option("header", "true")
.csv("output/result.csv");
// 단일 파일로 저장
result.coalesce(1)
.write()
.mode("overwrite")
.option("header", "true")
.csv("output/single_file");저장 모드
| 모드 | 설명 |
|---|---|
overwrite | 기존 데이터 덮어쓰기 |
append | 기존 데이터에 추가 |
ignore | 이미 존재하면 무시 |
error (기본) | 이미 존재하면 오류 |
핵심 포인트: 데이터 저장
- Parquet + 파티셔닝: 대용량 데이터의 표준 저장 방식
- partitionBy(): 자주 필터링하는 컬럼 기준 디렉토리 분할
- coalesce(1): 단일 파일 출력 (소규모 결과 공유 시 유용)
- 저장 모드: 프로덕션에서는
append또는 명시적overwrite사용
종합 예제: 매출 분석#
public class SalesAnalysisExample {
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Sales Analysis")
.master("local[*]")
.getOrCreate();
spark.sparkContext().setLogLevel("WARN");
// 데이터 로드
Dataset<Row> orders = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv("data/orders.csv");
Dataset<Row> products = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv("data/products.csv");
Dataset<Row> customers = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv("data/customers.csv");
// 1. 주문-제품 조인
Dataset<Row> ordersWithProducts = orders
.join(broadcast(products), "product_id")
.withColumn("revenue", col("quantity").multiply(col("price")));
// 2. 월별 매출 집계
Dataset<Row> monthlyRevenue = ordersWithProducts
.withColumn("month", date_format(col("order_date"), "yyyy-MM"))
.groupBy("month")
.agg(
sum("revenue").alias("total_revenue"),
count("*").alias("order_count"),
countDistinct("customer_id").alias("unique_customers")
)
.orderBy("month");
System.out.println("=== 월별 매출 ===");
monthlyRevenue.show();
// 3. 카테고리별 Top 5 제품
WindowSpec windowSpec = Window
.partitionBy("category")
.orderBy(col("total_revenue").desc());
Dataset<Row> topProducts = ordersWithProducts
.groupBy("category", "product_name")
.agg(sum("revenue").alias("total_revenue"))
.withColumn("rank", rank().over(windowSpec))
.filter(col("rank").leq(5));
System.out.println("=== 카테고리별 Top 5 제품 ===");
topProducts.show(20);
// 4. 고객 세그먼트 분석
Dataset<Row> customerStats = ordersWithProducts
.join(customers, "customer_id")
.groupBy("customer_id", "customer_name", "segment")
.agg(
sum("revenue").alias("total_spent"),
count("*").alias("order_count"),
avg("revenue").alias("avg_order_value")
);
Dataset<Row> segmentAnalysis = customerStats
.groupBy("segment")
.agg(
count("*").alias("customer_count"),
avg("total_spent").alias("avg_customer_value"),
sum("total_spent").alias("segment_revenue")
)
.orderBy(col("segment_revenue").desc());
System.out.println("=== 고객 세그먼트 분석 ===");
segmentAnalysis.show();
// 5. SQL로 복합 분석
ordersWithProducts.createOrReplaceTempView("orders_enriched");
Dataset<Row> sqlAnalysis = spark.sql("""
WITH monthly_category AS (
SELECT
DATE_FORMAT(order_date, 'yyyy-MM') as month,
category,
SUM(revenue) as revenue
FROM orders_enriched
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM'), category
),
category_growth AS (
SELECT
month,
category,
revenue,
LAG(revenue) OVER (PARTITION BY category ORDER BY month) as prev_revenue
FROM monthly_category
)
SELECT
month,
category,
revenue,
prev_revenue,
ROUND((revenue - prev_revenue) / prev_revenue * 100, 2) as growth_rate
FROM category_growth
WHERE prev_revenue IS NOT NULL
ORDER BY month, category
""");
System.out.println("=== 카테고리별 월간 성장률 ===");
sqlAnalysis.show(20);
// 6. 결과 저장
monthlyRevenue.write()
.mode("overwrite")
.parquet("output/monthly_revenue");
segmentAnalysis.write()
.mode("overwrite")
.parquet("output/segment_analysis");
spark.stop();
}
}실제 공개 데이터셋 예제#
실무에서 자주 활용하는 공개 데이터셋을 사용한 예제입니다.
NYC 택시 데이터 분석
뉴욕시 택시 데이터(TLC Trip Record Data)는 빅데이터 분석 학습에 가장 많이 사용되는 공개 데이터셋입니다.
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.*;
import static org.apache.spark.sql.functions.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* NYC 택시 데이터 분석 예제
* 데이터 출처: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page
*/
public class NYCTaxiAnalysis {
private static final Logger logger = LoggerFactory.getLogger(NYCTaxiAnalysis.class);
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("NYC Taxi Analysis")
.config("spark.sql.adaptive.enabled", "true")
.config("spark.sql.adaptive.coalescePartitions.enabled", "true")
.getOrCreate();
try {
// 스키마 명시 (inferSchema보다 성능 우수)
StructType taxiSchema = new StructType()
.add("VendorID", DataTypes.IntegerType)
.add("tpep_pickup_datetime", DataTypes.TimestampType)
.add("tpep_dropoff_datetime", DataTypes.TimestampType)
.add("passenger_count", DataTypes.IntegerType)
.add("trip_distance", DataTypes.DoubleType)
.add("PULocationID", DataTypes.IntegerType)
.add("DOLocationID", DataTypes.IntegerType)
.add("payment_type", DataTypes.IntegerType)
.add("fare_amount", DataTypes.DoubleType)
.add("tip_amount", DataTypes.DoubleType)
.add("total_amount", DataTypes.DoubleType);
// Parquet 포맷 권장 (NYC TLC에서 제공)
// 샘플: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
Dataset<Row> taxiData = spark.read()
.schema(taxiSchema)
.parquet("data/yellow_tripdata_2024-01.parquet");
logger.info("총 레코드 수: {}", taxiData.count());
// 1. 시간대별 운행 패턴 분석
Dataset<Row> hourlyPattern = taxiData
.withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))
.withColumn("pickup_dayofweek", dayofweek(col("tpep_pickup_datetime")))
.groupBy("pickup_dayofweek", "pickup_hour")
.agg(
count("*").alias("trip_count"),
avg("trip_distance").alias("avg_distance"),
avg("total_amount").alias("avg_fare"),
avg("tip_amount").alias("avg_tip")
)
.orderBy("pickup_dayofweek", "pickup_hour");
logger.info("=== 시간대별 운행 패턴 ===");
hourlyPattern.show(24);
// 2. 인기 출발/도착 지역 분석
Dataset<Row> popularRoutes = taxiData
.filter(col("trip_distance").gt(0))
.groupBy("PULocationID", "DOLocationID")
.agg(
count("*").alias("trip_count"),
avg("total_amount").alias("avg_fare"),
percentile_approx(col("trip_distance"), lit(0.5)).alias("median_distance")
)
.orderBy(col("trip_count").desc())
.limit(20);
logger.info("=== 인기 경로 Top 20 ===");
popularRoutes.show();
// 3. 요금 이상치 탐지
Dataset<Row> fareStats = taxiData.agg(
avg("total_amount").alias("mean"),
stddev("total_amount").alias("stddev")
).first();
double mean = fareStats.getDouble(0);
double stddev = fareStats.getDouble(1);
double threshold = mean + (3 * stddev); // 3-시그마 규칙
Dataset<Row> outliers = taxiData
.filter(col("total_amount").gt(threshold)
.or(col("total_amount").lt(0)))
.select("tpep_pickup_datetime", "trip_distance",
"fare_amount", "tip_amount", "total_amount");
logger.info("=== 요금 이상치 (3σ 초과) ===");
logger.info("임계값: ${}", String.format("%.2f", threshold));
outliers.show(10);
// 4. 결과 저장
hourlyPattern.write()
.mode("overwrite")
.partitionBy("pickup_dayofweek")
.parquet("output/nyc_taxi/hourly_pattern");
logger.info("분석 완료");
} catch (Exception e) {
logger.error("분석 중 오류 발생: {}", e.getMessage(), e);
throw new RuntimeException(e);
} finally {
spark.stop();
}
}
}데이터 다운로드 방법#
# NYC TLC 공식 데이터 (Parquet 형식, ~50MB/월)
wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet
# Zone Lookup 테이블 (지역 코드 → 지역명 매핑)
wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csvKaggle 데이터셋 활용 예제
Kaggle은 다양한 실무 데이터셋을 제공합니다.
/**
* Kaggle Credit Card Fraud Detection 데이터셋 분석
* 데이터: https://www.kaggle.com/datasets/mlg-ulb/creditcardfraud
*/
public class FraudDetectionAnalysis {
private static final Logger logger = LoggerFactory.getLogger(FraudDetectionAnalysis.class);
public static void main(String[] args) {
SparkSession spark = SparkSession.builder()
.appName("Fraud Detection Analysis")
.config("spark.sql.shuffle.partitions", "8") // 소규모 데이터용
.getOrCreate();
try {
// 신용카드 거래 데이터 (284,807 거래, 492건 사기)
Dataset<Row> transactions = spark.read()
.option("header", "true")
.option("inferSchema", "true")
.csv("data/creditcard.csv");
// 클래스 불균형 확인
logger.info("=== 클래스 분포 ===");
transactions.groupBy("Class")
.agg(
count("*").alias("count"),
round(count("*").multiply(100.0).divide(transactions.count()), 2)
.alias("percentage")
)
.show();
// 사기 거래 특성 분석
Dataset<Row> fraudStats = transactions
.groupBy("Class")
.agg(
avg("Amount").alias("avg_amount"),
max("Amount").alias("max_amount"),
min("Amount").alias("min_amount"),
stddev("Amount").alias("stddev_amount"),
avg("Time").alias("avg_time_seconds")
);
logger.info("=== 정상 vs 사기 거래 통계 ===");
fraudStats.show();
// 시간대별 사기 발생 패턴
Dataset<Row> hourlyFraud = transactions
.withColumn("hour", floor(col("Time").divide(3600)).mod(24))
.groupBy("hour", "Class")
.count()
.orderBy("hour");
logger.info("=== 시간대별 거래 패턴 ===");
hourlyFraud.show(48);
// 금액 구간별 사기 비율
Dataset<Row> amountBuckets = transactions
.withColumn("amount_bucket",
when(col("Amount").lt(100), "0-100")
.when(col("Amount").lt(500), "100-500")
.when(col("Amount").lt(1000), "500-1000")
.when(col("Amount").lt(5000), "1000-5000")
.otherwise("5000+"))
.groupBy("amount_bucket")
.agg(
count("*").alias("total"),
sum(when(col("Class").equalTo(1), 1).otherwise(0)).alias("fraud_count")
)
.withColumn("fraud_rate",
round(col("fraud_count").multiply(100.0).divide(col("total")), 4));
logger.info("=== 금액 구간별 사기 비율 ===");
amountBuckets.show();
} finally {
spark.stop();
}
}
}공개 데이터셋 목록
| 데이터셋 | 크기 | 용도 | 다운로드 |
|---|---|---|---|
| NYC Taxi | ~50MB/월 | 시계열, 집계, 조인 | TLC |
| Credit Card Fraud | 150MB | 불균형 분류, 이상 탐지 | Kaggle |
| Amazon Reviews | 수GB | 텍스트 분석, 감성 분류 | AWS Registry |
| Common Crawl | 수TB | 대규모 웹 분석 | commoncrawl.org |
| Wikipedia Dumps | 수십GB | NLP, 지식 그래프 | dumps.wikimedia.org |
다음 단계#
예제를 완료했다면: