TL;DR
  • Event Sourcing: 상태 대신 이벤트를 저장. 이벤트 재생으로 상태 복원
  • Event Store: 이벤트 영속화 저장소. 버전 기반 낙관적 동시성 제어
  • Event-Sourced Aggregate: apply/when 패턴으로 이벤트 적용 및 상태 변경
  • Snapshot: 성능 최적화를 위해 주기적으로 상태 스냅샷 저장
  • 시점 복원: 특정 버전까지의 이벤트만 재생하여 과거 상태 조회 가능

대상 독자 및 선수 지식#

항목요구 수준
대상 독자Event Sourcing 패턴을 실제로 구현해보려는 개발자
DDDAggregate, Domain Event 개념 이해
JavaSwitch Expression, Pattern Matching 문법
선수 문서주문 도메인, 애플리케이션 계층 완료

Event Sourcing 패턴을 실제 주문 도메인에 구현합니다. 상태 대신 이벤트를 저장하고, 이벤트를 재생하여 상태를 복원합니다.

Event Sourcing이란?#

기존 방식 vs Event Sourcing#

flowchart LR
    subgraph Traditional["기존 방식: 상태 저장"]
        T1[Order] --> T2[(DB)]
        T2 --> T3["status: CONFIRMED<br>amount: 50000"]
    end

    subgraph ES["Event Sourcing: 이벤트 저장"]
        E1[Order] --> E2[(Event Store)]
        E2 --> E3["1. OrderCreated<br>2. ItemAdded<br>3. OrderConfirmed"]
    end

다이어그램 설명: 왼쪽(기존 방식)은 Order가 DB에 현재 상태(status: CONFIRMED, amount: 50000)를 저장합니다. 오른쪽(Event Sourcing)은 Order가 Event Store에 이벤트 시퀀스(OrderCreated, ItemAdded, OrderConfirmed)를 저장합니다.

구분기존 방식Event Sourcing
저장 대상현재 상태모든 이벤트
이력별도 관리 필요자동으로 보존
시점 복원불가능특정 시점 재현 가능
디버깅어려움이벤트 추적 용이
복잡도낮음높음
핵심 포인트: Event Sourcing 개념
  • 상태 저장 X, 이벤트 저장 O: 현재 상태는 이벤트 재생으로 계산
  • 완전한 이력: 모든 변경 이력이 자동으로 보존됨
  • 시간 여행: 특정 시점의 상태를 언제든 복원 가능
  • 트레이드오프: 구현 복잡도 증가, 조회 성능 고려 필요

도메인 이벤트 정의#

이벤트 베이스 클래스#

// DomainEvent.java
public abstract class DomainEvent {
    private final String eventId;
    private final String aggregateId;
    private final long version;
    private final Instant occurredAt;

    protected DomainEvent(String aggregateId, long version) {
        this.eventId = UUID.randomUUID().toString();
        this.aggregateId = aggregateId;
        this.version = version;
        this.occurredAt = Instant.now();
    }

    public abstract String getEventType();

    // getters
}

주문 도메인 이벤트#

// OrderCreatedEvent.java
public class OrderCreatedEvent extends DomainEvent {
    private final String customerId;
    private final String shippingAddress;

    public OrderCreatedEvent(String orderId, long version,
                             String customerId, String shippingAddress) {
        super(orderId, version);
        this.customerId = customerId;
        this.shippingAddress = shippingAddress;
    }

    @Override
    public String getEventType() {
        return "ORDER_CREATED";
    }
}

// OrderItemAddedEvent.java
public class OrderItemAddedEvent extends DomainEvent {
    private final String productId;
    private final String productName;
    private final int quantity;
    private final Money unitPrice;

    public OrderItemAddedEvent(String orderId, long version,
                               String productId, String productName,
                               int quantity, Money unitPrice) {
        super(orderId, version);
        this.productId = productId;
        this.productName = productName;
        this.quantity = quantity;
        this.unitPrice = unitPrice;
    }

    @Override
    public String getEventType() {
        return "ORDER_ITEM_ADDED";
    }
}

// OrderConfirmedEvent.java
public class OrderConfirmedEvent extends DomainEvent {
    private final Money totalAmount;
    private final Instant confirmedAt;

    public OrderConfirmedEvent(String orderId, long version, Money totalAmount) {
        super(orderId, version);
        this.totalAmount = totalAmount;
        this.confirmedAt = Instant.now();
    }

    @Override
    public String getEventType() {
        return "ORDER_CONFIRMED";
    }
}

// OrderCancelledEvent.java
public class OrderCancelledEvent extends DomainEvent {
    private final String reason;
    private final Instant cancelledAt;

    public OrderCancelledEvent(String orderId, long version, String reason) {
        super(orderId, version);
        this.reason = reason;
        this.cancelledAt = Instant.now();
    }

    @Override
    public String getEventType() {
        return "ORDER_CANCELLED";
    }
}
핵심 포인트: 도메인 이벤트
  • 버전 포함: 각 이벤트는 Aggregate의 버전 번호를 포함
  • 불변: 이벤트는 한번 생성되면 변경 불가
  • 자기 설명적: getEventType()으로 이벤트 종류 식별
  • 필요한 데이터만: 상태 복원에 필요한 최소한의 데이터만 포함

Event-Sourced Aggregate#

Order Aggregate#

public class Order extends EventSourcedAggregate {
    private String customerId;
    private String shippingAddress;
    private List<OrderLine> orderLines = new ArrayList<>();
    private OrderStatus status;
    private Money totalAmount;
    private Instant confirmedAt;
    private String cancelReason;

    // 빈 생성자 (이벤트 재생용)
    public Order() {
        super();
    }

    // 팩토리 메서드: 새 주문 생성
    public static Order create(String orderId, String customerId, String shippingAddress) {
        Order order = new Order();
        order.apply(new OrderCreatedEvent(orderId, 1, customerId, shippingAddress));
        return order;
    }

    // 이벤트에서 복원
    public static Order reconstitute(String orderId, List<DomainEvent> events) {
        Order order = new Order();
        events.forEach(order::apply);
        return order;
    }

    // 명령: 상품 추가
    public void addItem(String productId, String productName, int quantity, Money unitPrice) {
        if (status != OrderStatus.PENDING) {
            throw new IllegalStateException("확정된 주문에는 상품을 추가할 수 없습니다.");
        }
        if (quantity <= 0) {
            throw new IllegalArgumentException("수량은 1 이상이어야 합니다.");
        }

        apply(new OrderItemAddedEvent(getId(), nextVersion(), productId, productName, quantity, unitPrice));
    }

    // 명령: 주문 확정
    public void confirm() {
        if (status != OrderStatus.PENDING) {
            throw new IllegalStateException("대기 중인 주문만 확정할 수 있습니다.");
        }
        if (orderLines.isEmpty()) {
            throw new IllegalStateException("상품이 없는 주문은 확정할 수 없습니다.");
        }

        Money total = calculateTotal();
        apply(new OrderConfirmedEvent(getId(), nextVersion(), total));
    }

    // 명령: 주문 취소
    public void cancel(String reason) {
        if (status == OrderStatus.CANCELLED) {
            throw new IllegalStateException("이미 취소된 주문입니다.");
        }
        if (status == OrderStatus.SHIPPED) {
            throw new IllegalStateException("배송 중인 주문은 취소할 수 없습니다.");
        }

        apply(new OrderCancelledEvent(getId(), nextVersion(), reason));
    }

    // 이벤트 핸들러: 상태 변경
    @Override
    protected void when(DomainEvent event) {
        switch (event) {
            case OrderCreatedEvent e -> {
                setId(e.getAggregateId());
                this.customerId = e.getCustomerId();
                this.shippingAddress = e.getShippingAddress();
                this.status = OrderStatus.PENDING;
                this.orderLines = new ArrayList<>();
            }
            case OrderItemAddedEvent e -> {
                this.orderLines.add(new OrderLine(
                    e.getProductId(),
                    e.getProductName(),
                    e.getQuantity(),
                    e.getUnitPrice()
                ));
            }
            case OrderConfirmedEvent e -> {
                this.status = OrderStatus.CONFIRMED;
                this.totalAmount = e.getTotalAmount();
                this.confirmedAt = e.getConfirmedAt();
            }
            case OrderCancelledEvent e -> {
                this.status = OrderStatus.CANCELLED;
                this.cancelReason = e.getReason();
            }
            default -> throw new IllegalArgumentException("Unknown event: " + event.getClass());
        }
    }

    private Money calculateTotal() {
        return orderLines.stream()
            .map(line -> line.getUnitPrice().multiply(line.getQuantity()))
            .reduce(Money.ZERO, Money::add);
    }
}

EventSourcedAggregate 베이스 클래스#

public abstract class EventSourcedAggregate {
    private String id;
    private long version = 0;
    private final List<DomainEvent> uncommittedEvents = new ArrayList<>();

    protected void apply(DomainEvent event) {
        when(event);
        this.version = event.getVersion();
        uncommittedEvents.add(event);
    }

    protected abstract void when(DomainEvent event);

    protected long nextVersion() {
        return version + 1;
    }

    public List<DomainEvent> getUncommittedEvents() {
        return List.copyOf(uncommittedEvents);
    }

    public void markEventsAsCommitted() {
        uncommittedEvents.clear();
    }

    // getters, setters
}
핵심 포인트: Event-Sourced Aggregate
  • apply/when 패턴: apply()가 이벤트를 기록하고, when()이 상태 변경
  • 명령 메서드: 유효성 검증 후 이벤트 생성 (addItem, confirm, cancel)
  • reconstitute: 이벤트 리스트로부터 상태 복원
  • uncommittedEvents: 저장되지 않은 이벤트 추적
  • 버전 관리: nextVersion()으로 순차적 버전 번호 할당

Event Store 구현#

EventStore 인터페이스#

public interface EventStore {
    void append(String aggregateId, List<DomainEvent> events, long expectedVersion);
    List<DomainEvent> load(String aggregateId);
    List<DomainEvent> loadFromVersion(String aggregateId, long fromVersion);
}

JPA 기반 구현#

@Entity
@Table(name = "event_store")
public class EventEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false)
    private String aggregateId;

    @Column(nullable = false)
    private String aggregateType;

    @Column(nullable = false)
    private String eventType;

    @Column(nullable = false)
    private long version;

    @Column(columnDefinition = "TEXT", nullable = false)
    private String payload;  // JSON

    @Column(nullable = false)
    private Instant occurredAt;

    @Column(nullable = false)
    private Instant storedAt;
}

@Repository
public interface EventEntityRepository extends JpaRepository<EventEntity, Long> {
    List<EventEntity> findByAggregateIdOrderByVersionAsc(String aggregateId);
    List<EventEntity> findByAggregateIdAndVersionGreaterThanOrderByVersionAsc(
        String aggregateId, long version);
    Optional<EventEntity> findTopByAggregateIdOrderByVersionDesc(String aggregateId);
}

@Component
@RequiredArgsConstructor
public class JpaEventStore implements EventStore {
    private final EventEntityRepository repository;
    private final ObjectMapper objectMapper;

    @Override
    @Transactional
    public void append(String aggregateId, List<DomainEvent> events, long expectedVersion) {
        // 낙관적 동시성 체크
        Optional<EventEntity> lastEvent = repository.findTopByAggregateIdOrderByVersionDesc(aggregateId);
        long currentVersion = lastEvent.map(EventEntity::getVersion).orElse(0L);

        if (currentVersion != expectedVersion) {
            throw new OptimisticLockingException(
                String.format("Expected version %d but was %d", expectedVersion, currentVersion)
            );
        }

        // 이벤트 저장
        List<EventEntity> entities = events.stream()
            .map(this::toEntity)
            .toList();

        repository.saveAll(entities);
    }

    @Override
    public List<DomainEvent> load(String aggregateId) {
        return repository.findByAggregateIdOrderByVersionAsc(aggregateId).stream()
            .map(this::toDomainEvent)
            .toList();
    }

    @Override
    public List<DomainEvent> loadFromVersion(String aggregateId, long fromVersion) {
        return repository.findByAggregateIdAndVersionGreaterThanOrderByVersionAsc(aggregateId, fromVersion)
            .stream()
            .map(this::toDomainEvent)
            .toList();
    }

    private EventEntity toEntity(DomainEvent event) {
        EventEntity entity = new EventEntity();
        entity.setAggregateId(event.getAggregateId());
        entity.setAggregateType("Order");
        entity.setEventType(event.getEventType());
        entity.setVersion(event.getVersion());
        entity.setPayload(serialize(event));
        entity.setOccurredAt(event.getOccurredAt());
        entity.setStoredAt(Instant.now());
        return entity;
    }

    private DomainEvent toDomainEvent(EventEntity entity) {
        return deserialize(entity.getPayload(), entity.getEventType());
    }

    private String serialize(DomainEvent event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to serialize event", e);
        }
    }

    private DomainEvent deserialize(String payload, String eventType) {
        try {
            Class<? extends DomainEvent> eventClass = getEventClass(eventType);
            return objectMapper.readValue(payload, eventClass);
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to deserialize event", e);
        }
    }

    private Class<? extends DomainEvent> getEventClass(String eventType) {
        return switch (eventType) {
            case "ORDER_CREATED" -> OrderCreatedEvent.class;
            case "ORDER_ITEM_ADDED" -> OrderItemAddedEvent.class;
            case "ORDER_CONFIRMED" -> OrderConfirmedEvent.class;
            case "ORDER_CANCELLED" -> OrderCancelledEvent.class;
            default -> throw new IllegalArgumentException("Unknown event type: " + eventType);
        };
    }
}
핵심 포인트: Event Store
  • Append-Only: 이벤트는 추가만 가능, 수정/삭제 불가
  • 낙관적 동시성: expectedVersion 검증으로 동시 수정 충돌 감지
  • JSON 직렬화: 이벤트를 JSON으로 저장하여 유연성 확보
  • 버전별 조회: loadFromVersion()으로 특정 버전 이후 이벤트만 로드

Repository 구현#

OrderRepository#

public interface OrderRepository {
    void save(Order order);
    Optional<Order> findById(String orderId);
}

@Component
@RequiredArgsConstructor
public class EventSourcedOrderRepository implements OrderRepository {
    private final EventStore eventStore;

    @Override
    @Transactional
    public void save(Order order) {
        List<DomainEvent> uncommittedEvents = order.getUncommittedEvents();
        if (uncommittedEvents.isEmpty()) {
            return;
        }

        long expectedVersion = order.getVersion() - uncommittedEvents.size();
        eventStore.append(order.getId(), uncommittedEvents, expectedVersion);
        order.markEventsAsCommitted();
    }

    @Override
    public Optional<Order> findById(String orderId) {
        List<DomainEvent> events = eventStore.load(orderId);
        if (events.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Order.reconstitute(orderId, events));
    }
}
핵심 포인트: Repository
  • Event Store 위임: 이벤트 저장/조회를 Event Store에 위임
  • 버전 계산: 저장 시 expectedVersion = 현재 버전 - 미커밋 이벤트 수
  • reconstitute 활용: 이벤트로부터 Aggregate 복원
  • Domain Repository 인터페이스: 도메인 계층에 인터페이스 정의

스냅샷 (Snapshot)#

성능 최적화를 위한 스냅샷#

@Entity
@Table(name = "snapshots")
public class SnapshotEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String aggregateId;
    private String aggregateType;
    private long version;

    @Column(columnDefinition = "TEXT")
    private String state;  // JSON

    private Instant createdAt;
}

@Component
@RequiredArgsConstructor
public class SnapshotStore {
    private static final int SNAPSHOT_THRESHOLD = 100;  // 100개 이벤트마다 스냅샷

    private final SnapshotRepository snapshotRepository;
    private final EventStore eventStore;
    private final ObjectMapper objectMapper;

    public Optional<Order> loadWithSnapshot(String orderId) {
        // 1. 스냅샷 로드
        Optional<SnapshotEntity> snapshot = snapshotRepository
            .findTopByAggregateIdOrderByVersionDesc(orderId);

        if (snapshot.isEmpty()) {
            // 스냅샷 없으면 전체 이벤트에서 복원
            List<DomainEvent> events = eventStore.load(orderId);
            return events.isEmpty() ? Optional.empty()
                : Optional.of(Order.reconstitute(orderId, events));
        }

        // 2. 스냅샷에서 상태 복원
        Order order = deserializeOrder(snapshot.get().getState());

        // 3. 스냅샷 이후 이벤트만 로드
        List<DomainEvent> newEvents = eventStore.loadFromVersion(
            orderId, snapshot.get().getVersion());

        // 4. 새 이벤트 적용
        newEvents.forEach(order::apply);
        order.markEventsAsCommitted();

        return Optional.of(order);
    }

    public void saveWithSnapshot(Order order) {
        // 이벤트 저장
        List<DomainEvent> uncommittedEvents = order.getUncommittedEvents();
        long expectedVersion = order.getVersion() - uncommittedEvents.size();
        eventStore.append(order.getId(), uncommittedEvents, expectedVersion);
        order.markEventsAsCommitted();

        // 스냅샷 필요 여부 확인
        if (order.getVersion() % SNAPSHOT_THRESHOLD == 0) {
            createSnapshot(order);
        }
    }

    private void createSnapshot(Order order) {
        SnapshotEntity snapshot = new SnapshotEntity();
        snapshot.setAggregateId(order.getId());
        snapshot.setAggregateType("Order");
        snapshot.setVersion(order.getVersion());
        snapshot.setState(serializeOrder(order));
        snapshot.setCreatedAt(Instant.now());

        snapshotRepository.save(snapshot);
    }
}
핵심 포인트: 스냅샷
  • 성능 최적화: 이벤트가 많을 때 전체 재생 비용 절감
  • 주기적 생성: SNAPSHOT_THRESHOLD(예: 100) 이벤트마다 스냅샷 생성
  • 부분 재생: 스냅샷 + 이후 이벤트만 재생하여 상태 복원
  • 상태 직렬화: Aggregate 상태를 JSON으로 저장

Application Service#

OrderApplicationService#

@Service
@RequiredArgsConstructor
@Transactional
public class OrderApplicationService {
    private final OrderRepository orderRepository;
    private final ApplicationEventPublisher eventPublisher;

    public String createOrder(CreateOrderCommand command) {
        String orderId = UUID.randomUUID().toString();

        Order order = Order.create(
            orderId,
            command.customerId(),
            command.shippingAddress()
        );

        orderRepository.save(order);

        // 외부 시스템 알림을 위한 이벤트 발행
        publishEvents(order);

        return orderId;
    }

    public void addItem(AddItemCommand command) {
        Order order = orderRepository.findById(command.orderId())
            .orElseThrow(() -> new OrderNotFoundException(command.orderId()));

        order.addItem(
            command.productId(),
            command.productName(),
            command.quantity(),
            command.unitPrice()
        );

        orderRepository.save(order);
        publishEvents(order);
    }

    public void confirmOrder(String orderId) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));

        order.confirm();

        orderRepository.save(order);
        publishEvents(order);
    }

    public void cancelOrder(String orderId, String reason) {
        Order order = orderRepository.findById(orderId)
            .orElseThrow(() -> new OrderNotFoundException(orderId));

        order.cancel(reason);

        orderRepository.save(order);
        publishEvents(order);
    }

    // 조회: 특정 시점의 상태 복원
    public OrderView getOrderAtVersion(String orderId, long version) {
        List<DomainEvent> events = eventStore.load(orderId).stream()
            .filter(e -> e.getVersion() <= version)
            .toList();

        if (events.isEmpty()) {
            throw new OrderNotFoundException(orderId);
        }

        Order order = Order.reconstitute(orderId, events);
        return OrderView.from(order);
    }

    private void publishEvents(Order order) {
        order.getUncommittedEvents().forEach(eventPublisher::publishEvent);
    }
}
핵심 포인트: Application Service
  • 시점 조회: getOrderAtVersion()으로 특정 버전의 상태 조회
  • 이벤트 필터링: version <= targetVersion 조건으로 과거 상태 복원
  • 외부 이벤트 발행: 저장 후 ApplicationEventPublisher로 외부 시스템 알림
  • 표준 CRUD 패턴: 기존 방식과 동일한 서비스 인터페이스 유지

테스트#

단위 테스트#

class OrderTest {

    @Test
    void 주문_생성_시_OrderCreatedEvent가_발생한다() {
        // When
        Order order = Order.create("order-1", "customer-1", "서울시 강남구");

        // Then
        List<DomainEvent> events = order.getUncommittedEvents();
        assertThat(events).hasSize(1);
        assertThat(events.get(0)).isInstanceOf(OrderCreatedEvent.class);

        OrderCreatedEvent event = (OrderCreatedEvent) events.get(0);
        assertThat(event.getAggregateId()).isEqualTo("order-1");
        assertThat(event.getCustomerId()).isEqualTo("customer-1");
    }

    @Test
    void 이벤트에서_상태를_복원할_수_있다() {
        // Given
        List<DomainEvent> events = List.of(
            new OrderCreatedEvent("order-1", 1, "customer-1", "서울시"),
            new OrderItemAddedEvent("order-1", 2, "prod-1", "노트북", 1, Money.won(1000000)),
            new OrderConfirmedEvent("order-1", 3, Money.won(1000000))
        );

        // When
        Order order = Order.reconstitute("order-1", events);

        // Then
        assertThat(order.getStatus()).isEqualTo(OrderStatus.CONFIRMED);
        assertThat(order.getOrderLines()).hasSize(1);
        assertThat(order.getTotalAmount()).isEqualTo(Money.won(1000000));
    }

    @Test
    void 확정된_주문에는_상품을_추가할_수_없다() {
        // Given
        Order order = Order.create("order-1", "customer-1", "서울시");
        order.addItem("prod-1", "노트북", 1, Money.won(1000000));
        order.confirm();
        order.markEventsAsCommitted();

        // When & Then
        assertThatThrownBy(() ->
            order.addItem("prod-2", "마우스", 1, Money.won(50000))
        ).isInstanceOf(IllegalStateException.class)
         .hasMessageContaining("확정된 주문");
    }
}
핵심 포인트: 테스트
  • 이벤트 검증: getUncommittedEvents()로 발생한 이벤트 확인
  • 상태 복원 테스트: 이벤트 리스트로 reconstitute 후 상태 검증
  • 불변식 테스트: 잘못된 상태 전이 시도 시 예외 발생 확인
  • 독립적 테스트: DB 없이 도메인 로직만 단위 테스트 가능

체크리스트#

  • 모든 상태 변경은 이벤트를 통해 수행
  • 이벤트는 불변 (Immutable)
  • 낙관적 동시성 제어 구현
  • 스냅샷으로 성능 최적화
  • 이벤트 스키마 버전 관리
  • 재생(Replay) 테스트

다음 단계#