

The pipeline can consume data from Kafka topics in real time. Once the Kafka consumer is configured in application.yaml, the pipeline subscribes to a topic for every registered pipeline configuration whose streamAttributes.type is "kafka".
Kafka is opt-in. The bundled kafka, zookeeper, and kafka-ui services are commented out in docker-compose.yml by default. To use the bundled broker, uncomment all three service blocks and the kafka-data, zookeeper-data, and zookeeper-log volumes. The kafka-clients library is included in the assembly JAR, so pipelines that point at any external Kafka broker work without enabling the bundled one.
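After uncommenting, the relevant blocks of docker-compose.yml have roughly this shape. This is a sketch of the structure only; the image tags, ports, environment variables, and volume mount paths are placeholders, and the actual values are whatever the bundled file already contains.

```yaml
# Illustrative shape of the uncommented services and volumes.
# Image tags, ports, and mount paths below are assumptions.
services:
  zookeeper:
    image: zookeeper:3.9              # placeholder tag
    volumes:
      - zookeeper-data:/data
      - zookeeper-log:/datalog
  kafka:
    image: apache/kafka:3.7.0         # placeholder tag
    ports:
      - "9092:9092"
    volumes:
      - kafka-data:/var/lib/kafka/data
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8081:8080"                   # placeholder host port

volumes:
  kafka-data:
  zookeeper-data:
  zookeeper-log:
```

All three services and all three volumes must be uncommented together, since kafka depends on zookeeper and both persist state to the named volumes.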

Enabling the Kafka Consumer

Set the kafkaConsumer block in application.yaml:
kafkaConsumer:
  enabled: true
  bootstrapServers: "kafka:9092"
  groupId: "oss-group"
  topicPollingInterval: 500
  topicPrefix: "oss"

Configuration Reference

enabled (optional; default: false)
    Enable or disable the Kafka consumer.
bootstrapServers (required)
    Comma-separated list of Kafka broker addresses.
groupId (required)
    Consumer group ID; controls offset tracking and partition assignment.
topicPollingInterval (optional; default: 500)
    Milliseconds between poll cycles.
topicPrefix (optional; default: oss)
    Prefix prepended to pipeline names to form topic names.
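Because bootstrapServers accepts a comma-separated list, the consumer can also point at an external cluster instead of the bundled broker. A hedged example (the hostnames are placeholders, not real endpoints):

```yaml
kafkaConsumer:
  enabled: true
  # Placeholder hostnames; substitute your own brokers.
  bootstrapServers: "broker-1.example.com:9092,broker-2.example.com:9092"
  groupId: "oss-group"
  # topicPollingInterval and topicPrefix are omitted here,
  # so they fall back to their defaults (500 and "oss").
```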

How Topics Are Resolved

At startup, the consumer reads all pipeline configurations from the database and subscribes to topics for pipelines that have streamAttributes.type: "kafka". The topic name is constructed as {topicPrefix}.{pipelineName}. For example, with topicPrefix: "oss" (defined in kafkaConsumer.topicPrefix in application.yaml) and a pipeline named orders, the consumer subscribes to topic oss.orders. When a Kafka message arrives on oss.orders, the consumer strips the prefix and looks up the pipeline named orders to determine how to process the data.
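The resolution rule described above can be sketched in a few lines of Python. The helper names here are illustrative only and are not part of the product's API; the real consumer applies this mapping internally.

```python
# Sketch of the topic-resolution rule: topic = {topicPrefix}.{pipelineName}.

TOPIC_PREFIX = "oss"  # mirrors kafkaConsumer.topicPrefix in application.yaml

def topic_for_pipeline(pipeline_name: str, prefix: str = TOPIC_PREFIX) -> str:
    """Build the topic a pipeline's records arrive on."""
    return f"{prefix}.{pipeline_name}"

def pipeline_for_topic(topic: str, prefix: str = TOPIC_PREFIX) -> str:
    """Strip the prefix from an incoming topic to recover the pipeline name."""
    marker = prefix + "."
    if not topic.startswith(marker):
        raise ValueError(f"topic {topic!r} lacks expected prefix {prefix!r}")
    return topic[len(marker):]

print(topic_for_pipeline("orders"))      # oss.orders
print(pipeline_for_topic("oss.orders"))  # orders
```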

Data Formats

The format of incoming records is determined by the pipeline configuration — not auto-detected. Configure the pipeline’s source.fileAttributes to match the data format:
  • CSV — use csvAttributes with the appropriate delimiter. Records are parsed by position against the pipeline schema.
  • JSON — use jsonAttributes. Field names in the payload are matched against the pipeline schema by name.
  • XML — use xmlAttributes. Element names are matched against the pipeline schema by name.
Any plain text data can be consumed as long as the pipeline schema and file attributes match the format.
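The two matching rules (CSV by position, JSON by name) can be made concrete with a small Python sketch. The schema and parsing functions below are illustrative stand-ins, not the pipeline's actual parser.

```python
import csv
import io
import json

# Hypothetical schema, in pipeline field order.
SCHEMA = ["id", "email", "order_date", "total"]

def parse_csv_record(record: str, delimiter: str = ",") -> dict:
    # CSV: values are matched to schema fields by position.
    values = next(csv.reader(io.StringIO(record), delimiter=delimiter))
    return dict(zip(SCHEMA, values))

def parse_json_record(record: str) -> dict:
    # JSON: payload keys are matched to schema fields by name,
    # so the order of keys in the payload does not matter.
    payload = json.loads(record)
    return {name: payload.get(name) for name in SCHEMA}

print(parse_csv_record("1001,jane@example.com,2025-06-15,249.99"))
print(parse_json_record('{"total": 249.99, "id": 1001, '
                        '"email": "jane@example.com", "order_date": "2025-06-15"}'))
```

Note that a positional format like CSV breaks if the producer reorders columns, while a named format like JSON tolerates reordering but breaks on renamed fields.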

Full Example

A complete setup with a pipeline configuration and the Kafka consumer enabled.

application.yaml:
kafkaConsumer:
  enabled: true
  bootstrapServers: "kafka:9092"
  groupId: "oss-group"
  topicPollingInterval: 500
  topicPrefix: "oss"
Pipeline configuration (orders)
{
  "name": "orders",
  "source": {
    "streamAttributes": {
      "type": "kafka"
    },
    "fileAttributes": {
      "jsonAttributes": {
        "encoding": "UTF-8"
      }
    },
    "schemaProperties": {
      "fields": [
        { "name": "id", "type": "bigint" },
        { "name": "email", "type": "string" },
        { "name": "order_date", "type": "date" },
        { "name": "total", "type": "double" }
      ]
    }
  },
  "destination": {
    "database": {
      "dbName": "datris",
      "schema": "public",
      "table": "orders",
      "usePostgres": true
    }
  }
}
Produce a test message and verify ingestion:
# Produce a JSON record to the topic
echo '{"id":1001,"email":"jane@example.com","order_date":"2025-06-15","total":249.99}' | \
  docker exec -i kafka kafka-console-producer.sh --bootstrap-server kafka:9092 --topic oss.orders

# Check pipeline status
curl -s "http://localhost:8080/api/v1/pipeline/status?pipelinename=orders" | jq .

Offset Management

The pipeline relies on Kafka’s auto-commit to track offsets under the configured groupId. If the consumer restarts, it resumes from the last committed offset; a new consumer group starts from the earliest offset. To reprocess a topic from the beginning, reset the consumer group offsets:
# The group must be inactive (stop the consumer first) or the reset will fail
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group oss-group \
  --topic oss.orders \
  --reset-offsets --to-earliest --execute

Troubleshooting

Consumer not subscribing to a topic
    Verify the pipeline has streamAttributes.type: "kafka" and was registered before the server started.
Records not parsing
    Confirm the pipeline’s fileAttributes (csvAttributes, jsonAttributes, or xmlAttributes) matches the format of your Kafka records.
Lag increasing
    Check consumer group lag with kafka-consumer-groups.sh --describe. Consider scaling partitions or adding consumer instances with the same groupId.
Connection failures
    Verify bootstrapServers addresses are reachable from the pipeline host. Check Kafka broker logs for authentication or authorization errors.