Engineering

Stateful stream enrichment with Debezium CDC: the multi-speed join problem

flink kafka debezium cdc clickhouse

The CVE vulnerability pipeline at SecurityScorecard was the first Flink pipeline I built from scratch at the company. It became the foundational pattern for everything that followed. But the most interesting engineering in it wasn't the happy path. It was what happened when two streams moved at completely different speeds.

What we were trying to do

The goal was to enrich security observations with CVE vulnerability details in real time. An observation arrives saying 'this host has port 443 open running OpenSSL 1.0.2.' We want to augment that with: is there a known CVE for this version of OpenSSL? Is it in the CISA KEV catalog? What does the vendor recommend?

The CVE reference data lives in Postgres. The observations come in from Kafka. The enrichment needs to happen at streaming speed. Querying Postgres on the hot path for every observation would destroy throughput.

The architecture I chose: materialize the CVE reference data into Flink's RocksDB state via Debezium CDC, so enrichment is a local lookup with no external dependency.

How Debezium CDC works in this context

Debezium tails PostgreSQL's Write-Ahead Log and publishes every insert, update, and delete as a structured event to Kafka. So instead of querying Postgres directly, I have a Kafka topic that's a real-time changelog of the CVE table. I consume this topic in Flink and materialize the current state into RocksDB, effectively a local, in-process copy of the CVE reference data that stays in sync automatically.
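The materialization logic is essentially an upsert/delete loop keyed by primary key. Here's a minimal sketch of that logic against a plain HashMap standing in for RocksDB state (the class and field names are mine, not the real schema; the `op` codes are Debezium's standard envelope values):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: applying Debezium change events to a local materialized view,
// the same upsert/delete logic the Flink job runs against RocksDB state.
class CveMaterializer {
    private final Map<String, String> cveById = new HashMap<>();

    // Debezium envelopes carry an "op" field:
    // c(reate), u(pdate), r(snapshot read), d(elete).
    public void apply(String op, String cveId, String payload) {
        switch (op) {
            case "c": case "u": case "r":
                cveById.put(cveId, payload);   // upsert the row's current state
                break;
            case "d":
                cveById.remove(cveId);         // row deleted in Postgres
                break;
            default:
                throw new IllegalArgumentException("unknown op: " + op);
        }
    }

    public String lookup(String cveId) {
        return cveById.get(cveId);             // local, in-process read
    }
}
```

Because the topic is a changelog, replaying it from the beginning (or from a snapshot) always reconverges on the current table contents.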

Enrichment then becomes: for each observation, look up the relevant CVE data from local RocksDB state. Microseconds, not milliseconds. No network hop, no external dependency on the critical path.

The multi-speed problem

Here's where it gets interesting. I'm joining two Kafka topics in Flink: the observations topic (high volume, constant throughput, thousands of events per second) and the Debezium CDC topic (low volume, event-driven, only produces when Postgres changes, which might be once every few minutes).

Flink's global watermark is the mechanism that tells the engine when to close time windows and emit results. It's computed as the minimum of all source watermarks, and that detail matters: when one source goes quiet, its watermark stops advancing, and the global watermark freezes with it.

So: the CDC topic goes quiet for two minutes (no Postgres changes). Its watermark freezes at T. The observation topic keeps producing events. Its watermark is at T+2min. The global watermark is still T. Every observation window that should have closed at T+30s, T+1min, T+2min: none of them fire. The pipeline is silently stalled.
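The freeze is nothing more exotic than min() arithmetic. A toy model (the class is mine, for illustration):

```java
import java.util.Arrays;

// Toy model of Flink's global watermark:
// the minimum over all source watermarks.
class GlobalWatermark {
    public static long combine(long[] sourceWatermarks) {
        return Arrays.stream(sourceWatermarks).min().orElse(Long.MIN_VALUE);
    }
}
```

With the CDC source frozen at T and the observation source at T+2min, the combined value is still T, so every window ending between T and T+2min stays open.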

The symptom vs. the cause

The symptom was intermittent window stall. Output would just stop for a while, then resume when the CDC topic produced an event. No errors. No alerts. Just silence.

The cause took a while to find because Flink doesn't loudly announce watermark freezes. I had to instrument watermark progression explicitly, logging the per-source watermarks and the global watermark at regular intervals, then correlate them with CDC topic activity to see the pattern.

The fix

Flink has a built-in mechanism for exactly this situation: per-source idle timeouts. You configure a duration, and if a source produces no events for that duration, Flink marks it idle and stops including it in the global watermark calculation. The fast source advances freely; when the idle source wakes up it rejoins the calculation, and any of its events that fall behind the advanced watermark are handled as late data.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy
  .forBoundedOutOfOrderness(Duration.ofSeconds(10))  // tolerate up to 10s of out-of-order events
  .withIdleness(Duration.ofSeconds(30));             // after 30s of silence, drop the source from the global watermark

Simple to configure. The hard part was diagnosing that this was the problem in the first place.
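To make the semantics concrete, here's a toy model of what idleness does to the min calculation (the class and parameter names are mine, not Flink internals):

```java
// Toy model of withIdleness: a source silent longer than the idle timeout
// is excluded from the min, so the remaining sources advance the watermark.
class IdleAwareWatermark {
    public static long combine(long[] watermarks, long[] msSinceLastEvent, long idleTimeoutMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (msSinceLastEvent[i] >= idleTimeoutMs) continue;  // source marked idle: skip it
            min = Math.min(min, watermarks[i]);
            anyActive = true;
        }
        return anyActive ? min : Long.MIN_VALUE;  // all sources idle: nothing constrains the watermark
    }
}
```

With no idle timeout the quiet CDC source pins the result at T; with a 30-second timeout, a source that has been silent for two minutes is skipped and the observation source's watermark wins.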

Side-output fan-outs

The other design decision worth explaining: I used Flink's side output pattern to write to multiple ClickHouse tables from a single pipeline. One Flink job processes the enriched observation stream and emits to four side outputs: the main observations table, the CVE details table, the CISA KEV flags table, and the vendor recommendations table.

The alternative would have been four separate pipelines, each consuming the same input and doing their own enrichment. That means four copies of the RocksDB state, four sets of Kafka consumer offsets, four deployment configurations. The side output approach keeps the enrichment state centralized and cuts the operational overhead significantly.
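The fan-out is a single pass that routes each enriched record to every destination that applies. A toy stand-in for the routing decision (table names come from the post; the flags and routing logic are my simplification, and in the real job each destination is a Flink OutputTag consumed by its own ClickHouse sink):

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for Flink side outputs: one pass over each enriched record,
// emitting to every destination table that applies.
class FanOutRouter {
    public static List<String> route(boolean hasCveDetails, boolean inCisaKev, boolean hasVendorRec) {
        List<String> destinations = new ArrayList<>();
        destinations.add("observations");                        // every record hits the main table
        if (hasCveDetails) destinations.add("cve_details");
        if (inCisaKev)     destinations.add("cisa_kev_flags");
        if (hasVendorRec)  destinations.add("vendor_recommendations");
        return destinations;
    }
}
```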

This pipeline became the template for everything else we built. Getting the foundational patterns right (stateful enrichment, idle timeouts, side-output fan-outs) meant every engineer who followed had a working reference implementation to learn from and extend.

What I'd do differently

I'd add explicit watermark lag alerting from the start. The idle timeout fix prevents the stall, but if the CDC topic goes quiet for longer than expected, I want an alert. It might mean Debezium is stuck, not just quiet. The fix handles the symptom; an alert catches the cause.
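The check itself is trivial once per-source watermarks are exported as metrics. A sketch of the alert condition (the class, names, and threshold are mine; in practice this would live in the metrics/alerting system, not the job):

```java
// Sketch of a watermark-lag alert: fire when a source's watermark trails
// wall-clock time by more than a threshold, e.g. 5 minutes for the CDC topic.
class WatermarkLagAlert {
    public static boolean shouldAlert(long nowMs, long sourceWatermarkMs, long maxLagMs) {
        return nowMs - sourceWatermarkMs > maxLagMs;
    }
}
```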
