TL;DR
  • REST API 서버: http4s + Circe + Cats Effect로 함수형 웹 서버 구축
  • 데이터 파이프라인: FS2로 메모리 효율적인 스트림 처리
  • CLI 도구: scopt로 타입 안전한 명령줄 파서 구현
  • 에러 처리: Cats Validated로 여러 검증 오류 동시 수집
  • 모든 예제는 불변성, 참조 투명성, 타입 안전성을 강조하는 함수형 스타일

대상 독자: Scala로 실제 서비스를 구축하려는 개발자

선수 지식:

  • Scala 기본 문법 및 함수형 프로그래밍 개념
  • sbt 빌드 도구 사용법
  • REST API 및 HTTP 기본 개념 (프로젝트 1)
  • (선택) Cats Effect/IO 모나드 기초 지식

Scala로 실제 서비스를 구축하는 예제입니다. REST API 서버와 데이터 파이프라인을 구현합니다. 이 예제들은 Scala의 함수형 프로그래밍 특성을 활용하여 타입 안전하고 유지보수하기 쉬운 코드를 작성하는 방법을 보여줍니다.

프로젝트 1: REST API 서버#

http4s는 Scala의 함수형 HTTP 라이브러리입니다. 불변성과 참조 투명성을 유지하면서 웹 서버를 구축할 수 있습니다. Circe는 JSON 처리를, Cats Effect는 비동기 처리를 담당합니다.

기술 스택

이 프로젝트에서 사용하는 라이브러리들입니다. http4s-ember는 가벼운 HTTP 서버/클라이언트 구현체이고, Circe는 컴파일 타임에 JSON 코덱을 생성합니다.

  • Scala 3 + http4s (함수형 HTTP 라이브러리)
  • Circe (JSON 처리)
  • Cats Effect (비동기 처리)

build.sbt

ThisBuild / scalaVersion := "3.3.1"

lazy val root = (project in file("."))
  .settings(
    name := "scala-api-server",
    libraryDependencies ++= Seq(
      "org.http4s"      %% "http4s-ember-server" % "0.23.25",
      "org.http4s"      %% "http4s-ember-client" % "0.23.25",
      "org.http4s"      %% "http4s-circe"        % "0.23.25",
      "org.http4s"      %% "http4s-dsl"          % "0.23.25",
      "io.circe"        %% "circe-generic"       % "0.14.6",
      "io.circe"        %% "circe-parser"        % "0.14.6",
      "ch.qos.logback"   % "logback-classic"     % "1.4.14"
    )
  )

도메인 모델

먼저 API에서 사용할 도메인 모델을 정의합니다. Case Class로 불변 데이터를 표현하고, Circe의 자동 코덱 생성 기능을 활용합니다.

// domain/models.scala
package domain

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.*

// 도메인 모델 정의
case class UserId(value: Long) extends AnyVal
case class User(
  id: UserId,
  name: String,
  email: String,
  createdAt: java.time.Instant
)

case class CreateUserRequest(name: String, email: String)
case class UpdateUserRequest(name: Option[String], email: Option[String])
case class UserResponse(id: Long, name: String, email: String)

// JSON 코덱 (Circe)
object JsonCodecs:
  // UserId 코덱
  given Encoder[UserId] = Encoder.encodeLong.contramap(_.value)
  given Decoder[UserId] = Decoder.decodeLong.map(UserId.apply)

  // Request/Response 코덱
  given Decoder[CreateUserRequest] = deriveDecoder
  given Decoder[UpdateUserRequest] = deriveDecoder
  given Encoder[UserResponse] = deriveEncoder

  // User → UserResponse 변환
  extension (user: User)
    def toResponse: UserResponse = UserResponse(
      id = user.id.value,
      name = user.name,
      email = user.email
    )

UserId를 AnyVal로 래핑하면 런타임 오버헤드 없이 타입 안전성을 얻을 수 있습니다. JSON 코덱은 Circe의 semiauto를 사용하여 컴파일 타임에 자동 생성됩니다.

핵심 포인트
  • Case Class: 불변 데이터 모델링의 기본
  • AnyVal 래핑: UserId(value: Long)처럼 타입 안전성 확보, 런타임 오버헤드 없음
  • Circe deriveEncoder/Decoder: 컴파일 타임 JSON 코덱 자동 생성
  • extension: 모델에 변환 메서드 추가 (user.toResponse)

리포지토리 (인메모리)

데이터 저장소를 정의합니다. Cats Effect의 Ref를 사용하여 스레드 안전한 인메모리 저장소를 구현합니다.

// repository/UserRepository.scala
package repository

import cats.effect.{IO, Ref}
import domain.*

trait UserRepository:
  def findById(id: UserId): IO[Option[User]]
  def findAll: IO[List[User]]
  def create(request: CreateUserRequest): IO[User]
  def update(id: UserId, request: UpdateUserRequest): IO[Option[User]]
  def delete(id: UserId): IO[Boolean]

object InMemoryUserRepository:
  def make: IO[UserRepository] =
    for
      store <- Ref.of[IO, Map[UserId, User]](Map.empty)
      counter <- Ref.of[IO, Long](0L)
    yield new UserRepository:

      def findById(id: UserId): IO[Option[User]] =
        store.get.map(_.get(id))

      def findAll: IO[List[User]] =
        store.get.map(_.values.toList)

      def create(request: CreateUserRequest): IO[User] =
        for
          newId <- counter.updateAndGet(_ + 1)
          user = User(
            id = UserId(newId),
            name = request.name,
            email = request.email,
            createdAt = java.time.Instant.now()
          )
          _ <- store.update(_ + (user.id -> user))
        yield user

      def update(id: UserId, request: UpdateUserRequest): IO[Option[User]] =
        store.modify { currentStore =>
          currentStore.get(id) match
            case Some(existing) =>
              val updated = existing.copy(
                name = request.name.getOrElse(existing.name),
                email = request.email.getOrElse(existing.email)
              )
              (currentStore + (id -> updated), Some(updated))
            case None =>
              (currentStore, None)
        }

      def delete(id: UserId): IO[Boolean] =
        store.modify { currentStore =>
          if currentStore.contains(id) then
            (currentStore - id, true)
          else
            (currentStore, false)
        }

Ref.modify는 원자적 업데이트를 보장합니다. 트레이트로 인터페이스를 정의하면 나중에 실제 데이터베이스 구현으로 쉽게 교체할 수 있습니다.

핵심 포인트
  • trait: 인터페이스 정의로 구현체 교체 용이 (테스트, DB 전환)
  • Ref[IO, A]: 스레드 안전한 가변 상태 관리
  • Ref.modify: 원자적 읽기-수정-쓰기 연산
  • for comprehension: IO 연산 순차 조합

HTTP 라우트

http4s DSL을 사용하여 RESTful 엔드포인트를 정의합니다. 각 라우트는 IO 값을 반환하며, 부수 효과가 순수하게 관리됩니다.

// routes/UserRoutes.scala
package routes

import cats.effect.IO
import org.http4s.*
import org.http4s.dsl.io.*
import org.http4s.circe.*
import io.circe.syntax.*
import domain.*
import domain.JsonCodecs.given
import repository.UserRepository

object UserRoutes:
  def routes(repo: UserRepository): HttpRoutes[IO] = HttpRoutes.of[IO]:

    // GET /users - 전체 조회
    case GET -> Root / "users" =>
      for
        users <- repo.findAll
        response <- Ok(users.map(_.toResponse).asJson)
      yield response

    // GET /users/:id - 단건 조회
    case GET -> Root / "users" / LongVar(id) =>
      for
        userOpt <- repo.findById(UserId(id))
        response <- userOpt match
          case Some(user) => Ok(user.toResponse.asJson)
          case None => NotFound(s"User $id not found")
      yield response

    // POST /users - 생성
    case req @ POST -> Root / "users" =>
      for
        createReq <- req.as[CreateUserRequest]
        user <- repo.create(createReq)
        response <- Created(user.toResponse.asJson)
      yield response

    // PUT /users/:id - 수정
    case req @ PUT -> Root / "users" / LongVar(id) =>
      for
        updateReq <- req.as[UpdateUserRequest]
        userOpt <- repo.update(UserId(id), updateReq)
        response <- userOpt match
          case Some(user) => Ok(user.toResponse.asJson)
          case None => NotFound(s"User $id not found")
      yield response

    // DELETE /users/:id - 삭제
    case DELETE -> Root / "users" / LongVar(id) =>
      for
        deleted <- repo.delete(UserId(id))
        response <- if deleted then NoContent() else NotFound(s"User $id not found")
      yield response

  // JSON 디코더
  given EntityDecoder[IO, CreateUserRequest] = jsonOf[IO, CreateUserRequest]
  given EntityDecoder[IO, UpdateUserRequest] = jsonOf[IO, UpdateUserRequest]
핵심 포인트
  • http4s DSL: GET -> Root / "users" / LongVar(id)처럼 패턴 매칭으로 라우트 정의
  • HttpRoutes[IO]: 순수 함수형 라우트, 부수 효과는 IO로 관리
  • req.as[T]: 요청 본문을 타입 T로 디코딩
  • Ok, Created, NotFound: HTTP 응답 생성 헬퍼

메인 애플리케이션

모든 컴포넌트를 조합하여 서버를 시작합니다. IOApp.Simple을 상속하면 IO 기반 애플리케이션의 진입점을 간단히 정의할 수 있습니다.

// Main.scala
import cats.effect.*
import com.comcast.ip4s.*
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.server.Router
import repository.InMemoryUserRepository
import routes.UserRoutes

object Main extends IOApp.Simple:
  def run: IO[Unit] =
    for
      repo <- InMemoryUserRepository.make
      routes = Router("/" -> UserRoutes.routes(repo)).orNotFound
      _ <- EmberServerBuilder
        .default[IO]
        .withHost(host"0.0.0.0")
        .withPort(port"8080")
        .withHttpApp(routes)
        .build
        .use { server =>
          IO.println(s"Server started at http://localhost:8080") *>
          IO.never
        }
    yield ()

실행 및 테스트

서버를 실행하고 curl로 API를 테스트합니다.

# 서버 실행
sbt run
# Server started at http://localhost:8080

# 테스트 (다른 터미널에서)
# 사용자 생성
curl -X POST http://localhost:8080/users \
  -H "Content-Type: application/json" \
  -d '{"name": "Alice", "email": "alice@example.com"}'
# {"id":1,"name":"Alice","email":"alice@example.com"}

# 전체 조회
curl http://localhost:8080/users
# [{"id":1,"name":"Alice","email":"alice@example.com"}]

# 단건 조회
curl http://localhost:8080/users/1

# 수정
curl -X PUT http://localhost:8080/users/1 \
  -H "Content-Type: application/json" \
  -d '{"name": "Alice Kim"}'

# 삭제
curl -X DELETE http://localhost:8080/users/1
핵심 포인트
  • IOApp.Simple: IO 기반 앱의 간편한 진입점
  • EmberServerBuilder: http4s의 경량 서버 구현체
  • Router: 여러 라우트를 하나로 조합
  • use + IO.never: 리소스를 안전하게 관리하며 서버 유지

프로젝트 2: 데이터 파이프라인#

FS2는 함수형 스트림 처리 라이브러리입니다. 메모리 효율적인 스트림 처리와 리소스 안전한 파일 I/O를 제공합니다.

FS2 스트림 처리

build.sbt에 FS2 의존성을 추가합니다.

// build.sbt에 추가
libraryDependencies += "co.fs2" %% "fs2-core" % "3.9.4"
libraryDependencies += "co.fs2" %% "fs2-io"   % "3.9.4"

로그 데이터를 스트림으로 처리하고 집계하는 파이프라인을 구현합니다. FS2 Stream은 지연 평가되며, 대용량 데이터도 일정한 메모리로 처리할 수 있습니다.

// StreamPipeline.scala
import cats.effect.*
import fs2.*
import fs2.io.file.{Files, Path}
import io.circe.parser.*
import io.circe.generic.auto.*
import scala.concurrent.duration.*

case class LogEntry(
  timestamp: String,
  level: String,
  message: String,
  service: String
)

case class LogStats(
  service: String,
  errorCount: Long,
  warnCount: Long,
  infoCount: Long
)

object StreamPipeline extends IOApp.Simple:

  // 1. 파일에서 로그 스트림 읽기
  def readLogs(path: Path): Stream[IO, LogEntry] =
    Files[IO].readUtf8Lines(path)
      .filter(_.nonEmpty)
      .evalMap { line =>
        IO.fromEither(decode[LogEntry](line))
          .handleError(_ => LogEntry("", "UNKNOWN", line, "unknown"))
      }

  // 2. 실시간 로그 집계 (5초 윈도우)
  def aggregateLogs(logs: Stream[IO, LogEntry]): Stream[IO, Map[String, LogStats]] =
    logs
      .groupWithin(1000, 5.seconds)  // 1000개 또는 5초마다 배치
      .map { chunk =>
        chunk.toList
          .groupBy(_.service)
          .map { case (service, entries) =>
            service -> LogStats(
              service = service,
              errorCount = entries.count(_.level == "ERROR"),
              warnCount = entries.count(_.level == "WARN"),
              infoCount = entries.count(_.level == "INFO")
            )
          }
      }

  // 3. 에러 알림 필터
  def alertOnErrors(logs: Stream[IO, LogEntry]): Stream[IO, LogEntry] =
    logs.filter(_.level == "ERROR")

  // 4. 결과 출력
  def printStats(stats: Map[String, LogStats]): IO[Unit] =
    IO.println("=== Log Statistics ===") *>
    stats.values.toList.traverse_ { stat =>
      IO.println(s"  ${stat.service}: E=${stat.errorCount} W=${stat.warnCount} I=${stat.infoCount}")
    }

  def run: IO[Unit] =
    // 샘플 데이터 생성
    val sampleLogs = Stream.emits(List(
      LogEntry("2024-01-15T10:00:00", "INFO", "Server started", "api"),
      LogEntry("2024-01-15T10:00:01", "ERROR", "DB connection failed", "api"),
      LogEntry("2024-01-15T10:00:02", "WARN", "Slow query detected", "db"),
      LogEntry("2024-01-15T10:00:03", "INFO", "Request processed", "api"),
      LogEntry("2024-01-15T10:00:04", "ERROR", "Timeout", "payment"),
      LogEntry("2024-01-15T10:00:05", "INFO", "Cache hit", "cache")
    )).covary[IO]

    // 파이프라인 실행
    for
      // 집계 스트림
      _ <- aggregateLogs(sampleLogs)
        .evalMap(printStats)
        .compile
        .drain

      // 에러 알림 스트림
      _ <- IO.println("\n=== Error Alerts ===")
      _ <- alertOnErrors(sampleLogs)
        .evalMap(e => IO.println(s"  [ALERT] ${e.service}: ${e.message}"))
        .compile
        .drain
    yield ()

groupWithin은 시간 또는 개수 기반의 윈도우 집계를 수행합니다. compile.drain은 스트림을 끝까지 실행하고 결과를 버립니다.

핵심 포인트
  • FS2 Stream: 지연 평가, 메모리 효율적, 대용량 데이터 처리에 적합
  • groupWithin: 시간/개수 기반 윈도우 집계 (실시간 분석에 유용)
  • evalMap: 스트림 요소에 IO 연산 적용
  • compile.drain: 스트림을 끝까지 실행하고 결과 버림

프로젝트 3: CLI 도구#

scopt를 사용하면 타입 안전한 명령줄 파서를 쉽게 구현할 수 있습니다. 서브커맨드, 옵션, 인자를 선언적으로 정의합니다.

scopt를 이용한 명령줄 파서

// build.sbt
libraryDependencies += "com.github.scopt" %% "scopt" % "4.1.0"
// CliTool.scala
import scopt.OParser
import java.io.File

case class Config(
  command: String = "",
  input: Option[File] = None,
  output: Option[File] = None,
  verbose: Boolean = false,
  format: String = "json"
)

object CliTool extends App:
  val builder = OParser.builder[Config]

  val parser = {
    import builder.*
    OParser.sequence(
      programName("scala-cli"),
      head("scala-cli", "1.0"),

      cmd("convert")
        .action((_, c) => c.copy(command = "convert"))
        .text("Convert file format")
        .children(
          opt[File]('i', "input")
            .required()
            .action((x, c) => c.copy(input = Some(x)))
            .text("Input file"),
          opt[File]('o', "output")
            .required()
            .action((x, c) => c.copy(output = Some(x)))
            .text("Output file"),
          opt[String]('f', "format")
            .action((x, c) => c.copy(format = x))
            .text("Output format (json, csv, xml)")
        ),

      cmd("analyze")
        .action((_, c) => c.copy(command = "analyze"))
        .text("Analyze file content")
        .children(
          opt[File]('i', "input")
            .required()
            .action((x, c) => c.copy(input = Some(x)))
            .text("Input file"),
          opt[Unit]('v', "verbose")
            .action((_, c) => c.copy(verbose = true))
            .text("Verbose output")
        ),

      help("help").text("Print help message"),
      version("version").text("Print version")
    )
  }

  OParser.parse(parser, args, Config()) match
    case Some(config) =>
      config.command match
        case "convert" =>
          println(s"Converting ${config.input.get} to ${config.output.get} as ${config.format}")
          // 변환 로직 구현
        case "analyze" =>
          println(s"Analyzing ${config.input.get}")
          if config.verbose then println("Verbose mode enabled")
          // 분석 로직 구현
        case _ =>
          println("No command specified. Use --help for usage.")

    case None =>
      // 파싱 실패 (자동으로 에러 메시지 출력)
      ()

파서 정의는 선언적이며, 잘못된 인자가 주어지면 자동으로 에러 메시지를 출력합니다.

# 사용 예시
sbt "run convert -i input.json -o output.csv -f csv"
sbt "run analyze -i data.json -v"
sbt "run --help"
핵심 포인트
  • scopt OParser: 타입 안전한 명령줄 파서
  • cmd: 서브커맨드 정의 (convert, analyze 등)
  • opt: 옵션 정의 (-i, --input 등)
  • required/optional: 필수/선택 인자 지정
  • 잘못된 인자 시 자동 에러 메시지 출력

공통 패턴: 에러 처리#

Cats의 Validated를 사용하면 여러 검증 오류를 한 번에 수집할 수 있습니다. Either는 첫 번째 오류에서 중단되지만, Validated는 모든 오류를 수집합니다.

Either와 Validated 활용

import cats.data.{Validated, ValidatedNec}
import cats.syntax.all.*

// 검증 규칙 정의
sealed trait ValidationError
case class EmptyField(field: String) extends ValidationError
case class InvalidFormat(field: String, reason: String) extends ValidationError
case class OutOfRange(field: String, min: Int, max: Int) extends ValidationError

type ValidationResult[A] = ValidatedNec[ValidationError, A]

// 검증 함수들
def validateName(name: String): ValidationResult[String] =
  if name.isEmpty then EmptyField("name").invalidNec
  else if name.length < 2 then InvalidFormat("name", "최소 2자 이상").invalidNec
  else name.validNec

def validateEmail(email: String): ValidationResult[String] =
  val emailRegex = """^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$""".r
  if email.isEmpty then EmptyField("email").invalidNec
  else if emailRegex.findFirstIn(email).isEmpty then InvalidFormat("email", "유효하지 않은 이메일").invalidNec
  else email.validNec

def validateAge(age: Int): ValidationResult[Int] =
  if age < 0 || age > 150 then OutOfRange("age", 0, 150).invalidNec
  else age.validNec

// 조합
case class ValidatedUser(name: String, email: String, age: Int)

def validateUser(name: String, email: String, age: Int): ValidationResult[ValidatedUser] =
  (validateName(name), validateEmail(email), validateAge(age)).mapN(ValidatedUser.apply)

// 사용
validateUser("Alice", "alice@example.com", 30)  // Valid(ValidatedUser(...))
validateUser("", "invalid-email", 200)          // Invalid(Chain(EmptyField(name), InvalidFormat(email, ...), OutOfRange(age, ...)))

ValidatedNec는 NonEmptyChain에 오류를 수집합니다. mapN은 모든 검증이 성공했을 때만 결과를 조합합니다. 하나라도 실패하면 모든 실패 사유가 수집됩니다.

핵심 포인트
  • Either: 첫 번째 오류에서 중단 (fail-fast)
  • Validated: 모든 오류 수집 (accumulating errors)
  • ValidatedNec: NonEmptyChain으로 오류 수집 (효율적 추가)
  • mapN: 여러 Validated를 조합, 모두 성공 시에만 결과 생성
  • 사용자 입력 검증처럼 여러 오류를 한 번에 보여줄 때 유용

다음 단계#

실무 프로젝트 예제를 학습했다면 다음 주제들로 학습을 이어가세요.