The pipeline can consume data from Kafka topics in real time. Configure the Kafka consumer in application.yaml; the pipeline subscribes to topics based on registered pipeline configurations with streamAttributes.type: "kafka".
Kafka is opt-in. The bundled kafka, zookeeper, and kafka-ui services are commented out in docker-compose.yml by default. To use the bundled broker, uncomment all three service blocks and the kafka-data, zookeeper-data, and zookeeper-log volumes. The kafka-clients library is included in the assembly JAR, so pipelines that point at any external Kafka broker work without enabling the bundled one.
Enabling the Kafka Consumer
Set the kafkaConsumer block in application.yaml:
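For example, a minimal sketch based on the configuration reference below; the broker address and group ID are placeholders:

```yaml
kafkaConsumer:
  enabled: true                       # off by default
  bootstrapServers: "localhost:9092"  # placeholder; point at your broker(s)
  groupId: "pipeline-consumer"        # placeholder consumer group ID
  topicPollingInterval: 500           # optional; milliseconds between polls
  topicPrefix: "oss"                  # optional; default "oss"
```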
Configuration Reference
| Property | Required | Description | Default |
|---|---|---|---|
| enabled | No | Enable or disable the Kafka consumer | false |
| bootstrapServers | Yes | Comma-separated list of Kafka broker addresses | — |
| groupId | Yes | Consumer group ID; controls offset tracking and partition assignment | — |
| topicPollingInterval | No | Milliseconds between poll cycles | 500 |
| topicPrefix | No | Prefix prepended to pipeline names to form topic names | oss |
How Topics Are Resolved
At startup, the consumer reads all pipeline configurations from the database and subscribes to topics for pipelines that have streamAttributes.type: "kafka". The topic name is constructed as {topicPrefix}.{pipelineName}.
For example, with topicPrefix: "oss" (defined in kafkaConsumer.topicPrefix in application.yaml) and a pipeline named orders, the consumer subscribes to topic oss.orders.
When a Kafka message arrives on oss.orders, the consumer strips the prefix and looks up the pipeline named orders to determine how to process the data.
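As a sketch, a pipeline registration that would subscribe to oss.orders; only the pipeline name and streamAttributes.type are documented above, and the surrounding layout is an assumption:

```yaml
# Hypothetical pipeline configuration; the exact layout is assumed.
name: orders
streamAttributes:
  type: "kafka"         # the consumer subscribes to {topicPrefix}.orders
source:
  fileAttributes:
    jsonAttributes: {}  # records parsed as JSON, matched by field name
```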
Data Formats
The format of incoming records is determined by the pipeline configuration, not auto-detected. Configure the pipeline’s source.fileAttributes to match the data format:
- CSV — use csvAttributes with the appropriate delimiter. Records are parsed by position against the pipeline schema (see the sketch after this list).
- JSON — use jsonAttributes. Field names in the payload are matched against the pipeline schema by name.
- XML — use xmlAttributes. Element names are matched against the pipeline schema by name.
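For instance, the CSV variant might look like this; the delimiter key name is an assumption, as only the csvAttributes block itself is documented above:

```yaml
source:
  fileAttributes:
    csvAttributes:
      delimiter: ","  # assumed key; match it to your record format
```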
Full Example
A complete setup with a pipeline configuration and the Kafka consumer enabled:
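A minimal sketch, assuming the bundled broker and a JSON orders pipeline. Pipeline configurations are read from the database per the section above, so the second snippet is a hypothetical registration rather than part of application.yaml:

```yaml
# application.yaml (values are placeholders)
kafkaConsumer:
  enabled: true
  bootstrapServers: "localhost:9092"
  groupId: "pipeline-consumer"
  topicPrefix: "oss"
```

```yaml
# Hypothetical registered pipeline; the consumer subscribes to
# oss.orders and parses records as JSON, matched by field name.
name: orders
streamAttributes:
  type: "kafka"
source:
  fileAttributes:
    jsonAttributes: {}
```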
Offset Management
The pipeline uses Kafka’s auto-commit to track offsets using the configured groupId. If the consumer restarts, it resumes from the last committed offset. New consumer groups start from the earliest offset.
To reprocess a topic from the beginning, reset the consumer group offsets:
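For example, using Kafka’s own tooling (the group and topic names are placeholders; stop the consumer first, since offsets can only be reset for an inactive group):

```bash
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group pipeline-consumer --topic oss.orders \
  --reset-offsets --to-earliest --execute
```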
Troubleshooting
| Symptom | Check |
|---|---|
| Consumer not subscribing to a topic | Verify the pipeline has streamAttributes.type: "kafka" and was registered before the server started. |
| Records not parsing | Confirm the pipeline’s fileAttributes (csvAttributes, jsonAttributes, or xmlAttributes) matches the format of your Kafka records. |
| Lag increasing | Check consumer group lag with kafka-consumer-groups.sh --describe. Consider scaling partitions or adding consumer instances with the same groupId. |
| Connection failures | Verify bootstrapServers addresses are reachable from the pipeline host. Check Kafka broker logs for authentication or authorization errors. |
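For the lag check referenced above, a typical invocation (group name is a placeholder):

```bash
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --group pipeline-consumer --describe
```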
