TL;DR
- REST API Server: Build functional web server with http4s + Circe + Cats Effect
- Data Pipeline: Memory-efficient stream processing with FS2
- CLI Tool: Type-safe command-line parser with scopt
- Error Handling: Collect multiple validation errors simultaneously with Cats Validated
- All examples emphasize functional style with immutability, referential transparency, and type safety
Target Audience: Developers building real services with Scala
Prerequisites:
- Scala basic syntax and functional programming concepts
- sbt build tool usage
- REST API and HTTP basics (Project 1)
- (Optional) Basic knowledge of Cats Effect/IO monad
Examples of building real services with Scala. Implement REST API servers and data pipelines. These examples demonstrate how to write type-safe, maintainable code by leveraging Scala’s functional programming characteristics.
Project 1: REST API Server
http4s is a functional HTTP library for Scala. It lets you build web servers while maintaining immutability and referential transparency; Circe handles JSON processing, and Cats Effect handles asynchronous effects.
Tech Stack
Libraries used in this project. http4s-ember is a lightweight HTTP server/client implementation, and Circe generates JSON codecs at compile time.
- Scala 3 + http4s (functional HTTP library)
- Circe (JSON processing)
- Cats Effect (async processing)
build.sbt
ThisBuild / scalaVersion := "3.3.1"

lazy val root = (project in file("."))
  .settings(
    name := "scala-api-server",
    libraryDependencies ++= Seq(
      "org.http4s" %% "http4s-ember-server" % "0.23.25",
      "org.http4s" %% "http4s-ember-client" % "0.23.25",
      "org.http4s" %% "http4s-circe" % "0.23.25",
      "org.http4s" %% "http4s-dsl" % "0.23.25",
      "io.circe" %% "circe-generic" % "0.14.6",
      "io.circe" %% "circe-parser" % "0.14.6",
      "ch.qos.logback" % "logback-classic" % "1.4.14"
    )
  )
Domain Model
First, define domain models to use in the API. Represent immutable data with Case Classes and leverage Circe’s automatic codec generation.
// domain/models.scala
package domain

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.*

// Domain model definition
case class UserId(value: Long) extends AnyVal

case class User(
  id: UserId,
  name: String,
  email: String,
  createdAt: java.time.Instant
)

case class CreateUserRequest(name: String, email: String)
case class UpdateUserRequest(name: Option[String], email: Option[String])
case class UserResponse(id: Long, name: String, email: String)

// JSON codecs (Circe)
object JsonCodecs:
  // UserId codec
  given Encoder[UserId] = Encoder.encodeLong.contramap(_.value)
  given Decoder[UserId] = Decoder.decodeLong.map(UserId.apply)

  // Request/Response codecs
  given Decoder[CreateUserRequest] = deriveDecoder
  given Decoder[UpdateUserRequest] = deriveDecoder
  given Encoder[UserResponse] = deriveEncoder

// User → UserResponse conversion
extension (user: User)
  def toResponse: UserResponse = UserResponse(
    id = user.id.value,
    name = user.name,
    email = user.email
  )
Wrapping UserId as AnyVal provides type safety without runtime overhead. JSON codecs are auto-generated at compile time using Circe’s semiauto.
Key Points
- Case Class: Foundation for immutable data modeling
- AnyVal wrapping: Type safety like UserId(value: Long) with no runtime overhead
- Circe deriveEncoder/deriveDecoder: Auto-generate JSON codecs at compile time
- extension: Add conversion methods to models (user.toResponse)
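As an aside, Scala 3 also offers `opaque type` as a zero-overhead alternative to the AnyVal wrapper above; unlike a value class, the underlying `Long` is invisible outside the defining scope. A minimal sketch (the `Ids` object and `demo` entry point are illustrative, not part of the project):

```scala
// Sketch: an opaque type hides its Long representation outside `Ids`.
object Ids:
  opaque type UserId = Long

  object UserId:
    def apply(value: Long): UserId = value // inside Ids, UserId = Long

  extension (id: UserId)
    def value: Long = id

@main def demo(): Unit =
  import Ids.*
  val id = UserId(42L)
  // id + 1 would not compile here: UserId is not a Long outside Ids
  println(id.value) // 42
```

Which style to prefer is a judgment call; AnyVal wrappers work on both Scala 2 and 3, while opaque types never box.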
Repository (In-Memory)
Define data storage. Use Cats Effect’s Ref to implement thread-safe in-memory storage.
// repository/UserRepository.scala
package repository

import cats.effect.{IO, Ref}
import domain.*

trait UserRepository:
  def findById(id: UserId): IO[Option[User]]
  def findAll: IO[List[User]]
  def create(request: CreateUserRequest): IO[User]
  def update(id: UserId, request: UpdateUserRequest): IO[Option[User]]
  def delete(id: UserId): IO[Boolean]

object InMemoryUserRepository:
  def make: IO[UserRepository] =
    for
      store   <- Ref.of[IO, Map[UserId, User]](Map.empty)
      counter <- Ref.of[IO, Long](0L)
    yield new UserRepository:
      def findById(id: UserId): IO[Option[User]] =
        store.get.map(_.get(id))

      def findAll: IO[List[User]] =
        store.get.map(_.values.toList)

      def create(request: CreateUserRequest): IO[User] =
        for
          newId <- counter.updateAndGet(_ + 1)
          user = User(
            id = UserId(newId),
            name = request.name,
            email = request.email,
            createdAt = java.time.Instant.now()
          )
          _ <- store.update(_ + (user.id -> user))
        yield user

      def update(id: UserId, request: UpdateUserRequest): IO[Option[User]] =
        store.modify { currentStore =>
          currentStore.get(id) match
            case Some(existing) =>
              val updated = existing.copy(
                name = request.name.getOrElse(existing.name),
                email = request.email.getOrElse(existing.email)
              )
              (currentStore + (id -> updated), Some(updated))
            case None =>
              (currentStore, None)
        }

      def delete(id: UserId): IO[Boolean] =
        store.modify { currentStore =>
          if currentStore.contains(id) then
            (currentStore - id, true)
          else
            (currentStore, false)
        }
Ref.modify guarantees atomic updates. Defining the interface as a trait makes it easy to swap in a real database implementation later.
Key Points
- trait: Define interface for easy implementation replacement (testing, DB switching)
- Ref[IO, A]: Thread-safe mutable state management
- Ref.modify: Atomic read-modify-write operation
- for comprehension: Sequential composition of IO operations
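The read-modify-write contract of `Ref.modify` can be illustrated without Cats Effect: the function you pass is a pure transition from the old state to a pair of (new state, result), which `Ref` then applies atomically. A standalone sketch of the `delete` case (the `Store` alias and `deleteUser` helper are illustrative):

```scala
// The kind of pure state transition Ref.modify applies atomically:
// old state in, (new state, result) out. No mutation anywhere.
type Store = Map[Long, String]

def deleteUser(id: Long): Store => (Store, Boolean) =
  store =>
    if store.contains(id) then (store - id, true)
    else (store, false)

@main def demo(): Unit =
  val (afterHit, deleted)  = deleteUser(1L)(Map(1L -> "Alice"))
  val (afterMiss, missed)  = deleteUser(2L)(afterHit)
  println(s"$deleted $missed ${afterHit.isEmpty}") // true false true
```

Because the transition is pure, `Ref` can safely retry it under contention (it uses compare-and-set internally), which is what makes the in-memory repository thread-safe.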
HTTP Routes
Define RESTful endpoints using http4s DSL. Each route returns an IO value, managing side effects purely.
// routes/UserRoutes.scala
package routes

import cats.effect.IO
import org.http4s.*
import org.http4s.dsl.io.*
import org.http4s.circe.*
import io.circe.syntax.*
import domain.*
import domain.JsonCodecs.given
import repository.UserRepository

object UserRoutes:
  // JSON decoders for request bodies
  given EntityDecoder[IO, CreateUserRequest] = jsonOf[IO, CreateUserRequest]
  given EntityDecoder[IO, UpdateUserRequest] = jsonOf[IO, UpdateUserRequest]

  def routes(repo: UserRepository): HttpRoutes[IO] = HttpRoutes.of[IO]:
    // GET /users - List all
    case GET -> Root / "users" =>
      for
        users    <- repo.findAll
        response <- Ok(users.map(_.toResponse).asJson)
      yield response

    // GET /users/:id - Get by ID
    case GET -> Root / "users" / LongVar(id) =>
      for
        userOpt  <- repo.findById(UserId(id))
        response <- userOpt match
          case Some(user) => Ok(user.toResponse.asJson)
          case None       => NotFound(s"User $id not found")
      yield response

    // POST /users - Create
    case req @ POST -> Root / "users" =>
      for
        createReq <- req.as[CreateUserRequest]
        user      <- repo.create(createReq)
        response  <- Created(user.toResponse.asJson)
      yield response

    // PUT /users/:id - Update
    case req @ PUT -> Root / "users" / LongVar(id) =>
      for
        updateReq <- req.as[UpdateUserRequest]
        userOpt   <- repo.update(UserId(id), updateReq)
        response  <- userOpt match
          case Some(user) => Ok(user.toResponse.asJson)
          case None       => NotFound(s"User $id not found")
      yield response

    // DELETE /users/:id - Delete
    case DELETE -> Root / "users" / LongVar(id) =>
      for
        deleted  <- repo.delete(UserId(id))
        response <- if deleted then NoContent() else NotFound(s"User $id not found")
      yield response
Key Points
- http4s DSL: Define routes with pattern matching like GET -> Root / "users" / LongVar(id)
- HttpRoutes[IO]: Pure functional routes, side effects managed by IO
- req.as[T]: Decode request body to type T
- Ok, Created, NotFound: HTTP response generation helpers
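Path extractors such as `LongVar` are ordinary objects with an `unapply` method that pattern matching calls for you. A standalone sketch of the idea on plain strings (this simplified `LongVar` is illustrative, not http4s’s own definition):

```scala
// A LongVar-style extractor: any object with a suitable unapply can
// appear in a pattern; here it matches path segments that parse as Long.
object LongVar:
  def unapply(segment: String): Option[Long] = segment.toLongOption

@main def demo(): Unit =
  val result = "users/42".split("/").toList match
    case "users" :: LongVar(id) :: Nil => s"get user $id"
    case _                             => "not found"
  println(result) // get user 42
```

This is why a route like `GET -> Root / "users" / LongVar(id)` only matches when the segment is numeric; `/users/abc` falls through to the next case (or a 404).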
Main Application
Combine all components to start the server. Extending IOApp.Simple gives a convenient entry point for IO-based applications.
// Main.scala
import cats.effect.*
import com.comcast.ip4s.*
import org.http4s.ember.server.EmberServerBuilder
import org.http4s.server.Router
import repository.InMemoryUserRepository
import routes.UserRoutes

object Main extends IOApp.Simple:
  def run: IO[Unit] =
    for
      repo <- InMemoryUserRepository.make
      routes = Router("/" -> UserRoutes.routes(repo)).orNotFound
      _ <- EmberServerBuilder
        .default[IO]
        .withHost(host"0.0.0.0")
        .withPort(port"8080")
        .withHttpApp(routes)
        .build
        .use { _ =>
          IO.println("Server started at http://localhost:8080") *>
            IO.never
        }
    yield ()
Run and Test
Run the server and test the API with curl.
# Run server
sbt run
# Server started at http://localhost:8080
# Test (in another terminal)
# Create user
curl -X POST http://localhost:8080/users \
-H "Content-Type: application/json" \
-d '{"name": "Alice", "email": "alice@example.com"}'
# {"id":1,"name":"Alice","email":"alice@example.com"}
# List all
curl http://localhost:8080/users
# Get by ID
curl http://localhost:8080/users/1
# Update
curl -X PUT http://localhost:8080/users/1 \
-H "Content-Type: application/json" \
-d '{"name": "Alice Kim"}'
# Delete
curl -X DELETE http://localhost:8080/users/1
Key Points
- IOApp.Simple: Convenient entry point for IO-based apps
- EmberServerBuilder: http4s lightweight server implementation
- Router: Compose multiple routes into one
- use + IO.never: Safely manage resources while keeping server running
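The acquire/use/release discipline behind `build.use` can be sketched synchronously with try/finally; Cats Effect’s `Resource` generalizes this to effectful, composable, cancellation-safe acquisition. `withResource` below is an illustrative stand-in, not the Cats Effect API:

```scala
// Sketch of the resource discipline: release always runs, even if the
// `use` body throws -- the same guarantee Resource#use provides for IO.
def withResource[A, B](acquire: => A)(release: A => Unit)(use: A => B): B =
  val resource = acquire
  try use(resource)
  finally release(resource)

@main def demo(): Unit =
  val result = withResource("server")(r => println(s"released $r")) { r =>
    s"using $r"
  }
  println(result)
```

Pairing `use` with `IO.never` in Main.scala keeps the server inside the "use" phase forever, so shutdown (the release phase) runs only when the process is interrupted.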
Project 2: Data Pipeline
FS2 is a functional stream processing library. It provides memory-efficient stream processing and resource-safe file I/O.
FS2 Stream Processing
Add FS2 dependencies to build.sbt.
// Add to build.sbt
libraryDependencies += "co.fs2" %% "fs2-core" % "3.9.4"
libraryDependencies += "co.fs2" %% "fs2-io" % "3.9.4"
Implement a pipeline that processes log data as streams and aggregates it. FS2 streams are lazily evaluated and can process large data in constant memory.
// StreamPipeline.scala
import cats.effect.*
import cats.syntax.all.*  // for traverse_
import fs2.*
import fs2.io.file.{Files, Path}
import io.circe.parser.*
import io.circe.generic.auto.*
import scala.concurrent.duration.*

case class LogEntry(
  timestamp: String,
  level: String,
  message: String,
  service: String
)

case class LogStats(
  service: String,
  errorCount: Long,
  warnCount: Long,
  infoCount: Long
)

object StreamPipeline extends IOApp.Simple:
  // 1. Read log stream from file
  def readLogs(path: Path): Stream[IO, LogEntry] =
    Files[IO].readUtf8Lines(path)
      .filter(_.nonEmpty)
      .evalMap { line =>
        IO.fromEither(decode[LogEntry](line))
          .handleError(_ => LogEntry("", "UNKNOWN", line, "unknown"))
      }

  // 2. Real-time log aggregation (5-second window)
  def aggregateLogs(logs: Stream[IO, LogEntry]): Stream[IO, Map[String, LogStats]] =
    logs
      .groupWithin(1000, 5.seconds) // Batch every 1000 items or 5 seconds
      .map { chunk =>
        chunk.toList
          .groupBy(_.service)
          .map { case (service, entries) =>
            service -> LogStats(
              service = service,
              errorCount = entries.count(_.level == "ERROR"),
              warnCount = entries.count(_.level == "WARN"),
              infoCount = entries.count(_.level == "INFO")
            )
          }
      }

  // 3. Error alert filter
  def alertOnErrors(logs: Stream[IO, LogEntry]): Stream[IO, LogEntry] =
    logs.filter(_.level == "ERROR")

  // 4. Print results
  def printStats(stats: Map[String, LogStats]): IO[Unit] =
    IO.println("=== Log Statistics ===") *>
      stats.values.toList.traverse_ { stat =>
        IO.println(s"  ${stat.service}: E=${stat.errorCount} W=${stat.warnCount} I=${stat.infoCount}")
      }

  def run: IO[Unit] =
    // Sample data (would normally read from file)
    val sampleLogs = Stream.emits(List(
      LogEntry("2024-01-15T10:00:00", "INFO", "Server started", "api"),
      LogEntry("2024-01-15T10:00:01", "ERROR", "DB connection failed", "api"),
      LogEntry("2024-01-15T10:00:02", "WARN", "Slow query detected", "db"),
      LogEntry("2024-01-15T10:00:03", "INFO", "Request processed", "api"),
      LogEntry("2024-01-15T10:00:04", "ERROR", "Timeout", "payment"),
      LogEntry("2024-01-15T10:00:05", "INFO", "Cache hit", "cache")
    )).covary[IO]

    // Execute pipeline
    for
      // Aggregation stream
      _ <- aggregateLogs(sampleLogs)
        .evalMap(printStats)
        .compile
        .drain
      // Error alert stream
      _ <- IO.println("\n=== Error Alerts ===")
      _ <- alertOnErrors(sampleLogs)
        .evalMap(e => IO.println(s"  [ALERT] ${e.service}: ${e.message}"))
        .compile
        .drain
    yield ()
groupWithin performs time- or count-based window aggregation. compile.drain runs the stream to completion and discards the results.
Key Points
- FS2 Stream: Lazy evaluation, memory efficient, suitable for large data processing
- groupWithin: Time/count-based window aggregation (useful for real-time analysis)
- evalMap: Apply IO operations to stream elements
- compile.drain: Execute stream to end and discard results
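The count half of `groupWithin`’s batching can be sketched with the standard library alone: elements are chunked, then each chunk is aggregated (the time bound has no stdlib analogue). `apiErrorsPerBatch` is a hypothetical helper for illustration only:

```scala
// Count-based batching, then per-batch aggregation -- the same shape as
// groupWithin(n, d).map(chunk => ...) minus the time-based flush.
case class Entry(service: String, level: String)

def apiErrorsPerBatch(entries: List[Entry], batchSize: Int): List[Int] =
  entries
    .grouped(batchSize) // emit a batch every `batchSize` elements
    .map(_.count(e => e.service == "api" && e.level == "ERROR"))
    .toList

@main def demo(): Unit =
  val entries = List(
    Entry("api", "ERROR"), Entry("api", "INFO"),
    Entry("db", "WARN"), Entry("api", "ERROR")
  )
  println(apiErrorsPerBatch(entries, 2)) // List(1, 1)
```

The difference in production is that `groupWithin` also flushes a partial batch after the timeout, so slow streams still produce timely statistics.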
Project 3: CLI Tool
Using scopt makes it easy to implement type-safe command-line parsers. Declaratively define subcommands, options, and arguments.
Command-line Parser with scopt
// build.sbt
libraryDependencies += "com.github.scopt" %% "scopt" % "4.1.0"

// CliTool.scala
import scopt.OParser
import java.io.File

case class Config(
  command: String = "",
  input: Option[File] = None,
  output: Option[File] = None,
  verbose: Boolean = false,
  format: String = "json"
)

object CliTool:
  val builder = OParser.builder[Config]
  val parser = {
    import builder.*
    OParser.sequence(
      programName("scala-cli"),
      head("scala-cli", "1.0"),
      cmd("convert")
        .action((_, c) => c.copy(command = "convert"))
        .text("Convert file format")
        .children(
          opt[File]('i', "input")
            .required()
            .action((x, c) => c.copy(input = Some(x)))
            .text("Input file"),
          opt[File]('o', "output")
            .required()
            .action((x, c) => c.copy(output = Some(x)))
            .text("Output file"),
          opt[String]('f', "format")
            .action((x, c) => c.copy(format = x))
            .text("Output format (json, csv, xml)")
        ),
      cmd("analyze")
        .action((_, c) => c.copy(command = "analyze"))
        .text("Analyze file content")
        .children(
          opt[File]('i', "input")
            .required()
            .action((x, c) => c.copy(input = Some(x)))
            .text("Input file"),
          opt[Unit]('v', "verbose")
            .action((_, c) => c.copy(verbose = true))
            .text("Verbose output")
        ),
      help("help").text("Print help message"),
      version("version").text("Print version")
    )
  }

  // Note: Scala 3 dropped DelayedInit, so `extends App` no longer
  // provides `args`; define an explicit main method instead.
  def main(args: Array[String]): Unit =
    OParser.parse(parser, args, Config()) match
      case Some(config) =>
        config.command match
          case "convert" =>
            println(s"Converting ${config.input.get} to ${config.output.get} as ${config.format}")
            // Implement conversion logic
          case "analyze" =>
            println(s"Analyzing ${config.input.get}")
            if config.verbose then println("Verbose mode enabled")
            // Implement analysis logic
          case _ =>
            println("No command specified. Use --help for usage.")
      case None =>
        // Parsing failed (error message automatically printed)
        ()
Parser definition is declarative, and error messages for invalid arguments are printed automatically.
# Usage examples
sbt "run convert -i input.json -o output.csv -f csv"
sbt "run analyze -i data.json -v"
sbt "run --help"
Key Points
- scopt OParser: Type-safe command-line parser
- cmd: Define subcommands (convert, analyze, etc.)
- opt: Define options (-i, --input, etc.)
- required/optional: Specify mandatory/optional arguments
- Error messages for invalid arguments are printed automatically
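The pattern behind scopt’s `.action((x, c) => c.copy(...))` builders is worth seeing in isolation: each recognized argument becomes a function `Config => Config`, and parsing amounts to folding those functions over a default `Config`. A toy sketch (this `parse` and its option table are illustrative, not scopt’s API):

```scala
// Each flag maps to a pure Config update; parsing is a left fold of
// those updates over the defaults. Unknown flags are ignored here
// (scopt would report them as errors instead).
case class Config(verbose: Boolean = false, format: String = "json")

def parse(args: List[String]): Config =
  val actions: Map[String, Config => Config] = Map(
    "-v"    -> (c => c.copy(verbose = true)),
    "--csv" -> (c => c.copy(format = "csv"))
  )
  args.foldLeft(Config()) { (c, arg) =>
    actions.get(arg).fold(c)(action => action(c))
  }

@main def demo(): Unit =
  println(parse(List("-v", "--csv"))) // Config(true,csv)
```

Because every update is immutable, argument order can never corrupt the config, and the final `Config` is a plain value you can pass anywhere.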
Common Pattern: Error Handling
Using Cats Validated allows collecting multiple validation errors at once. Either stops at the first error, but Validated collects all errors.
Using Either and Validated
import cats.data.{Validated, ValidatedNec}
import cats.syntax.all.*

// Validation rules
sealed trait ValidationError
case class EmptyField(field: String) extends ValidationError
case class InvalidFormat(field: String, reason: String) extends ValidationError
case class OutOfRange(field: String, min: Int, max: Int) extends ValidationError

type ValidationResult[A] = ValidatedNec[ValidationError, A]

// Validation functions
def validateName(name: String): ValidationResult[String] =
  if name.isEmpty then EmptyField("name").invalidNec
  else if name.length < 2 then InvalidFormat("name", "minimum 2 characters").invalidNec
  else name.validNec

def validateEmail(email: String): ValidationResult[String] =
  val emailRegex = """^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$""".r
  if email.isEmpty then EmptyField("email").invalidNec
  else if emailRegex.findFirstIn(email).isEmpty then InvalidFormat("email", "invalid email").invalidNec
  else email.validNec

def validateAge(age: Int): ValidationResult[Int] =
  if age < 0 || age > 150 then OutOfRange("age", 0, 150).invalidNec
  else age.validNec

// Combine
case class ValidatedUser(name: String, email: String, age: Int)

def validateUser(name: String, email: String, age: Int): ValidationResult[ValidatedUser] =
  (validateName(name), validateEmail(email), validateAge(age)).mapN(ValidatedUser.apply)

// Usage
validateUser("Alice", "alice@example.com", 30)
// Valid(ValidatedUser(...))
validateUser("", "invalid-email", 200)
// Invalid(Chain(EmptyField(name), InvalidFormat(email, ...), OutOfRange(age, ...)))
ValidatedNec collects errors in a NonEmptyChain. mapN combines results only when all validations succeed; if any fail, all failure reasons are collected.
Key Points
- Either: Stop at first error (fail-fast)
- Validated: Collect all errors (accumulating errors)
- ValidatedNec: Collect errors in NonEmptyChain (efficient appending)
- mapN: Combine multiple Validateds, generate result only when all succeed
- Useful when showing multiple errors at once like user input validation
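The fail-fast vs accumulating distinction can also be reproduced without cats: `Either` short-circuits inside a for comprehension, while an explicit combine keeps every error. A minimal sketch (the helper names are illustrative):

```scala
// Two validators returning Either with a List of errors on the left.
def checkName(s: String): Either[List[String], String] =
  if s.nonEmpty then Right(s) else Left(List("name is empty"))
def checkAge(n: Int): Either[List[String], Int] =
  if n >= 0 then Right(n) else Left(List("age is negative"))

// Fail-fast: the for comprehension stops at the first Left.
def failFast(name: String, age: Int): Either[List[String], (String, Int)] =
  for n <- checkName(name); a <- checkAge(age) yield (n, a)

// Accumulating: combine both error lists, like Validated's mapN.
def accumulate(name: String, age: Int): Either[List[String], (String, Int)] =
  (checkName(name), checkAge(age)) match
    case (Right(n), Right(a)) => Right((n, a))
    case (l, r) =>
      Left(l.left.toOption.getOrElse(Nil) ++ r.left.toOption.getOrElse(Nil))

@main def demo(): Unit =
  println(failFast("", -1))   // Left(List(name is empty))
  println(accumulate("", -1)) // Left(List(name is empty, age is negative))
```

Cats’ `Validated` packages the accumulating case once and for all, with `NonEmptyChain` guaranteeing at least one error and cheap appends.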
Next Steps
After learning practical project examples, continue with these topics.
- Spark Integration - Large-scale data processing
- Functional Patterns - Cats, ZIO usage
- Concurrency - Future, IO deep dive