The pipeline can ingest files dropped into MinIO buckets. There are three patterns: single data file, metadata file with data file, and bulk upload.

File Naming Convention

Single Data File

Place a file in the MinIO raw bucket following this naming pattern:
{pipeline}.{publishertoken}.{anything}.pipeline.{ext}
Segment              Description
pipeline             Name of the registered pipeline configuration
publishertoken       Optional identifier for the data publisher
anything             Any additional text (timestamp, version, etc.)
pipeline (literal)   Fixed literal sentinel string
ext                  File extension: csv, json, xml
Example:
stock_price.publisher-001.2026-03-15.1.pipeline.csv

Metadata File + Data File

For more control, upload the data file first, then upload a companion metadata file (.metadata.json):

Data file:     stock_price.2026-03-15.pipeline.csv
Metadata file: stock_price.2026-03-15.metadata.json
{
  "dataFileName": "stock_price.2026-03-15.pipeline.csv",
  "pipeline": "stock_price",
  "publisherToken": "28c39a6f-de32-4625-9bee-6af9ee547798"
}
The data file must be uploaded before the metadata file.

Bulk Upload

Upload multiple files to a directory, then trigger processing with a metadata file:
{
  "dataFilePath": "s3://oss-raw/bulk/stock_price/batch-001/",
  "pipeline": "stock_price",
  "bulkUpload": true,
  "publisherToken": "28c39a6f-de32-4625-9bee-6af9ee547798"
}
Important: Use a unique dataFilePath directory for each bulk upload. Do not reuse directories.
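One easy way to guarantee a never-reused directory is to embed a UUID in the prefix. A sketch; the `bulk/{pipeline}/` path layout is taken from the example above and the bucket name is a parameter:

```python
import uuid

def unique_bulk_path(bucket: str, pipeline: str) -> str:
    """Build a unique bulk-upload prefix, e.g.
    s3://oss-raw/bulk/stock_price/<uuid>/ — the random UUID segment
    ensures the directory is never reused across uploads."""
    return f"s3://{bucket}/bulk/{pipeline}/{uuid.uuid4()}/"
```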

Compressed Files

Compressed archives (.zip, .gz, .tar, .jar) are automatically decompressed. The pipeline name and publisher token are parsed from the archive filename:
stock_price.publisher-001.2026-03-15.pipeline.zip

How It Works

  1. A file is placed in a MinIO bucket (via mc cp, API, or another system)
  2. MinIO sends an S3-compatible event notification to the ActiveMQ file-notifier queue
  3. The pipeline polls the queue on a configurable schedule (default: every 5 seconds)
  4. The pipeline parses the event, resolves the pipeline, and processes the file
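Step 4's event parsing can be sketched against the standard S3 event schema (which MinIO notifications follow); the function name is illustrative:

```python
import json

def extract_object_keys(event_json: str) -> list[tuple[str, str]]:
    """Pull (bucket, key) pairs from an S3-compatible event notification.
    The Records / s3.bucket.name / s3.object.key layout is the standard
    S3 event message structure."""
    event = json.loads(event_json)
    return [
        (r["s3"]["bucket"]["name"], r["s3"]["object"]["key"])
        for r in event.get("Records", [])
    ]
```

Each extracted key is then matched against the file naming convention to resolve the target pipeline.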

Configuration

The queue name and polling interval are configured in application.yaml:
# Environment prefix - queue name becomes {environment}-file-notifier
environment: oss

# Polling interval in milliseconds
schedule:
  checkFileNotifierQueue: "5000"

# Days to retain processed message IDs for deduplication
ttlFileNotifierQueueMessages: "60"
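As the comment in the config notes, the queue name is derived from the environment prefix. In code form:

```python
def queue_name(environment: str) -> str:
    """The file-notifier queue name is {environment}-file-notifier,
    per the environment prefix in application.yaml."""
    return f"{environment}-file-notifier"
```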

Message Deduplication

The pipeline tracks processed message IDs in MongoDB to prevent duplicate processing. Each processed message ID is stored with a TTL (default: 60 days). If the same event arrives again within the TTL window, it is acknowledged and discarded.

MinIO Webhook Setup

Configure MinIO to send bucket notifications. The Docker setup uses the MinioWebhookController endpoint:
# Using mc to configure notifications
mc admin config set myminio notify_webhook:pipeline \
  endpoint="http://pipeline:8080/api/v1/minio-events"

mc admin service restart myminio
mc event add myminio/oss-raw arn:minio:sqs::pipeline:webhook --event put
Alternatively, ActiveMQ-based notifications can be configured (see MinIO documentation for AMQP notification targets).