The pipeline can consume data from Kafka topics in real time. Configure the Kafka consumer in application.yaml, and the pipeline subscribes to topics matching a prefix, parsing incoming records as CSV, JSON, or XML.

Enabling the Kafka Consumer

Set the kafkaConsumer block in application.yaml:
kafkaConsumer:
  enabled: true
  bootstrapServers: "kafka-broker-1:9092,kafka-broker-2:9092"
  groupId: "pipeline-ingest"
  topicPollingInterval: 10000
  topicPrefix: "pipeline."

Configuration Reference

enabled (optional; default: false)
  Enable or disable the Kafka consumer.
bootstrapServers (required)
  Comma-separated list of Kafka broker addresses.
groupId (required)
  Consumer group ID; controls offset tracking and partition assignment.
topicPollingInterval (optional; default: 10000)
  Milliseconds between metadata refreshes to discover new topics.
topicPrefix (optional; default: "" — all topics)
  Only topics whose names start with this prefix are consumed.
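The defaults above can be summarized in a small sketch that applies them to a partial configuration. The function name and dictionary layout here are illustrative, not part of the product:

```python
# Illustrative sketch: apply the documented defaults to a partial
# kafkaConsumer block and reject configs missing required keys.
DEFAULTS = {"enabled": False, "topicPollingInterval": 10000, "topicPrefix": ""}
REQUIRED = ("bootstrapServers", "groupId")

def resolve_consumer_config(block: dict) -> dict:
    for key in REQUIRED:
        if key not in block:
            raise ValueError(f"kafkaConsumer.{key} is required")
    # user-supplied values override the documented defaults
    return {**DEFAULTS, **block}

cfg = resolve_consumer_config({
    "enabled": True,
    "bootstrapServers": "kafka-broker-1:9092,kafka-broker-2:9092",
    "groupId": "pipeline-ingest",
})
print(cfg["topicPollingInterval"])  # falls back to the 10000 ms default
```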

Topic Prefix Matching

When topicPrefix is set, the consumer periodically queries Kafka metadata and subscribes to every topic whose name begins with the prefix, so new pipelines can come online as new topics are created, without restarting the consumer. For example, with topicPrefix: "pipeline.", the consumer will pick up:
  • pipeline.orders
  • pipeline.customers
  • pipeline.events.clickstream
It ignores topics that do not match, such as internal.logs or audit.trail. The topic name minus the prefix becomes the pipeline name: pipeline.orders maps to the pipeline named orders.
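The matching and naming rule above can be sketched in a few lines (the function name is hypothetical; the actual discovery happens inside the consumer):

```python
# Illustrative sketch of the documented rule: subscribe only to topics that
# start with topicPrefix, and derive the pipeline name by stripping it.
def matching_pipelines(topics: list[str], prefix: str = "pipeline.") -> dict[str, str]:
    return {t: t[len(prefix):] for t in topics if t.startswith(prefix)}

topics = ["pipeline.orders", "pipeline.customers",
          "pipeline.events.clickstream", "internal.logs", "audit.trail"]
# maps pipeline.orders -> orders, pipeline.events.clickstream -> events.clickstream
print(matching_pipelines(topics))
```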

Data Formats

The pipeline inspects each record to determine its format.

CSV

Records containing plain comma-separated text are parsed as CSV. The first record on a topic may optionally be a header row; the pipeline matches columns by position against the pipeline schema.
1001,jane@example.com,2025-06-15,249.99
1002,bob@example.com,2025-06-14,89.50
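Positional matching can be sketched as follows, using the field names from the orders example later on this page (the helper itself is illustrative):

```python
import csv
import io

# Illustrative sketch: parse one CSV record and match columns by position
# against the pipeline schema (field order taken from the orders example).
SCHEMA = ["id", "email", "order_date", "total"]

def parse_csv_record(line: str) -> dict:
    row = next(csv.reader(io.StringIO(line)))
    return dict(zip(SCHEMA, row))

rec = parse_csv_record("1001,jane@example.com,2025-06-15,249.99")
print(rec["email"])  # jane@example.com
```

Note that a real ingest path would also cast each column to its schema type; this sketch keeps every value as a string.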

JSON and XML

Records that begin with { or [ are parsed as JSON. Records that begin with < are parsed as XML. Field names in the payload are matched against the pipeline schema by name. JSON example:
{"id": 1001, "email": "jane@example.com", "order_date": "2025-06-15", "total": 249.99}
XML example:
<record>
  <id>1001</id>
  <email>jane@example.com</email>
  <order_date>2025-06-15</order_date>
  <total>249.99</total>
</record>
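The first-character dispatch rule described above can be sketched like this (the function is illustrative; the real parser lives inside the pipeline):

```python
import json
import xml.etree.ElementTree as ET

# Illustrative sketch of the documented dispatch rule: the first
# non-whitespace character decides how a record is parsed.
def parse_record(raw: str) -> dict:
    text = raw.lstrip()
    if text.startswith(("{", "[")):
        return json.loads(text)                    # JSON object or array
    if text.startswith("<"):
        root = ET.fromstring(text)                 # XML <record> element
        return {child.tag: child.text for child in root}
    # anything else falls through to the CSV path (not shown here)
    raise ValueError("not JSON or XML; treat as CSV")

j = parse_record('{"id": 1001, "email": "jane@example.com"}')
x = parse_record("<record><id>1001</id><email>jane@example.com</email></record>")
print(j["email"], x["email"])
```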

Full Example

A complete setup with a pipeline configuration and the Kafka consumer enabled.

application.yaml
kafkaConsumer:
  enabled: true
  bootstrapServers: "kafka:9092"
  groupId: "pipeline-ingest"
  topicPollingInterval: 5000
  topicPrefix: "pipeline."
Pipeline configuration (orders)
{
  "name": "orders",
  "source": {
    "streamAttributes": {
      "type": "kafka"
    },
    "schemaProperties": {
      "fields": [
        { "name": "id", "type": "bigint" },
        { "name": "email", "type": "varchar(255)" },
        { "name": "order_date", "type": "date" },
        { "name": "total", "type": "decimal(10,2)" }
      ]
    }
  }
}
Produce a test message and verify ingestion:
# Produce a JSON record to the topic
echo '{"id":1001,"email":"jane@example.com","order_date":"2025-06-15","total":249.99}' | \
  kafka-console-producer.sh --bootstrap-server kafka:9092 --topic pipeline.orders

# Check pipeline status
curl -s "http://localhost:8080/api/v1/pipeline/status?pipelinename=orders" | jq .

Offset Management

The pipeline commits offsets to Kafka using the configured groupId. If the consumer restarts, it resumes from the last committed offset. To reprocess a topic from the beginning, reset the consumer group offsets. Offset resets only succeed while the group is inactive, so stop the pipeline first:
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
  --group pipeline-ingest \
  --topic pipeline.orders \
  --reset-offsets --to-earliest --execute

Troubleshooting

Consumer not subscribing to a topic
  Verify the topic name starts with the configured topicPrefix, and wait at least one topicPollingInterval cycle after topic creation.
Records not parsing
  Confirm the record format (CSV/JSON/XML) matches the pipeline schema field names or positions.
Lag increasing
  Check consumer group lag with kafka-consumer-groups.sh --describe. Consider adding partitions or running more consumer instances with the same groupId.
Connection failures
  Verify the bootstrapServers addresses are reachable from the pipeline host, and check the Kafka broker logs for authentication or authorization errors.