소요 시간: 약 25분
TL;DR
spring-kafka의존성 추가 후application.yml설정KafkaTemplate<String, String>으로 메시지 발행@KafkaListener(topics = ["..."], groupId = "...")로 메시지 수신- Consumer Group 네이밍 규칙:
{app-name}-group
대상 독자: Kotlin + Spring Boot 기초를 이해한 개발자 선수 지식: Spring Boot 연동, Kafka 기초
Kotlin과 Spring Kafka를 사용하여 메시지를 발행하고 수신하는 예제를 구현합니다. 이 예제는 사이트의 docker/ 디렉토리에 있는 Kafka(KRaft 모드, 포트 9092)를 사용합니다.
Step 1 — Kafka 브로커 실행#
# 프로젝트 루트에서
cd docker
docker-compose up -d
# 상태 확인
docker-compose ps정상 실행 시 kafka 컨테이너가 Up 상태여야 합니다.
Step 2 — 프로젝트 설정#
build.gradle.kts
plugins {
kotlin("jvm") version "2.0.0"
kotlin("plugin.spring") version "2.0.0"
id("org.springframework.boot") version "3.2.5"
id("io.spring.dependency-management") version "1.1.5"
}
group = "com.example"
version = "0.0.1-SNAPSHOT"
kotlin {
jvmToolchain(17)
compilerOptions {
freeCompilerArgs.addAll("-Xjsr305=strict")
}
}
dependencies {
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.kafka:spring-kafka")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin")
implementation("org.jetbrains.kotlin:kotlin-reflect")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test")
}src/main/resources/application.yml
spring:
application:
name: kotlin-kafka-demo
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all # 모든 리플리카 확인 후 완료
retries: 3
consumer:
group-id: kotlin-kafka-demo-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
enable-auto-commit: false # 수동 커밋 권장
listener:
ack-mode: manual_immediate # @KafkaListener의 Acknowledgment 파라미터 활성화
server:
port: 8080
management:
endpoints:
web:
exposure:
include: health,infoStep 3 — Kafka 설정 클래스#
// src/main/kotlin/com/example/kafka/config/KafkaTopicConfig.kt
package com.example.kafka.config
import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.config.TopicBuilder
@Configuration
class KafkaTopicConfig {
companion object {
const val TOPIC_MESSAGES = "kotlin.messages"
const val TOPIC_ORDERS = "kotlin.orders"
}
@Bean
fun messagesTopic(): NewTopic = TopicBuilder
.name(TOPIC_MESSAGES)
.partitions(3)
.replicas(1)
.build()
@Bean
fun ordersTopic(): NewTopic = TopicBuilder
.name(TOPIC_ORDERS)
.partitions(3)
.replicas(1)
.build()
}Step 4 — 도메인 모델#
// src/main/kotlin/com/example/kafka/domain/Message.kt
package com.example.kafka.domain
import java.time.LocalDateTime
data class Message(
val id: String,
val content: String,
val sender: String,
val timestamp: LocalDateTime = LocalDateTime.now()
)
data class Order(
val orderId: String,
val productId: String,
val quantity: Int,
val amount: Long,
val customerId: String
)Step 5 — Producer 서비스#
// src/main/kotlin/com/example/kafka/producer/MessageProducer.kt
package com.example.kafka.producer
import com.example.kafka.config.KafkaTopicConfig
import com.example.kafka.domain.Message
import com.example.kafka.domain.Order
import com.fasterxml.jackson.databind.ObjectMapper
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Service
import java.util.UUID
@Service
class MessageProducer(
private val kafkaTemplate: KafkaTemplate<String, String>,
private val objectMapper: ObjectMapper
) {
private val log = LoggerFactory.getLogger(this::class.java)
fun sendMessage(content: String, sender: String): String {
val messageId = UUID.randomUUID().toString()
val message = Message(
id = messageId,
content = content,
sender = sender
)
val payload = objectMapper.writeValueAsString(message)
kafkaTemplate.send(KafkaTopicConfig.TOPIC_MESSAGES, messageId, payload)
.whenComplete { result, ex ->
if (ex != null) {
log.error("메시지 전송 실패: $messageId", ex)
} else {
log.info(
"메시지 전송 완료: $messageId " +
"(파티션=${result.recordMetadata.partition()}, " +
"오프셋=${result.recordMetadata.offset()})"
)
}
}
return messageId
}
fun sendOrder(order: Order) {
val payload = objectMapper.writeValueAsString(order)
kafkaTemplate.send(KafkaTopicConfig.TOPIC_ORDERS, order.orderId, payload)
.whenComplete { _, ex ->
if (ex != null) {
log.error("주문 전송 실패: ${order.orderId}", ex)
} else {
log.info("주문 전송 완료: ${order.orderId}")
}
}
}
}Step 6 — Consumer 서비스#
// src/main/kotlin/com/example/kafka/consumer/MessageConsumer.kt
package com.example.kafka.consumer
import com.example.kafka.config.KafkaTopicConfig
import com.example.kafka.domain.Message
import com.example.kafka.domain.Order
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.kafka.support.Acknowledgment
import org.springframework.stereotype.Service
@Service
class MessageConsumer(
private val objectMapper: ObjectMapper
) {
private val log = LoggerFactory.getLogger(this::class.java)
@KafkaListener(
topics = [KafkaTopicConfig.TOPIC_MESSAGES],
groupId = "kotlin-kafka-demo-group"
)
fun consumeMessage(record: ConsumerRecord<String, String>) {
val message = objectMapper.readValue(record.value(), Message::class.java)
log.info(
"메시지 수신 — ID: ${message.id}, " +
"발신자: ${message.sender}, " +
"내용: ${message.content}, " +
"파티션: ${record.partition()}, 오프셋: ${record.offset()}"
)
// 실제 비즈니스 로직 처리
processMessage(message)
}
@KafkaListener(
topics = [KafkaTopicConfig.TOPIC_ORDERS],
groupId = "kotlin-kafka-demo-group",
containerFactory = "kafkaListenerContainerFactory"
)
fun consumeOrder(
record: ConsumerRecord<String, String>,
ack: Acknowledgment
) {
try {
val order = objectMapper.readValue(record.value(), Order::class.java)
log.info("주문 수신 — 주문ID: ${order.orderId}, 고객: ${order.customerId}")
processOrder(order)
ack.acknowledge() // 수동 커밋
} catch (ex: Exception) {
log.error("주문 처리 실패: ${record.key()}", ex)
// 재처리 전략: Dead Letter Queue로 전송 등
}
}
private fun processMessage(message: Message) {
log.info("메시지 처리 완료: ${message.id}")
}
private fun processOrder(order: Order) {
log.info("주문 처리 완료: ${order.orderId}, 금액: ${order.amount}원")
}
}Step 7 — REST 컨트롤러#
// src/main/kotlin/com/example/kafka/controller/KafkaController.kt
package com.example.kafka.controller
import com.example.kafka.domain.Order
import com.example.kafka.producer.MessageProducer
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.*
import java.util.UUID
data class SendMessageRequest(val message: String, val sender: String = "anonymous")
data class SendMessageResponse(val messageId: String, val status: String)
data class SendOrderRequest(
val productId: String,
val quantity: Int,
val amount: Long,
val customerId: String
)
@RestController
@RequestMapping("/api/kafka")
class KafkaController(
private val messageProducer: MessageProducer
) {
@PostMapping("/messages")
@ResponseStatus(HttpStatus.ACCEPTED)
fun sendMessage(@RequestBody request: SendMessageRequest): SendMessageResponse {
val messageId = messageProducer.sendMessage(request.message, request.sender)
return SendMessageResponse(messageId, "ACCEPTED")
}
@PostMapping("/orders")
@ResponseStatus(HttpStatus.ACCEPTED)
fun sendOrder(@RequestBody request: SendOrderRequest): Map<String, String> {
val order = Order(
orderId = UUID.randomUUID().toString(),
productId = request.productId,
quantity = request.quantity,
amount = request.amount,
customerId = request.customerId
)
messageProducer.sendOrder(order)
return mapOf("orderId" to order.orderId, "status" to "ACCEPTED")
}
}Step 8 — 애플리케이션 실행#
// src/main/kotlin/com/example/kafka/KafkaApplication.kt
package com.example.kafka
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
@SpringBootApplication
class KafkaApplication
fun main(args: Array<String>) {
runApplication<KafkaApplication>(*args)
}./gradlew bootRunStep 9 — 메시지 송수신 테스트#
서버가 실행 중인 상태에서 아래 curl 명령으로 테스트합니다.
# 메시지 전송
curl -X POST http://localhost:8080/api/kafka/messages \
-H "Content-Type: application/json" \
-d '{"message":"안녕하세요, Kafka!","sender":"홍길동"}'
# 응답
# {"messageId":"550e8400-e29b-41d4-a716-446655440000","status":"ACCEPTED"}
# 주문 전송
curl -X POST http://localhost:8080/api/kafka/orders \
-H "Content-Type: application/json" \
-d '{"productId":"PROD-001","quantity":2,"amount":50000,"customerId":"user-123"}'
# 응답
# {"orderId":"...","status":"ACCEPTED"}서버 로그에서 Consumer가 메시지를 수신했는지 확인합니다.
INFO MessageConsumer - 메시지 수신 — ID: 550e8400..., 발신자: 홍길동, 내용: 안녕하세요, Kafka!, 파티션: 0, 오프셋: 0
INFO MessageConsumer - 메시지 처리 완료: 550e8400...Step 10 — Consumer Group 확인#
# Kafka 컨테이너에서 Consumer Group 상태 확인
docker exec -it kafka \
kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--describe \
--group kotlin-kafka-demo-group핵심 정리
KafkaTemplate<String, String>.send(topic, key, value)— 메시지 발행@KafkaListener(topics = [...], groupId = "...")— 메시지 수신 선언- Consumer Group ID 규칙:
{app-name}-group(예:kotlin-kafka-demo-group)Acknowledgment.acknowledge()— 수동 커밋으로 메시지 처리 보장- JSON 직렬화에
jackson-module-kotlin필요
테스트 작성#
spring-kafka-test의 @EmbeddedKafka를 사용하면 실제 Kafka 브로커 없이 인메모리 브로커로 Producer/Consumer를 검증할 수 있습니다. build.gradle.kts에 이미 포함된 spring-kafka-test를 확인합니다.
// build.gradle.kts — testImplementation 블록 확인
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("org.springframework.kafka:spring-kafka-test") // @EmbeddedKafka 포함Producer 단위 테스트 + Consumer 수신 검증
@EmbeddedKafka로 인메모리 브로커를 시작하고, KafkaTemplate으로 메시지를 발행한 뒤 Consumer가 수신했는지 CountDownLatch로 검증합니다.
// src/test/kotlin/com/example/kafka/producer/MessageProducerTest.kt
package com.example.kafka.producer
import com.example.kafka.config.KafkaTopicConfig
import com.example.kafka.domain.Message
import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.test.context.SpringBootTest
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.kafka.listener.KafkaMessageListenerContainer
import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.EmbeddedKafkaBroker
import org.springframework.kafka.test.context.EmbeddedKafka
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
@SpringBootTest(
properties = ["spring.kafka.bootstrap-servers=\${spring.embedded.kafka.brokers}"]
)
@EmbeddedKafka(
partitions = 1,
topics = [KafkaTopicConfig.TOPIC_MESSAGES]
)
class MessageProducerTest @Autowired constructor(
private val messageProducer: MessageProducer,
private val objectMapper: ObjectMapper,
private val embeddedKafkaBroker: EmbeddedKafkaBroker
) {
@Test
fun `메시지를 발행하면 Consumer가 수신한다`() {
// Consumer 설정
val consumerProps = KafkaTestUtils.consumerProps(
"test-group", "true", embeddedKafkaBroker
).apply {
put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer::class.java)
put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
}
val consumerFactory = DefaultKafkaConsumerFactory<String, String>(consumerProps)
val containerProps = ContainerProperties(KafkaTopicConfig.TOPIC_MESSAGES)
val latch = CountDownLatch(1)
var receivedValue: String? = null
containerProps.messageListener = MessageListener<String, String> { record ->
receivedValue = record.value()
latch.countDown()
}
val container = KafkaMessageListenerContainer(consumerFactory, containerProps)
container.start()
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.partitionsPerTopic)
// Producer 실행
val messageId = messageProducer.sendMessage("테스트 메시지", "테스터")
// 최대 5초 대기
assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue()
container.stop()
// 수신된 메시지 검증
val received = objectMapper.readValue(receivedValue, Message::class.java)
assertThat(received.id).isEqualTo(messageId)
assertThat(received.content).isEqualTo("테스트 메시지")
assertThat(received.sender).isEqualTo("테스터")
}
}@EmbeddedKafka 속성
partitions— 토픽당 파티션 수 (기본값 1)topics— 미리 생성할 토픽 목록spring.embedded.kafka.brokers— 인메모리 브로커 주소 (자동 주입)
spring.kafka.bootstrap-servers=\${spring.embedded.kafka.brokers}를@SpringBootTest속성으로 지정하면 애플리케이션 설정이 인메모리 브로커를 가리킵니다.
테스트를 실행합니다.
./gradlew test다음 단계#
💡 함께 읽기:
- Kafka Producer/Consumer 구현 — 같은 패턴을 Java + Spring Kafka로 비교해 봅니다.
- Consumer Group 개념 —
groupId설정이 파티션 분배에 미치는 영향을 이해합니다.- 에러 처리 패턴 — Dead Letter Queue 등 실무 재처리 전략을 다룹니다.