CDC Pipeline
Change Data Capture (CDC) is the core of Data Stream's real-time capabilities.
Overview
CDC captures database changes at the source (PostgreSQL WAL) and streams them through the pipeline:
PostgreSQL → Debezium → NATS JetStream → Consumer → Redis → Clients
Components
PostgreSQL (Source)
Configured for logical replication:
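-- wal_level must be 'logical' (set in postgresql.conf; changing it requires a server restart)
SHOW wal_level;
-- logical
-- Replication slot used by Debezium (the connector creates it on first start if it does not exist)
SELECT slot_name, plugin, active FROM pg_replication_slots;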
Debezium (CDC Engine)
Debezium Server reads changes from the WAL through the pgoutput plugin and publishes them to NATS JetStream:
# debezium/application.properties
debezium.source.connector.class=io.debezium.connector.postgresql.PostgresConnector
debezium.source.database.hostname=postgres
debezium.source.database.port=5432
debezium.source.database.user=datastream
debezium.source.database.password=datastream
debezium.source.database.dbname=datastream
debezium.source.topic.prefix=datastream
debezium.source.table.include.list=public.rounds
debezium.source.plugin.name=pgoutput
debezium.sink.type=nats-jetstream
debezium.sink.nats-jetstream.url=nats://nats:4222
debezium.sink.nats-jetstream.stream=DebeziumStream
NATS JetStream (Message Broker)
Durable message streaming:
Streams:
├── DebeziumStream # Raw CDC events
│ └── datastream.public.rounds
└── ROUNDS # Segmented by game and by game type
├── rounds.game.crash
├── rounds.game.double
└── rounds.type.multiplier
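How rounds reach the ROUNDS subjects is not shown here. A minimal sketch of the subject naming, assuming a republisher that maps each round to one per-game subject and one per-type subject following the pattern in the tree (`rounds.game.<slug>`, `rounds.type.<type>`). `roundSubjects` and `subjectToken` are hypothetical helpers, and lowercasing the tokens is an assumption:

```go
package main

import "strings"

// subjectToken normalizes a value for use as a single NATS subject token.
// '.' separates tokens, '*' and '>' are wildcards, and whitespace is not
// allowed in subjects, so all of them are replaced with '_'.
func subjectToken(s string) string {
	return strings.Map(func(r rune) rune {
		switch r {
		case '.', '*', '>', ' ', '\t', '\r', '\n':
			return '_'
		}
		return r
	}, strings.ToLower(s))
}

// roundSubjects returns the subjects a round is published to in the
// ROUNDS stream (hypothetical scheme: one per game, one per game type).
func roundSubjects(gameSlug, gameType string) []string {
	return []string{
		"rounds.game." + subjectToken(gameSlug),
		"rounds.type." + subjectToken(gameType),
	}
}
```

For a crash round of type multiplier this yields `rounds.game.crash` and `rounds.type.multiplier`. If the stream is bound to a wildcard such as `rounds.>` (an assumption about the stream config), a consumer can filter on `rounds.game.crash` for one game, `rounds.type.multiplier` for every game of that type, or `rounds.game.*` for all games.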
Go Consumer
Processes CDC events and publishes to Redis:
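// consumer/internal/adapters/nats/consumer.go
func (c *CDCConsumer) ProcessEvent(msg *nats.Msg) error {
	var event CDCEvent
	if err := json.Unmarshal(msg.Data, &event); err != nil {
		return fmt.Errorf("decode CDC event: %w", err)
	}
	// Only inserts (op "c"); updates ("u"), deletes ("d") and snapshot reads ("r") are skipped
	if event.Payload.Op != "c" {
		return nil
	}
	round := c.mapToRound(event.Payload.After)
	// Deduplication: skip rounds already claimed within the TTL window
	if c.isDuplicate(round.ID) {
		return nil
	}
	// Publish to Redis; returning the error lets the caller Nak the message for redelivery
	if err := c.redisPublisher.Publish(round); err != nil {
		return fmt.Errorf("publish round %d: %w", round.ID, err)
	}
	return nil
}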
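`CDCEvent` and `mapToRound` are not shown. A sketch of how the envelope could be decoded, assuming the event layout from the Data Flow section and the `rounds` columns from the example INSERT. `RoundRow`, `decodeInsert`, and `errNoAfter` are hypothetical names, not the project's types:

```go
package main

import (
	"encoding/json"
	"errors"
)

// RoundRow mirrors the columns of public.rounds as Debezium emits them.
type RoundRow struct {
	ID         int64  `json:"id"`
	GameID     int64  `json:"game_id"`
	Status     string `json:"status"`
	Extras     string `json:"extras"`      // JSON column, delivered as an encoded string
	FinishedAt int64  `json:"finished_at"` // microseconds since the Unix epoch
}

// CDCEvent is the Debezium envelope; only the fields the consumer needs are decoded.
type CDCEvent struct {
	Payload struct {
		Before *RoundRow `json:"before"`
		After  *RoundRow `json:"after"`
		Op     string    `json:"op"` // c=create, u=update, d=delete, r=snapshot read
		TsMs   int64     `json:"ts_ms"`
	} `json:"payload"`
}

var errNoAfter = errors.New("insert event without an after image")

// decodeInsert returns the new row for insert events and (nil, nil) for any other op.
func decodeInsert(data []byte) (*RoundRow, error) {
	var ev CDCEvent
	if err := json.Unmarshal(data, &ev); err != nil {
		return nil, err
	}
	if ev.Payload.Op != "c" {
		return nil, nil
	}
	if ev.Payload.After == nil {
		return nil, errNoAfter
	}
	return ev.Payload.After, nil
}
```

Rows captured during Debezium's initial snapshot arrive with op `r`, so an insert-only filter like the one in `ProcessEvent` does not load pre-existing rounds into Redis; whether that matters depends on how the read model is backfilled.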
Redis (Read Model)
Stores processed data and provides Pub/Sub:
Keys:
├── latest:crash # Last round (STRING)
├── history:crash # History (ZSET)
├── history:type:multiplier # By type (ZSET)
├── stream:crash # Pub/Sub channel
└── processed:12345 # Dedup (STRING, TTL 5min)
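Building the key and channel names in one place keeps the writer and any readers in agreement. A small sketch following the naming scheme above; the helper names are hypothetical:

```go
package main

import "fmt"

// Key and channel names used by the read model (scheme taken from the key list above).
func latestKey(gameSlug string) string      { return "latest:" + gameSlug }
func historyKey(gameSlug string) string     { return "history:" + gameSlug }
func historyTypeKey(gameType string) string { return "history:type:" + gameType }
func streamChannel(gameSlug string) string  { return "stream:" + gameSlug }
func processedKey(roundID int64) string     { return fmt.Sprintf("processed:%d", roundID) }
```

Note that `stream:crash` is a Pub/Sub channel rather than a stored key: Pub/Sub is independent of the Redis keyspace, so the channel never shows up in `SCAN` output and keeps nothing for late subscribers. That is why the latest round is also written to `latest:crash`.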
Data Flow
1. INSERT in PostgreSQL
INSERT INTO rounds (game_id, status, extras, finished_at)
VALUES (1, 'finished', '{"point": "5.32"}', NOW());
2. WAL Entry Created
PostgreSQL records the change in its write-ahead log (WAL).
3. Debezium Captures Change
Debezium reads WAL via logical replication:
{
"schema": {...},
"payload": {
"before": null,
"after": {
"id": 513,
"game_id": 1,
"status": "finished",
"extras": "{\"point\": \"5.32\"}",
"finished_at": 1768621542735266
},
"source": {...},
"op": "c",
"ts_ms": 1768621542740
}
}
4. Published to NATS
Debezium publishes the event to the datastream.public.rounds subject, named <topic.prefix>.<schema>.<table>.
5. Consumer Processes
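// Enrich with game metadata
game := c.gameCache.Get(event.GameID)
round.GameSlug = game.Slug
round.GameType = game.Type

// go-redis only stores structs that implement encoding.BinaryMarshaler, so serialize once
payload, err := json.Marshal(round)
if err != nil {
	return err
}

// Store in Redis (expiration 0 = the key does not expire)
if err := c.redis.Set(ctx, fmt.Sprintf("latest:%s", round.GameSlug), payload, 0).Err(); err != nil {
	return err
}
if err := c.redis.ZAdd(ctx, fmt.Sprintf("history:%s", round.GameSlug), redis.Z{
	Score:  float64(round.Timestamp),
	Member: payload,
}).Err(); err != nil {
	return err
}
// Publish to Pub/Sub
return c.redis.Publish(ctx, fmt.Sprintf("stream:%s", round.GameSlug), payload).Err()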
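The enrichment step reads game metadata from `c.gameCache`, which is not shown. One possible shape is a read-mostly in-memory map guarded by a `sync.RWMutex`. This variant returns a `found` flag (unlike the single-value call above) so an unknown `game_id` can be handled explicitly instead of producing an empty slug and keys such as `latest:`. `Game`, `GameCache`, and `NewGameCache` are hypothetical names:

```go
package main

import "sync"

// Game holds the metadata used to enrich a round.
type Game struct {
	ID   int64
	Slug string // e.g. "crash"
	Type string // e.g. "multiplier"
}

// GameCache is a concurrency-safe, in-memory lookup of games by ID.
type GameCache struct {
	mu    sync.RWMutex
	games map[int64]Game
}

func NewGameCache() *GameCache {
	return &GameCache{games: make(map[int64]Game)}
}

// Put inserts or replaces a game (e.g. when loading games at startup).
func (gc *GameCache) Put(g Game) {
	gc.mu.Lock()
	defer gc.mu.Unlock()
	gc.games[g.ID] = g
}

// Get returns the game and whether it was found.
func (gc *GameCache) Get(id int64) (Game, bool) {
	gc.mu.RLock()
	defer gc.mu.RUnlock()
	g, ok := gc.games[id]
	return g, ok
}
```

An RWMutex fits because lookups happen on every event while games change rarely, so concurrent readers do not block each other.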
6. Clients Receive
Clients connected via WebSocket/SSE receive the update.
Latency Breakdown
| Stage | Typical latency |
|---|---|
| PostgreSQL INSERT | 1-5ms |
| WAL write | ~1ms |
| Debezium capture | ~1ms |
| NATS publish | ~1ms |
| Consumer process | ~1ms |
| Redis publish | ~1ms |
| Total | < 10ms |
Deduplication
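Prevents processing the same round twice, for example when JetStream redelivers a message whose ack was lost. SET NX claims the round ID atomically, and the claim expires after 5 minutes. Because ProcessEvent claims the ID before writing to Redis, a round whose writes fail is treated as a duplicate if it is redelivered within those 5 minutes; deleting the key when processing fails avoids that: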
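func (c *CDCConsumer) isDuplicate(roundID int64) bool {
	ctx := context.Background()
	key := fmt.Sprintf("processed:%d", roundID)
	// SET key 1 EX 300 NX: returns true only if the key did not exist yet
	isNew, err := c.redis.SetNX(ctx, key, "1", 5*time.Minute).Result()
	if err != nil {
		// Redis unavailable: fail open and process the event instead of dropping it
		return false
	}
	return !isNew // false from SetNX means the round was already claimed
}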
Error Handling
Transient Errors
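NATS JetStream provides at-least-once delivery: a message that is not acknowledged is redelivered, so the same event can reach the consumer more than once.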
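cc, err := sub.Consume(func(msg jetstream.Msg) {
	if err := processEvent(msg); err != nil {
		// Negative ack: ask JetStream to redeliver the message
		msg.Nak()
		return
	}
	msg.Ack()
})
if err != nil {
	return err
}
// Call cc.Stop() on shutdown to stop receiving messages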
Poison Messages
Stop redelivering a message after too many attempts. The delivery count is part of the message metadata:
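meta, err := msg.Metadata()
if err == nil && meta.NumDelivered > 10 {
	slog.Error("max retries exceeded, terminating message",
		"stream_seq", meta.Sequence.Stream, "deliveries", meta.NumDelivered)
	msg.Term() // JetStream will not redeliver this message
	return
}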
Monitoring
Debezium Health
curl http://localhost:8080/q/health
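Debezium Server runs on Quarkus, whose SmallRye Health endpoint returns a JSON body with a top-level `status` of `UP` or `DOWN` (with HTTP 503 when down). A sketch of a programmatic probe, for example for a readiness script; `healthUp` and `debeziumHealthy` are hypothetical helpers, and the 2-second timeout is an arbitrary choice:

```go
package main

import (
	"context"
	"encoding/json"
	"io"
	"net/http"
	"time"
)

// healthUp interprets a SmallRye Health response.
func healthUp(statusCode int, body []byte) (bool, error) {
	var h struct {
		Status string `json:"status"`
	}
	if err := json.Unmarshal(body, &h); err != nil {
		return false, err
	}
	return statusCode == http.StatusOK && h.Status == "UP", nil
}

// debeziumHealthy queries the health endpoint, e.g. http://localhost:8080/q/health.
func debeziumHealthy(ctx context.Context, url string) (bool, error) {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
	defer cancel()
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		return false, err
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	// Limit the read so a misbehaving endpoint cannot exhaust memory.
	body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
	if err != nil {
		return false, err
	}
	return healthUp(resp.StatusCode, body)
}
```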
NATS Streams
curl 'http://localhost:8222/jsz?accounts=true&streams=true' | jq '.account_details[0].stream_detail'
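The same monitoring data can be consumed from code, for example to alert when a stream stops growing. A sketch that decodes the `/jsz?accounts=true&streams=true` response into per-stream message counts. It reads only `account_details[].stream_detail[].name` and `state.messages`; `streamCounts` is a hypothetical helper:

```go
package main

import "encoding/json"

// jsz decodes the subset of the NATS /jsz monitoring response used here.
type jsz struct {
	AccountDetails []struct {
		Name    string `json:"name"`
		Streams []struct {
			Name  string `json:"name"`
			State struct {
				Messages uint64 `json:"messages"`
			} `json:"state"`
		} `json:"stream_detail"`
	} `json:"account_details"`
}

// streamCounts maps stream name to its current message count.
// Assumes stream names are unique across accounts; a later account overwrites an earlier one.
func streamCounts(body []byte) (map[string]uint64, error) {
	var r jsz
	if err := json.Unmarshal(body, &r); err != nil {
		return nil, err
	}
	counts := make(map[string]uint64)
	for _, acc := range r.AccountDetails {
		for _, s := range acc.Streams {
			counts[s.Name] = s.State.Messages
		}
	}
	return counts, nil
}
```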
Consumer Lag
# Check consumer position and pending (unprocessed) messages, i.e. the lag
nats consumer info ROUNDS consumer-name