# Performance

Performance characteristics and optimization strategies.

## Benchmarks

### Latency
| Metric | Value |
|---|---|
| CDC (DB → Redis) | < 10ms |
| WebSocket push | 1-5ms |
| SSE push | 10-50ms |
| WebTransport push | 0.5-2ms |
| REST API | 0.1-0.5ms |

### Throughput
| Component | Capacity |
|---|---|
| CDC events | ~10k/s |
| Consumer | ~50k/s |
| REST API | ~100k req/s |
| WebSocket | ~100k msg/s |
| SSE | ~50k msg/s |

## Optimization Strategies
### 1. Connection Pooling

```go
// Redis connection pool (go-redis)
redisClient := redis.NewClient(&redis.Options{
	Addr:         "localhost:6379",
	PoolSize:     100, // maximum number of socket connections
	MinIdleConns: 10,  // connections kept warm for bursts
})
```
### 2. Message Batching

For high-volume scenarios, accumulate incoming rounds and flush on either a size or a time threshold:

```go
var batch []Round
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
	select {
	case round := <-incoming:
		batch = append(batch, round)
		if len(batch) >= 100 {
			publishBatch(batch)
			batch = nil // start a fresh slice so the published batch is not overwritten
		}
	case <-ticker.C:
		if len(batch) > 0 {
			publishBatch(batch)
			batch = nil
		}
	}
}
```
### 3. Redis Pipelining

```go
pipe := redisClient.Pipeline()
for _, round := range rounds {
	pipe.Set(ctx, key, value, 0) // key/value derived from round
}
if _, err := pipe.Exec(ctx); err != nil {
	// handle error
}
```
### 4. NATS Async Publishing

```properties
debezium.sink.nats-jetstream.async.enabled=true
debezium.sink.nats-jetstream.async.max.pending=10000
```

## Monitoring

### Prometheus Metrics
```go
var (
	messagesProcessed = prometheus.NewCounter(
		prometheus.CounterOpts{
			Name: "datastream_messages_processed_total",
			Help: "Total messages processed",
		},
	)
	processingLatency = prometheus.NewHistogram(
		prometheus.HistogramOpts{
			Name:    "datastream_processing_latency_seconds",
			Help:    "Processing latency",
			Buckets: []float64{.001, .005, .01, .025, .05, .1},
		},
	)
)

func init() {
	// Metrics must be registered before they are scraped.
	prometheus.MustRegister(messagesProcessed, processingLatency)
}
```

### Key Metrics to Monitor
- Message processing rate
- End-to-end latency
- Error rate
- Connection count
- Memory usage