`application.yaml`, and the pipeline subscribes to topics matching a prefix, parsing incoming records as CSV, JSON, or XML.
## Enabling the Kafka Consumer
Set the `kafkaConsumer` block in `application.yaml`:
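A minimal sketch of the block, using the properties from the reference table below (the broker addresses and group name are illustrative placeholders):

```yaml
kafkaConsumer:
  enabled: true
  bootstrapServers: "broker1:9092,broker2:9092"
  groupId: "my-pipeline-group"
  topicPollingInterval: 10000
  topicPrefix: "pipeline."
```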
## Configuration Reference
| Property | Required | Description | Default |
|---|---|---|---|
| `enabled` | No | Enable or disable the Kafka consumer | `false` |
| `bootstrapServers` | Yes | Comma-separated list of Kafka broker addresses | — |
| `groupId` | Yes | Consumer group ID; controls offset tracking and partition assignment | — |
| `topicPollingInterval` | No | Milliseconds between metadata refreshes to discover new topics | `10000` |
| `topicPrefix` | No | Only topics whose names start with this prefix are consumed | `""` (all topics) |
## Topic Prefix Matching
When `topicPrefix` is set, the consumer periodically queries Kafka metadata and subscribes to every topic whose name begins with the prefix. This lets new topics, and therefore new pipelines, be picked up without restarting the pipeline.
For example, with `topicPrefix: "pipeline."`, the consumer will pick up:

- `pipeline.orders`
- `pipeline.customers`
- `pipeline.events.clickstream`

but will ignore topics such as `internal.logs` or `audit.trail`.
The topic name minus the prefix is used as the pipeline name: `pipeline.orders` maps to the pipeline named `orders`.
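The matching and naming rules above can be sketched as follows (a minimal illustration; the real consumer works against live Kafka metadata, and the function names here are hypothetical):

```python
def matching_topics(topics, prefix):
    """Return the topics the consumer would subscribe to."""
    return [t for t in topics if t.startswith(prefix)]

def pipeline_name(topic, prefix):
    """Strip the prefix to derive the pipeline name."""
    return topic[len(prefix):]

topics = ["pipeline.orders", "pipeline.customers",
          "pipeline.events.clickstream", "internal.logs", "audit.trail"]
print(matching_topics(topics, "pipeline."))
# ['pipeline.orders', 'pipeline.customers', 'pipeline.events.clickstream']
print(pipeline_name("pipeline.orders", "pipeline."))
# orders
```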
## Data Formats
The pipeline inspects each record to determine its format.

### CSV

Records containing plain comma-separated text are parsed as CSV. The first record on a topic may optionally be a header row; the pipeline matches columns by position against the pipeline schema.

### JSON and XML

Records that begin with `{` or `[` are parsed as JSON. Records that begin with `<` are parsed as XML. Field names in the payload are matched against the pipeline schema by name.
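The detection rules above amount to a first-character check. A minimal sketch (the function name is illustrative, not part of the product API):

```python
def detect_format(record: bytes) -> str:
    """Guess a record's format from its first non-whitespace byte."""
    text = record.lstrip()
    if text.startswith((b"{", b"[")):
        return "json"
    if text.startswith(b"<"):
        return "xml"
    # Anything else is treated as comma-separated text.
    return "csv"

print(detect_format(b'{"orderId": 1}'))  # json
print(detect_format(b'<order id="1"/>'))  # xml
print(detect_format(b'1,acme,99.95'))  # csv
```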
JSON example:
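The field names below are illustrative; in practice they must match your pipeline schema by name:

```json
{
  "orderId": 1042,
  "customer": "acme",
  "total": 99.95
}
```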
## Full Example
A complete setup with a pipeline configuration and the Kafka consumer enabled in `application.yaml`:

## Offset Management
The pipeline commits offsets to Kafka using the configured `groupId`. If the consumer restarts, it resumes from the last committed offset. To reprocess a topic from the beginning, reset the consumer group offsets:
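For example, with the standard Kafka tooling (substitute your own group and topic; the values below are placeholders):

```shell
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group my-pipeline-group \
  --topic pipeline.orders \
  --reset-offsets --to-earliest --execute
```

Note that the consumer group must be inactive for the reset to succeed, so stop the pipeline before running the command.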
## Troubleshooting
| Symptom | Check |
|---|---|
| Consumer not subscribing to a topic | Verify the topic name starts with the configured `topicPrefix`. Wait at least one `topicPollingInterval` cycle after topic creation. |
| Records not parsing | Confirm the record format (CSV/JSON/XML) matches the pipeline schema field names or positions. |
| Lag increasing | Check consumer group lag with `kafka-consumer-groups.sh --describe`. Consider scaling partitions or adding consumer instances with the same `groupId`. |
| Connection failures | Verify the `bootstrapServers` addresses are reachable from the pipeline host. Check Kafka broker logs for authentication or authorization errors. |