Hexagonal Architecture
DataStream follows Hexagonal Architecture (Ports & Adapters) to keep business logic separate from transport and storage code.
Overview
Core Principles
1. Domain at the Center
The domain layer contains pure business logic. It imports only the Go standard library, never an adapter, framework, or driver package:
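// internal/core/domain/round.go
package domain

import (
	"encoding/json"
	"time"
)

// Round is the core entity for a game round.
type Round struct {
	ID         int64
	GameID     int
	GameSlug   string
	GameType   string
	FinishedAt time.Time
	Extras     json.RawMessage // kept as raw JSON
	Timestamp  int64
}

// IsValid reports whether the round has a positive ID and a non-empty slug.
// Pure business logic - no external deps
func (r *Round) IsValid() bool {
	return r.ID > 0 && r.GameSlug != ""
}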
2. Ports Define Contracts
Ports are interfaces that define how the domain interacts with the outside world:
// internal/core/ports/repositories.go
package ports

type RoundRepository interface {
	GetLatest(ctx context.Context, gameSlug string) (*domain.Round, error)
	GetHistory(ctx context.Context, gameSlug string, limit int) ([]domain.Round, error)
}

type StreamSubscriber interface {
	Subscribe(ctx context.Context, channel string) (<-chan domain.Round, error)
}
3. Adapters Implement Ports
Adapters connect the domain to external systems:
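// internal/adapters/outbound/redis/round_repository.go
package redis

type RoundRepository struct {
	client *redis.Client
}

func (r *RoundRepository) GetLatest(ctx context.Context, gameSlug string) (*domain.Round, error) {
	key := fmt.Sprintf("latest:%s", gameSlug)
	data, err := r.client.Get(ctx, key).Result()
	if errors.Is(err, redis.Nil) {
		// Key does not exist: translate to the domain error
		return nil, domain.ErrRoundNotFound
	}
	if err != nil {
		// Connection or protocol failure: do not report it as "not found"
		return nil, fmt.Errorf("get latest round %q: %w", gameSlug, err)
	}
	// Parse and return...
}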
Project Structure
backend/internal/
├── core/ # Business Logic
│ ├── domain/ # Entities
│ │ ├── game.go # Game entity
│ │ ├── round.go # Round entity
│ │ └── errors.go # Domain errors
│ │
│ ├── ports/ # Interfaces
│ │ ├── repositories.go # Data access
│ │ ├── messaging.go # Pub/Sub
│ │ └── services.go # Service ports
│ │
│ └── services/ # Use Cases
│ └── round_service.go # Query service
│
└── adapters/
├── inbound/ # Driving (Primary)
│ ├── http/ # REST handlers
│ ├── websocket/ # WS handlers
│ ├── sse/ # SSE handlers
│ └── webtransport/ # WT server
│
└── outbound/ # Driven (Secondary)
└── redis/ # Redis adapter
Benefits
Testability
Mock ports for unit testing:
type MockRoundRepo struct {
	rounds map[string]*domain.Round
}

func (m *MockRoundRepo) GetLatest(ctx context.Context, slug string) (*domain.Round, error) {
	if r, ok := m.rounds[slug]; ok {
		return r, nil
	}
	return nil, domain.ErrRoundNotFound
}

// GetHistory is required to satisfy ports.RoundRepository; this mock keeps no history.
func (m *MockRoundRepo) GetHistory(ctx context.Context, slug string, limit int) ([]domain.Round, error) {
	return nil, nil
}

func TestRoundService(t *testing.T) {
	mock := &MockRoundRepo{
		rounds: map[string]*domain.Round{
			"crash": {ID: 1, GameSlug: "crash"},
		},
	}
	service := services.NewRoundQueryService(mock)
	// Test service...
}
Flexibility
Swap adapters without changing business logic:
// Easy to switch from Redis to PostgreSQL
var repo ports.RoundRepository
if useRedis {
	repo = redis.NewRoundRepository(redisClient)
} else {
	repo = postgres.NewRoundRepository(db)
}
service := services.NewRoundQueryService(repo)
Maintainability
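Dependencies point inward: adapters import core, and core never imports adapters. A change to Redis keys or HTTP routing therefore stays inside a single adapter package.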
Wire Dependency Injection
DataStream uses Google Wire for compile-time dependency injection.
Why Wire?
| Manual (main.go) | Wire (di/) |
|---|---|
| Errors at runtime | Errors at compile-time |
| Manual ordering | Automatic ordering |
| Hard to refactor | Easy to add deps |
| Zero reflection | Zero reflection |
Structure
di/
├── wire.go # Provider definitions (you write)
└── wire_gen.go # Generated code (wire generates)
Provider Sets
//go:build wireinject

package di

import "github.com/google/wire"

// PostgresSet provides database adapters
var PostgresSet = wire.NewSet(
	postgres.NewGameRepository,
	wire.Bind(new(ports.GameRepository), new(*postgres.GameRepository)),
)

// RedisSet provides cache adapters.
// The Redis adapter package is referenced as redisadapter here so that it
// does not clash with the go-redis client package (redis) used in InitializeApp.
var RedisSet = wire.NewSet(
	redisadapter.NewRoundRepository,
	redisadapter.NewStreamPublisher,
	wire.Bind(new(ports.RoundRepository), new(*redisadapter.RoundRepository)),
	wire.Bind(new(ports.StreamPublisher), new(*redisadapter.StreamPublisher)),
)

// NATSSet provides messaging adapters
var NATSSet = wire.NewSet(
	natsadapter.NewCDCConsumer,
	wire.Bind(new(ports.CDCConsumer), new(*natsadapter.CDCConsumer)),
)

// ApplicationSet provides use cases
var ApplicationSet = wire.NewSet(
	services.NewRoundService,
	services.NewGameCacheService,
)

// InitializeApp wires everything together.
// The body is only a template: Wire replaces it in wire_gen.go.
func InitializeApp(
	ctx context.Context,
	db *sql.DB,
	redisClient *redis.Client,
	nc *nats.Conn,
) (*App, error) {
	wire.Build(
		PostgresSet,
		RedisSet,
		NATSSet,
		ApplicationSet,
		ProvideApp,
	)
	return nil, nil
}
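The contents of `wire_gen.go` are not shown here, but a generated injector is ordinary Go: each provider is called once, in dependency order, with early returns on error. The sketch below imitates that shape by hand with hypothetical stand-in types (`DB`, `GameRepository`, `GameCacheService`, `App`) and assumed constructor signatures. It is not the project's generated code.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the real adapters and services.
type DB struct{}

type GameRepository struct{ db *DB }

type GameCacheService struct{ games *GameRepository }

type App struct{ cache *GameCacheService }

// Providers: plain constructors whose parameters declare their dependencies.
func NewGameRepository(db *DB) *GameRepository {
	return &GameRepository{db: db}
}

func NewGameCacheService(g *GameRepository) (*GameCacheService, error) {
	if g == nil {
		return nil, errors.New("game repository is required")
	}
	return &GameCacheService{games: g}, nil
}

func ProvideApp(c *GameCacheService) *App {
	return &App{cache: c}
}

// initializeApp is written by hand in the style of a generated injector:
// providers run in dependency order and the first error aborts the wiring.
func initializeApp(db *DB) (*App, error) {
	gameRepository := NewGameRepository(db)
	gameCacheService, err := NewGameCacheService(gameRepository)
	if err != nil {
		return nil, err
	}
	app := ProvideApp(gameCacheService)
	return app, nil
}

func main() {
	app, err := initializeApp(&DB{})
	fmt.Println(app != nil, err)
}
```

Because the result is plain function calls, a missing or mistyped provider surfaces as a `wire` or compiler error rather than a failure at startup.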
Usage
# Generate wire code
task wire
# Check if regeneration needed
task wire:check
Domain Errors
Typed errors for consistent error handling:
// internal/core/domain/errors.go
package domain

import (
	"errors"
	"fmt"
)

// Sentinel errors
var (
	ErrGameNotFound    = errors.New("game not found")
	ErrRoundNotFound   = errors.New("round not found")
	ErrInvalidGameSlug = errors.New("invalid game slug")
)

// Typed errors
type NotFoundError struct {
	Resource string
	ID       string
}

func (e *NotFoundError) Error() string {
	return fmt.Sprintf("%s '%s' not found", e.Resource, e.ID)
}

type ValidationError struct {
	Field   string
	Message string
}

func (e *ValidationError) Error() string {
	return fmt.Sprintf("validation error on %s: %s", e.Field, e.Message)
}

// Helper functions
// IsNotFoundError matches both the typed error and the not-found sentinels,
// including when they are wrapped.
func IsNotFoundError(err error) bool {
	var nfe *NotFoundError
	return errors.As(err, &nfe) || errors.Is(err, ErrGameNotFound) || errors.Is(err, ErrRoundNotFound)
}

func IsValidationError(err error) bool {
	var ve *ValidationError
	return errors.As(err, &ve)
}

func NewNotFoundError(resource, id string) error {
	return &NotFoundError{Resource: resource, ID: id}
}
Error Handling in Adapters
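// HTTP adapter error handling
func errorHandler(c *fiber.Ctx, err error) error {
	if domain.IsNotFoundError(err) {
		return c.Status(404).JSON(fiber.Map{
			"error":   err.Error(),
			"success": false,
		})
	}
	if domain.IsValidationError(err) {
		return c.Status(400).JSON(fiber.Map{
			"error":   err.Error(),
			"success": false,
		})
	}
	// Keep the status Fiber itself assigned (for example 404 for an unknown route)
	var fe *fiber.Error
	if errors.As(err, &fe) {
		return c.Status(fe.Code).JSON(fiber.Map{
			"error":   fe.Message,
			"success": false,
		})
	}
	// Anything else is unexpected: hide the details from the client
	return c.Status(500).JSON(fiber.Map{
		"error":   "internal server error",
		"success": false,
	})
}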
Production Middlewares
Panic Recovery
Captures panics and prevents server crashes:
app.Use(recover.New(recover.Config{
	EnableStackTrace: true,
}))
Rate Limiter
Limits requests per IP:
api := app.Group("/api", limiter.New(limiter.Config{
	Max:        100,             // 100 requests
	Expiration: 1 * time.Minute, // per minute
}))
Response headers:
X-Ratelimit-Limit: 100
X-Ratelimit-Remaining: 97
X-Ratelimit-Reset: 53
Custom Error Handler
Maps domain errors to HTTP responses:
// errorHandler is the function defined under "Error Handling in Adapters"
app := fiber.New(fiber.Config{
	ErrorHandler: errorHandler,
})