How It Works
The pipeline uses ActiveMQ’s Virtual Destinations pattern:

- Each destination loader (PostgreSQL, MongoDB, Kafka, ActiveMQ, Object Store, REST) publishes a notification to `VirtualTopic.{environment}-pipeline-notification`
- Consumers subscribe by creating a queue named `Consumer.{name}.VirtualTopic.{environment}-pipeline-notification`
- ActiveMQ automatically copies topic messages to all matching consumer queues
- Each consumer gets its own independent queue with its own message cursor
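
The fan-out behaviour described above can be pictured with a toy in-memory model. This is only an illustration of the semantics, not ActiveMQ code: the `VirtualTopicModel` class and its methods are hypothetical names invented for this sketch.

```python
from collections import deque


class VirtualTopicModel:
    """Toy model of virtual-topic fan-out (hypothetical, not an ActiveMQ API)."""

    def __init__(self, topic):
        self.topic = topic
        self.queues = {}  # queue name -> deque of pending messages

    def add_consumer(self, name):
        # Mirrors the Consumer.{name}.{topic} naming convention.
        queue_name = f"Consumer.{name}.{self.topic}"
        self.queues.setdefault(queue_name, deque())
        return queue_name

    def publish(self, message):
        # Every consumer queue that exists at publish time gets its own copy.
        for pending in self.queues.values():
            pending.append(message)

    def receive(self, queue_name):
        # Reading from one queue never affects the others.
        pending = self.queues[queue_name]
        return pending.popleft() if pending else None


topic = VirtualTopicModel("VirtualTopic.oss-pipeline-notification")
analytics = topic.add_consumer("analytics")
audit = topic.add_consumer("audit-log")
topic.publish("load finished")

first = topic.receive(analytics)   # analytics reads its copy
second = topic.receive(analytics)  # analytics queue is now empty
third = topic.receive(audit)       # audit-log still has its own copy
```

In this model, draining the `analytics` queue leaves the `audit-log` copy untouched, which is the property that lets consumers progress at different speeds.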
Consuming Notifications
Queue Naming Convention
Examples for the `oss` environment:

- `Consumer.analytics.VirtualTopic.oss-pipeline-notification`
- `Consumer.audit-log.VirtualTopic.oss-pipeline-notification`
- `Consumer.dashboard.VirtualTopic.oss-pipeline-notification`
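
The naming convention can be captured in a small helper. The sketch below is illustrative only: `notification_topic` and `consumer_queue` are hypothetical function names, and the check for dots assumes the broker keeps ActiveMQ's default `Consumer.*.VirtualTopic.>` pattern, in which the consumer name must be a single path segment.

```python
def notification_topic(environment):
    """Return the virtual topic name for an environment such as 'oss'."""
    return f"VirtualTopic.{environment}-pipeline-notification"


def consumer_queue(name, environment):
    """Return the queue a consumer called `name` should read from."""
    # Assumption: default virtual-destination pattern, so the consumer
    # name has to be one dot-free segment.
    if not name or "." in name:
        raise ValueError(f"invalid consumer name: {name!r}")
    return f"Consumer.{name}.{notification_topic(environment)}"


queue = consumer_queue("analytics", "oss")
```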
Quick Start: Python Consumer (Minimal)
Note: This is a minimal snippet. For a production-ready consumer with automatic reconnect, heartbeats, graceful shutdown, and configurable virtual topic support, see examples/topic-subscriber/.
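
As a rough idea of what a minimal consumer might look like, the sketch below uses the third-party `stomp.py` client over ActiveMQ's STOMP connector. Everything specific here is an assumption rather than project configuration: the choice of `stomp.py` (8.x listener signature), the host, port 61613, the `admin`/`admin` credentials, the `analytics` consumer name, and the `describe` helper. It also assumes the JMS properties arrive as STOMP headers, which is how ActiveMQ normally maps them.

```python
import time

ENVIRONMENT = "oss"          # assumption: matches the example queue names
CONSUMER_NAME = "analytics"  # assumption: any unique consumer name
QUEUE = (
    f"/queue/Consumer.{CONSUMER_NAME}."
    f"VirtualTopic.{ENVIRONMENT}-pipeline-notification"
)


def describe(headers, body):
    """Summarise one notification from its headers and body."""
    pipeline = headers.get("pipeline", "?")
    destination = headers.get("destination", "?")
    return f"pipeline={pipeline} destination={destination} body={body}"


def main(host="localhost", port=61613, user="admin", password="admin"):
    # Imported here so the helpers above work without the dependency.
    import stomp  # third-party: pip install stomp.py

    class Listener(stomp.ConnectionListener):
        def on_message(self, frame):
            print(describe(frame.headers, frame.body))

        def on_error(self, frame):
            print("broker error:", frame.body)

    conn = stomp.Connection([(host, port)])
    conn.set_listener("notifications", Listener())
    conn.connect(user, password, wait=True)
    conn.subscribe(destination=QUEUE, id="1", ack="auto")
    try:
        while True:  # keep the process alive; messages arrive on a thread
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        conn.disconnect()
```

Calling `main()` starts the consumer and blocks until interrupted. The sketch has no reconnect or heartbeat handling.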
Example: Java/JMS Consumer
Message Filtering
Notifications include JMS message properties that can be used with JMS selectors for filtering:

| Property | Description |
|---|---|
| pipeline | Pipeline name |
| destination | Destination type (postgres, objectStore, kafka, mongodb, activemq) |
| schema | Database schema (database destinations) |
| database | Database name (database destinations) |
| table | Table name (database destinations) |
| topic | Kafka topic (Kafka destinations) |
| queueName | Queue name (ActiveMQ destinations) |
| prefixKey | Object store prefix (object store destinations) |
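
A selector is a SQL-92-style expression over these properties. The sketch below builds one from equality filters. `build_selector` and `KNOWN_PROPERTIES` are hypothetical names for illustration, and the code assumes every property value is a string.

```python
KNOWN_PROPERTIES = {
    "pipeline", "destination", "schema", "database",
    "table", "topic", "queueName", "prefixKey",
}


def build_selector(filters):
    """Join property equality tests with AND into a JMS selector string."""
    clauses = []
    for name, value in filters.items():
        if name not in KNOWN_PROPERTIES:
            raise ValueError(f"unknown notification property: {name}")
        # Selector string literals escape a single quote by doubling it.
        escaped = str(value).replace("'", "''")
        clauses.append(f"{name} = '{escaped}'")
    return " AND ".join(clauses)


selector = build_selector({"destination": "postgres", "table": "orders"})
# selector == "destination = 'postgres' AND table = 'orders'"
```

A JMS client would typically pass such a string as the second argument to `session.createConsumer(queue, selector)`. A STOMP client on ActiveMQ can send it as the `selector` header of its SUBSCRIBE frame.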
Notification Payload
null.
Enabling Notifications
Notifications are enabled by default in application.yaml:
"false" to disable all notifications.