Architecture

Data Stream follows Hexagonal Architecture (Ports & Adapters) to achieve clean separation of concerns, testability, and maintainability.

Overview

[Diagram: write and read path]

Hexagonal Architecture

Core Principles

  1. Domain at the Center: Business logic has no external dependencies
  2. Ports: Interfaces that define contracts
  3. Adapters: Implementations that connect to external systems
  4. Dependency Inversion: Source-code dependencies point inward; adapters depend on the core's ports, never the reverse
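
The four principles above can be illustrated with a minimal sketch. The names here (`RoundReader`, `LatestRoundLabel`, `mapAdapter`) are hypothetical and not taken from the project; the `Round` type is a simplified stand-in for the domain entity. The point is only that the core function compiles against an interface, so the storage adapter can be swapped without touching it.

```go
package main

import (
	"errors"
	"fmt"
)

// Round is a simplified stand-in for the domain entity (hypothetical subset of fields).
type Round struct {
	ID       int64
	GameSlug string
}

// RoundReader is a port: the core declares what it needs, not how it is done.
type RoundReader interface {
	Latest(gameSlug string) (Round, error)
}

// LatestRoundLabel is core logic; it depends only on the port.
func LatestRoundLabel(r RoundReader, gameSlug string) (string, error) {
	round, err := r.Latest(gameSlug)
	if err != nil {
		return "", fmt.Errorf("latest round for %q: %w", gameSlug, err)
	}
	return fmt.Sprintf("%s#%d", round.GameSlug, round.ID), nil
}

var errNoRound = errors.New("no round stored")

// mapAdapter is an outbound adapter backed by a map. A Redis-backed adapter
// could satisfy the same interface without any change to the core.
type mapAdapter map[string]Round

func (m mapAdapter) Latest(gameSlug string) (Round, error) {
	round, ok := m[gameSlug]
	if !ok {
		return Round{}, errNoRound
	}
	return round, nil
}

func main() {
	adapter := mapAdapter{"crash": {ID: 42, GameSlug: "crash"}}
	label, err := LatestRoundLabel(adapter, "crash")
	fmt.Println(label, err)
}
```

Because the adapter is injected, the same core function can be exercised in tests with an in-memory map and in production with a real data store.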

Project Structure

backend/
├── cmd/
│   └── main.go                     # Application entry point
│
├── internal/
│   ├── core/                       # Business Logic (Domain)
│   │   ├── domain/                 # Entities + Domain Errors
│   │   │   ├── game.go             # Game, GameType
│   │   │   ├── round.go            # Round, RoundResult
│   │   │   └── errors.go           # Domain errors
│   │   │
│   │   ├── ports/                  # Interface Definitions
│   │   │   ├── repositories.go     # GameRepository, RoundRepository
│   │   │   ├── messaging.go        # StreamSubscriber
│   │   │   └── services.go         # Service interfaces
│   │   │
│   │   └── services/               # Use Cases
│   │       └── round_service.go    # RoundQueryService
│   │
│   └── adapters/
│       ├── inbound/                # Driving Adapters
│       │   ├── http/               # REST API handlers
│       │   ├── websocket/          # WebSocket handlers
│       │   ├── sse/                # SSE handlers
│       │   └── webtransport/       # WebTransport server
│       │
│       └── outbound/               # Driven Adapters
│           └── redis/              # Redis repository + Pub/Sub
│
├── pkg/
│   └── config/                     # Configuration
│
└── public/                         # Static files (HTML, CSS, JS)

Layer Responsibilities

Domain Layer (internal/core/domain/)

Pure business entities with no external dependencies:

// domain/round.go
type Round struct {
    ID         int64
    GameID     int
    GameSlug   string
    GameType   string
    FinishedAt time.Time
    Extras     json.RawMessage
    Timestamp  int64
}

type RoundResult struct {
    Round
    // Additional presentation fields
}
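
Since `Extras` is a `json.RawMessage`, game-specific data can stay undecoded until a caller knows which shape to expect. The following is a sketch of that two-step decode. The JSON tags, the `crashExtras` type, and its `multiplier` field are assumptions made for illustration; the source does not specify the wire format.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Round is a trimmed copy of the entity; the JSON tags are assumed, not from the source.
type Round struct {
	ID       int64           `json:"id"`
	GameSlug string          `json:"game_slug"`
	Extras   json.RawMessage `json:"extras"`
}

// crashExtras is a hypothetical payload shape for one game type.
type crashExtras struct {
	Multiplier float64 `json:"multiplier"`
}

// decodeCrashMultiplier decodes the envelope first, then the raw extras.
func decodeCrashMultiplier(raw []byte) (float64, error) {
	var r Round
	if err := json.Unmarshal(raw, &r); err != nil {
		return 0, fmt.Errorf("decode round: %w", err)
	}
	var extras crashExtras
	if err := json.Unmarshal(r.Extras, &extras); err != nil {
		return 0, fmt.Errorf("decode extras: %w", err)
	}
	return extras.Multiplier, nil
}

func main() {
	payload := []byte(`{"id":1,"game_slug":"crash","extras":{"multiplier":2.5}}`)
	m, err := decodeCrashMultiplier(payload)
	fmt.Println(m, err)
}
```

Deferring the second decode keeps the domain entity independent of any single game's payload.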

Ports Layer (internal/core/ports/)

Interface definitions (contracts):

// ports/repositories.go
type RoundRepository interface {
    GetLatest(ctx context.Context, gameSlug string) (*domain.Round, error)
    GetHistory(ctx context.Context, gameSlug string, limit int) ([]domain.Round, error)
    GetHistoryByType(ctx context.Context, gameType string, limit int) ([]domain.Round, error)
}

// ports/messaging.go
type StreamSubscriber interface {
    Subscribe(ctx context.Context, channel string) (<-chan domain.Round, error)
    SubscribePattern(ctx context.Context, pattern string) (<-chan domain.Round, error)
}
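
To show how a caller might consume the `StreamSubscriber` contract, here is a sketch with a hypothetical test double (`sliceSubscriber`) that replays a fixed list of rounds. It uses a local `Round` type and only the `Subscribe` method so that it compiles on its own. Closing the channel at end of stream and stopping on context cancellation are assumptions about the contract, not documented behavior.

```go
package main

import (
	"context"
	"fmt"
)

// Round is a simplified stand-in for domain.Round.
type Round struct {
	ID       int64
	GameSlug string
}

// StreamSubscriber mirrors the Subscribe half of the port, using the local Round type.
type StreamSubscriber interface {
	Subscribe(ctx context.Context, channel string) (<-chan Round, error)
}

// sliceSubscriber is a hypothetical test double that replays fixed rounds.
type sliceSubscriber struct{ rounds []Round }

func (s sliceSubscriber) Subscribe(ctx context.Context, channel string) (<-chan Round, error) {
	out := make(chan Round)
	go func() {
		defer close(out) // closing signals end of stream to the consumer
		for _, r := range s.rounds {
			select {
			case out <- r:
			case <-ctx.Done():
				return // stop promptly when the caller cancels
			}
		}
	}()
	return out, nil
}

// collectIDs drains a subscription until the channel closes.
func collectIDs(sub StreamSubscriber, channel string) ([]int64, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	ch, err := sub.Subscribe(ctx, channel)
	if err != nil {
		return nil, err
	}
	var ids []int64
	for r := range ch {
		ids = append(ids, r.ID)
	}
	return ids, nil
}

func main() {
	sub := sliceSubscriber{rounds: []Round{{ID: 1, GameSlug: "crash"}, {ID: 2, GameSlug: "crash"}}}
	ids, err := collectIDs(sub, "stream:crash")
	fmt.Println(ids, err)
}
```

Returning a receive-only channel keeps transport details (Redis Pub/Sub in this project) out of the consumer's code.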

Services Layer (internal/core/services/)

Use cases that orchestrate domain operations:

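// services/round_service.go
type RoundQueryService struct {
    repo ports.RoundRepository
}

// GetLatest returns the most recent round for a game.
func (s *RoundQueryService) GetLatest(ctx context.Context, gameSlug string) (*domain.Round, error) {
    round, err := s.repo.GetLatest(ctx, gameSlug)
    if errors.Is(err, domain.ErrRoundNotFound) {
        // Assumes the repository reports a missing round with this sentinel.
        return nil, domain.NewNotFoundError("round", gameSlug)
    }
    if err != nil {
        // Propagate other failures (timeouts, connection errors) instead of
        // reporting them as "not found".
        return nil, fmt.Errorf("get latest round for %q: %w", gameSlug, err)
    }
    return round, nil
}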

Adapters Layer (internal/adapters/)

Inbound Adapters (Driving):

  • HTTP handlers for REST API
  • WebSocket handlers for real-time bidirectional communication
  • SSE handlers for server push
  • WebTransport server for QUIC/HTTP/3

Outbound Adapters (Driven):

  • Redis repository for data access
  • Redis pub/sub for event streaming
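
As an illustration of an inbound (driving) adapter, the sketch below wraps a service interface in an HTTP handler. The `LatestGetter` interface, the `/latest?game=` route, the JSON tags, and the 404 status are all assumptions chosen for the example; the project's actual routes and handler signatures are not shown in this document.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// Round is a simplified stand-in for domain.Round; the JSON tags are assumed.
type Round struct {
	ID       int64  `json:"id"`
	GameSlug string `json:"game_slug"`
}

// LatestGetter is a hypothetical inbound port that the handler drives.
type LatestGetter interface {
	GetLatest(gameSlug string) (*Round, error)
}

// latestHandler translates an HTTP request into a port call and the result into JSON.
func latestHandler(svc LatestGetter) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		slug := r.URL.Query().Get("game")
		round, err := svc.GetLatest(slug)
		w.Header().Set("Content-Type", "application/json")
		if err != nil {
			w.WriteHeader(http.StatusNotFound)
			json.NewEncoder(w).Encode(map[string]any{"error": err.Error(), "success": false})
			return
		}
		json.NewEncoder(w).Encode(round)
	}
}

// stubService stands in for the real use case in this sketch.
type stubService struct{}

func (stubService) GetLatest(slug string) (*Round, error) {
	if slug != "crash" {
		return nil, fmt.Errorf("game '%s' not found", slug)
	}
	return &Round{ID: 7, GameSlug: slug}, nil
}

// statusFor issues an in-process request and returns the response status code.
func statusFor(game string) int {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/latest?game="+game, nil)
	latestHandler(stubService{})(rec, req)
	return rec.Code
}

func main() {
	fmt.Println(statusFor("crash"), statusFor("invalid-game"))
}
```

The handler holds no business logic of its own, which is what lets the WebSocket, SSE, and WebTransport adapters reuse the same core services.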

CDC Pipeline

Change Data Capture Flow

[Diagram: CDC pipeline]

Redis Data Model

Keys:
├── latest:crash # STRING - Last round for game
├── latest:double # STRING - Last round for game
├── history:crash # ZSET - History sorted by timestamp
├── history:double # ZSET - History sorted by timestamp
├── history:type:multiplier # ZSET - History by game type
├── processed:12345 # STRING - Deduplication (TTL 5min)
└── stream:crash # Pub/Sub channel
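
The key layout above can be captured in a few helper functions so that key strings are built in one place. The helper names are hypothetical. The `dedup` type is an in-memory stand-in for the `processed:*` keys, included only to show the intended first-seen-within-TTL behavior. In Redis the same check is commonly done with a single `SET` using the `NX` and `EX` options; whether this project does so is not stated here.

```go
package main

import (
	"fmt"
	"time"
)

// processedTTL mirrors the 5-minute TTL noted for processed:* keys.
const processedTTL = 5 * time.Minute

// Key builders (hypothetical helper names) for the layout listed above.
func latestKey(slug string) string          { return "latest:" + slug }
func historyKey(slug string) string         { return "history:" + slug }
func historyTypeKey(gameType string) string { return "history:type:" + gameType }
func processedKey(roundID int64) string     { return fmt.Sprintf("processed:%d", roundID) }
func streamChannel(slug string) string      { return "stream:" + slug }

// dedup is an in-memory stand-in for the processed:* keys.
type dedup struct {
	expires map[string]time.Time
}

// firstSeen reports true when roundID has not been seen within the TTL,
// and records it with a fresh expiry in that case.
func (d *dedup) firstSeen(roundID int64, now time.Time) bool {
	key := processedKey(roundID)
	if exp, ok := d.expires[key]; ok && now.Before(exp) {
		return false
	}
	d.expires[key] = now.Add(processedTTL)
	return true
}

// dedupDemo checks the same ID at t=0, t=1min, and t=6min.
func dedupDemo() [3]bool {
	d := &dedup{expires: map[string]time.Time{}}
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	return [3]bool{
		d.firstSeen(12345, start),
		d.firstSeen(12345, start.Add(time.Minute)),
		d.firstSeen(12345, start.Add(6*time.Minute)),
	}
}

func main() {
	fmt.Println(latestKey("crash"), historyTypeKey("multiplier"), processedKey(12345))
	fmt.Println(dedupDemo())
}
```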

Streaming Protocols

Protocol Comparison

| Aspect | WebSocket | SSE | WebTransport |
|---|---|---|---|
| Transport | TCP | HTTP (chunked) | UDP (QUIC) |
| Direction | Bidirectional | Server → Client | Bidirectional |
| Streams | 1 | 1 | Multiple |
| Reconnect | Manual | Automatic | Manual |
| Proxy friendly | Sometimes | Always | Rarely |
| Browser support | Universal | All except IE | Chrome/Firefox |
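
To make the SSE column concrete, here is a sketch of a server-push handler built on the standard `net/http` package. The `/events` path and the channel-of-strings input are assumptions for the example; the project's handler is not shown in this document. Each message is written in the `data: ...` event format followed by a blank line and flushed immediately.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// sseHandler streams each message as one SSE event until the channel closes
// or the client disconnects.
func sseHandler(messages <-chan string) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		for {
			select {
			case msg, open := <-messages:
				if !open {
					return
				}
				fmt.Fprintf(w, "data: %s\n\n", msg)
				flusher.Flush() // push the event now instead of waiting for a full buffer
			case <-r.Context().Done():
				return
			}
		}
	}
}

// sseDemo runs the handler in-process and returns the raw response body.
func sseDemo() string {
	messages := make(chan string, 2)
	messages <- `{"id":1}`
	messages <- `{"id":2}`
	close(messages)
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodGet, "/events", nil)
	sseHandler(messages)(rec, req)
	return rec.Body.String()
}

func main() {
	fmt.Print(sseDemo())
}
```

SSE needs nothing beyond a long-lived HTTP response, which is why it tends to pass through proxies more easily than WebSocket or WebTransport.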

Message Flow

[Diagram: message flow]

Performance Characteristics

Latency Breakdown

| Stage | Typical Latency |
|---|---|
| PostgreSQL INSERT | 1-5 ms |
| Debezium CDC | ~1 ms |
| NATS publish | ~1 ms |
| Consumer processing | ~1 ms |
| Redis Pub/Sub | ~1 ms |
| Total (DB → Client) | < 100 ms |

Throughput

| Component | Capacity |
|---|---|
| CDC (Postgres → NATS) | ~10k events/s |
| Consumer (NATS → Redis) | ~50k events/s |
| REST API | ~100k req/s |
| WebSocket push | ~100k msg/s |
| SSE push | ~50k msg/s |

Error Handling

Domain Errors

// Sentinel errors
var (
    ErrGameNotFound  = errors.New("game not found")
    ErrRoundNotFound = errors.New("round not found")
)

// Typed errors for better handling
type NotFoundError struct {
    Resource string
    ID       string
}

type ValidationError struct {
    Field   string
    Message string
}
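
For these structs to be usable as Go errors they need an `Error()` method, and the services layer calls a `NewNotFoundError` constructor that is not listed here. The sketch below fills both in as assumptions: the message format is modeled on the `game 'invalid-game' not found` response shown in this document, and the constructor's return type and the `isNotFound` helper are hypothetical.

```go
package main

import (
	"errors"
	"fmt"
)

var ErrRoundNotFound = errors.New("round not found")

type NotFoundError struct {
	Resource string
	ID       string
}

// Error formats the message as "<resource> '<id>' not found" (assumed format).
func (e *NotFoundError) Error() string {
	return fmt.Sprintf("%s '%s' not found", e.Resource, e.ID)
}

// NewNotFoundError is a hypothetical constructor matching the call in the services layer.
func NewNotFoundError(resource, id string) error {
	return &NotFoundError{Resource: resource, ID: id}
}

type ValidationError struct {
	Field   string
	Message string
}

func (e *ValidationError) Error() string {
	return fmt.Sprintf("invalid %s: %s", e.Field, e.Message)
}

// isNotFound matches both the typed error and the sentinel, even when wrapped.
func isNotFound(err error) bool {
	var nf *NotFoundError
	return errors.As(err, &nf) || errors.Is(err, ErrRoundNotFound)
}

func main() {
	err := fmt.Errorf("handler: %w", NewNotFoundError("game", "invalid-game"))
	fmt.Println(err, isNotFound(err))
}
```

Using `errors.As` and `errors.Is` rather than type assertions means the check still works after an adapter wraps the error with extra context.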

HTTP Error Responses

{
  "error": "game 'invalid-game' not found",
  "success": false
}
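
One way an HTTP adapter could produce this response shape is to map typed domain errors to status codes in a single helper. The sketch below assumes 404 for `NotFoundError`, 400 for `ValidationError`, and 500 otherwise; these codes, and the `writeError` and `errorBody` names, are illustrative choices rather than documented behavior.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type NotFoundError struct{ Resource, ID string }

func (e *NotFoundError) Error() string {
	return fmt.Sprintf("%s '%s' not found", e.Resource, e.ID)
}

type ValidationError struct{ Field, Message string }

func (e *ValidationError) Error() string {
	return fmt.Sprintf("invalid %s: %s", e.Field, e.Message)
}

// errorBody matches the JSON error shape shown above.
type errorBody struct {
	Error   string `json:"error"`
	Success bool   `json:"success"`
}

// statusFor picks a status code from the error's type (assumed mapping).
func statusFor(err error) int {
	var nf *NotFoundError
	var ve *ValidationError
	switch {
	case errors.As(err, &nf):
		return http.StatusNotFound
	case errors.As(err, &ve):
		return http.StatusBadRequest
	default:
		return http.StatusInternalServerError
	}
}

// writeError sends the status code and the JSON error body.
func writeError(w http.ResponseWriter, err error) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(statusFor(err))
	json.NewEncoder(w).Encode(errorBody{Error: err.Error(), Success: false})
}

// renderError runs writeError against an in-memory recorder.
func renderError(err error) (int, string) {
	rec := httptest.NewRecorder()
	writeError(rec, err)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := renderError(&NotFoundError{Resource: "game", ID: "invalid-game"})
	fmt.Print(code, " ", body)
}
```

A real handler would likely replace the message for 500-class errors with a generic string so that internal details are not exposed to clients.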

Streaming Reconnection

  • WebSocket: Client must implement reconnection logic
  • SSE: Browser automatically reconnects
  • WebTransport: Client must implement reconnection logic
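
For the two protocols that require client-side reconnection, a common approach is retrying with capped exponential backoff. The sketch below shows that logic in Go with a hypothetical `dial` callback standing in for the actual WebSocket or WebTransport connect call. Function names, attempt limits, and delays are illustrative assumptions.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// backoff returns the delay before retrying after the given 0-based attempt,
// doubling from base and capped at ceiling.
func backoff(attempt int, base, ceiling time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt && d < ceiling; i++ {
		d *= 2
	}
	if d > ceiling {
		d = ceiling
	}
	return d
}

// connectWithRetry calls dial until it succeeds, ctx is cancelled, or
// maxAttempts is exhausted. It returns the number of dial calls made.
func connectWithRetry(ctx context.Context, dial func() error, maxAttempts int, base, ceiling time.Duration) (int, error) {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = dial(); err == nil {
			return attempt + 1, nil
		}
		select {
		case <-time.After(backoff(attempt, base, ceiling)):
		case <-ctx.Done():
			return attempt + 1, ctx.Err()
		}
	}
	return maxAttempts, fmt.Errorf("gave up after %d attempts: %w", maxAttempts, err)
}

// backoffDemo lists the delays in milliseconds for attempts 0..4 (base 100ms, cap 1s).
func backoffDemo() []int64 {
	var out []int64
	for attempt := 0; attempt < 5; attempt++ {
		out = append(out, backoff(attempt, 100*time.Millisecond, time.Second).Milliseconds())
	}
	return out
}

// retryDemo simulates a dial that fails twice and then succeeds.
func retryDemo() (int, error) {
	calls := 0
	dial := func() error {
		calls++
		if calls < 3 {
			return errors.New("connection refused")
		}
		return nil
	}
	return connectWithRetry(context.Background(), dial, 5, time.Millisecond, 10*time.Millisecond)
}

func main() {
	fmt.Println(backoffDemo())
	fmt.Println(retryDemo())
}
```

Production clients usually add random jitter to each delay so that many clients do not reconnect at the same instant after an outage.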