The pipeline publishes event notifications after each destination loader completes. Notifications are sent to an ActiveMQ Virtual Topic and can be consumed by any number of independent subscribers.
How It Works
The pipeline uses ActiveMQ’s Virtual Destinations pattern:
- Each destination loader (PostgreSQL, MongoDB, Kafka, ActiveMQ, Object Store, REST Endpoint, Qdrant, Weaviate, Milvus, Chroma, pgvector) publishes a notification to `VirtualTopic.{environment}-pipeline-notification`
- Consumers subscribe by creating a queue named `Consumer.{name}.VirtualTopic.{environment}-pipeline-notification`
- ActiveMQ automatically copies topic messages to all matching consumer queues
- Each consumer gets its own independent queue with its own message cursor
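The fan-out behaviour described above can be illustrated with a toy in-memory model. This is only a sketch of the pattern, not the broker's implementation: the `VirtualTopicSim` class and its methods are hypothetical names introduced for illustration, and no ActiveMQ connection is involved.

```python
from collections import deque


class VirtualTopicSim:
    """Toy in-memory model of one virtual topic and its consumer queues."""

    def __init__(self, environment):
        self.topic = f"VirtualTopic.{environment}-pipeline-notification"
        self.queues = {}  # queue name -> deque of pending messages

    def subscribe(self, consumer_name):
        # Creating the queue is what registers the consumer.
        queue_name = f"Consumer.{consumer_name}.{self.topic}"
        self.queues.setdefault(queue_name, deque())
        return queue_name

    def publish(self, message):
        # Every existing consumer queue receives its own copy.
        for pending in self.queues.values():
            pending.append(message)

    def receive(self, queue_name):
        # Reading from one queue never affects the others.
        pending = self.queues[queue_name]
        return pending.popleft() if pending else None
```

In this model, two subscribers that each call `receive` on their own queue both see every published message, and one subscriber falling behind does not change what the other reads.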
Consuming Notifications
Queue Naming Convention
Example queue names for the oss environment:
- `Consumer.analytics.VirtualTopic.oss-pipeline-notification`
- `Consumer.audit-log.VirtualTopic.oss-pipeline-notification`
- `Consumer.dashboard.VirtualTopic.oss-pipeline-notification`
Quick Start: Python Consumer (Minimal)
Note: This is a minimal snippet. For a production-ready consumer with automatic reconnect, heartbeats, graceful shutdown, and configurable virtual topic support, see Topic Subscriber.
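A minimal consumer along these lines can be sketched with the third-party stomp.py client. Everything specific here is an assumption rather than something this page confirms: that the broker exposes STOMP on ActiveMQ's default port 61613, that the credentials are `admin`/`admin`, that stomp.py 8.x is installed (its listeners receive a frame object), and the helper names `consumer_queue` and `consume`, which are hypothetical.

```python
import time


def consumer_queue(consumer_name, environment):
    """Build the consumer queue name from the naming convention."""
    return f"Consumer.{consumer_name}.VirtualTopic.{environment}-pipeline-notification"


def consume(consumer_name, environment, host="localhost", port=61613,
            user="admin", password="admin"):
    """Print every notification delivered to this consumer's queue."""
    # Third-party dependency (pip install stomp.py); imported here so the
    # naming helper above works without it.
    import stomp

    class PrintListener(stomp.ConnectionListener):
        def on_message(self, frame):
            # The payload format is not assumed; the raw body is printed.
            print(frame.headers.get("destination"), frame.body)

        def on_error(self, frame):
            print("broker error:", frame.body)

    conn = stomp.Connection([(host, port)])
    conn.set_listener("printer", PrintListener())
    conn.connect(user, password, wait=True)
    # STOMP addresses ActiveMQ queues with a "/queue/" prefix.
    conn.subscribe(destination="/queue/" + consumer_queue(consumer_name, environment),
                   id="1", ack="auto")
    try:
        while True:  # messages arrive on stomp.py's receiver thread
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        conn.disconnect()
```

Calling `consume("analytics", "oss")` would subscribe to `Consumer.analytics.VirtualTopic.oss-pipeline-notification` and print messages until interrupted with Ctrl+C. The sketch has no reconnect or heartbeat handling.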
Example: Java/JMS Consumer
Message Filtering
Notifications include JMS message properties that can be used with JMS selectors for filtering:

| Property | Description |
|---|---|
| `pipeline` | Pipeline name |
| `destination` | Destination type (postgres, mongodb, objectStore, kafka, activemq, restEndpoint, qdrant, weaviate, milvus, chroma, pgvector) |
| `schema` | Database schema (database destinations) |
| `database` | Database name (database destinations) |
| `table` | Table name (database destinations) |
| `topic` | Kafka topic (Kafka destinations) |
| `queueName` | Queue name (ActiveMQ destinations) |
| `prefixKey` | Object store prefix (object store destinations) |
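As an illustration of filtering on these properties, the sketch below builds a JMS selector string from property/value pairs. The `jms_selector` helper is a hypothetical name, the table name `orders` is made up, and the code assumes the properties are published as string values.

```python
def jms_selector(**properties):
    """Combine property=value pairs into a JMS selector joined with AND."""
    clauses = []
    for name, value in properties.items():
        # JMS string literals use single quotes; an embedded quote is doubled.
        escaped = str(value).replace("'", "''")
        clauses.append(f"{name} = '{escaped}'")
    return " AND ".join(clauses)


selector = jms_selector(destination="postgres", table="orders")
print(selector)  # destination = 'postgres' AND table = 'orders'
```

A JMS client would pass this string when creating its message consumer. Over STOMP, ActiveMQ accepts the same expression in a `selector` header on the subscribe frame.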
Notification Payload
null.
Enabling Notifications
Notifications are enabled by default in `application.yaml`:
Set the notification setting to "false" to disable all notifications.