Pipelines are configured entirely through JSON. Each pipeline defines a source, optional processing steps (preprocessing, data quality, transformation), and one or more destinations.

Full Configuration Example

```json
{
  "name": "sales_data",
  "source": {
    "schemaProperties": {
      "fields": [
        {"name": "order_id", "type": "int"},
        {"name": "customer_name", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "order_date", "type": "date"},
        {"name": "region", "type": "string"}
      ]
    },
    "fileAttributes": {
      "csvAttributes": {
        "delimiter": ",",
        "header": true,
        "encoding": "UTF-8"
      }
    }
  },
  "preprocessor": {
    "endpoint": "http://my-service:8080/preprocess",
    "async": false,
    "bearerToken": "my-token",
    "timeoutMs": 300000
  },
  "dataQuality": {
    "validateFileHeader": true,
    "aiRule": {
      "instruction": "Amount must be a positive number, all dates must be in YYYY-MM-DD format",
      "onFailureIsError": true
    }
  },
  "transformation": {
    "aiTransformation": {
      "instruction": "Convert all date values to YYYY-MM-DD format. Trim whitespace from all columns. Remove duplicate rows."
    }
  },
  "destination": {
    "objectStore": {
      "prefixKey": "sales/daily",
      "fileFormat": "parquet",
      "partitionBy": ["region"],
      "deleteBeforeWrite": false,
      "writeMode": "append"
    },
    "database": {
      "dbName": "analytics",
      "schema": "public",
      "table": "sales",
      "keyFields": ["order_id"],
      "usePostgres": true,
      "truncateBeforeWrite": false,
      "useTransaction": true
    },
    "kafka": {
      "topic": "sales-events",
      "keyField": "order_id"
    }
  }
}
```

Configuration Fields

Top Level

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `name` | string | Yes | Pipeline name (max 80 characters) |
| `source` | object | Yes | Source configuration |
| `preprocessor` | object | No | Optional REST endpoint called before processing |
| `dataQuality` | object | No | Data validation rules |
| `transformation` | object | No | Data transformation settings |
| `destination` | object | Yes | One or more output destinations |

Source

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `schemaProperties` | object | Yes* | Schema definition with field names and types |
| `fileAttributes` | object | No | File format configuration (CSV, JSON, XML, XLS, unstructured) |
| `databaseAttributes` | object | No | Database pull configuration |
| `streamAttributes` | object | No | Stream source configuration (Kafka) |

\*Required for structured/semi-structured data.

Source > Schema Properties

| Field | Type | Description |
| --- | --- | --- |
| `dbName` | string | Database name associated with this schema (used by some destinations) |
| `fields` | array | List of `{"name": "...", "type": "..."}` objects |

Supported types: `boolean`, `int`, `tinyint`, `smallint`, `bigint`, `float`, `double`, `decimal(p,s)`, `string`, `varchar(n)`, `char(n)`, `date`, `timestamp`.
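For illustration, here is a sketch of a `schemaProperties` block that exercises the parameterized types (the field names are hypothetical):

```json
"schemaProperties": {
  "fields": [
    {"name": "sku", "type": "varchar(32)"},
    {"name": "unit_price", "type": "decimal(10,2)"},
    {"name": "in_stock", "type": "boolean"},
    {"name": "last_updated", "type": "timestamp"}
  ]
}
```

Note that the precision/scale and length parameters are written inline as part of the type string.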

Source > File Attributes

CSV:

```json
"csvAttributes": {
  "delimiter": ",",
  "header": true,
  "encoding": "UTF-8"
}
```

JSON:

```json
"jsonAttributes": {
  "everyRowContainsObject": true,
  "encoding": "UTF-8"
}
```

XML:

```json
"xmlAttributes": {
  "everyRowContainsObject": true,
  "encoding": "UTF-8"
}
```

Excel:

```json
"xlsAttributes": {
  "worksheet": 0,
  "tempCsvFileDelimiter": ","
}
```

Unstructured:

```json
"unstructuredAttributes": {
  "fileExtension": "pdf",
  "preserveFilename": true
}
```

Source > Database Attributes

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| `type` | string | Yes | `postgres`, `mysql`, or `mssql` |
| `postgresSecretsName` | string | Conditional | Vault secret name for Postgres credentials |
| `mssqlSecretsName` | string | Conditional | Vault secret name for MSSQL credentials |
| `mysqlSecretsName` | string | Conditional | Vault secret name for MySQL credentials |
| `cronExpression` | string | Yes | Cron schedule for polling (e.g., `0 */5 * * * ?`) |
| `database` | string | No | Database name |
| `schema` | string | No | Schema name |
| `table` | string | Yes* | Table to query |
| `includeFields` | array | No | Column whitelist |
| `timestampFieldName` | string | Yes* | Column for incremental pulls |
| `sqlOverride` | string | No | Custom SELECT query (replaces the auto-generated query) |
| `outputDelimiter` | string | No | Delimiter for CSV output (default `,`) |

\*Not required when `sqlOverride` is set.
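Putting these fields together, a sketch of a Postgres source polled every five minutes might look like this (the secret name, database, table, and column names are hypothetical):

```json
"databaseAttributes": {
  "type": "postgres",
  "postgresSecretsName": "orders-db-credentials",
  "cronExpression": "0 */5 * * * ?",
  "database": "orders_db",
  "schema": "public",
  "table": "orders",
  "timestampFieldName": "updated_at",
  "includeFields": ["order_id", "amount", "updated_at"]
}
```

With `timestampFieldName` set, each poll pulls only rows newer than the last run; `sqlOverride` would replace this auto-generated incremental query entirely.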

Preprocessor

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `endpoint` | string | Required | URL of the preprocessing service |
| `async` | boolean | false | If true, returns immediately |
| `bearerToken` | string | null | Authorization bearer token |
| `timeoutMs` | int | 300000 | Request timeout (milliseconds) |

Data Quality

See Data Quality for detailed documentation.
| Field | Type | Description |
| --- | --- | --- |
| `validateFileHeader` | boolean | Validate CSV header matches schema field order |
| `validationSchema` | string | Path to JSON Schema file for JSON/XML validation |
| `aiRule` | object | CodeGen AI rule — plain-English instruction that generates a Python validation script |
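The full configuration example above uses `aiRule`; for a JSON or XML source, a schema-based check could be used instead. A minimal sketch (the schema file path is hypothetical):

```json
"dataQuality": {
  "validationSchema": "schemas/sales.schema.json"
}
```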

Transformation

See AI Transformation (CodeGen) for detailed documentation.
| Field | Type | Description |
| --- | --- | --- |
| `aiTransformation` | object | AI transformation — plain-English instruction that generates a Python transformation script |

Destination > Object Store

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `prefixKey` | string | Required | S3 path prefix for output files |
| `fileFormat` | string | parquet | Output format: `parquet` or `orc` |
| `partitionBy` | array | null | Column names for partitioning |
| `destinationBucketOverride` | string | null | Custom bucket (default: `{environment}-data`) |
| `deleteBeforeWrite` | boolean | false | Delete existing data at path before writing |
| `writeToTemporaryLocation` | boolean | false | Write to temp location first |
| `writeMode` | string | append | `append`, `overwrite`, `ignore`, or `errorifexists` |
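As an illustration, an object store destination that overwrites partitioned Parquet output in a custom bucket might look like this (the bucket name is hypothetical):

```json
"objectStore": {
  "prefixKey": "sales/daily",
  "fileFormat": "parquet",
  "partitionBy": ["region", "order_date"],
  "destinationBucketOverride": "prod-sales-archive",
  "writeMode": "overwrite"
}
```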

Destination > Database

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `dbName` | string | Required | Database name |
| `schema` | string | Required | Schema name |
| `table` | string | Required | Table name |
| `keyFields` | array | null | Primary key columns (enables upsert for MongoDB) |
| `usePostgres` | boolean | false | Write to PostgreSQL |
| `useMongoDB` | boolean | false | Write to MongoDB |
| `manageTableManually` | boolean | false | If false, auto-creates tables |
| `truncateBeforeWrite` | boolean | false | Truncate table before loading |
| `useTransaction` | boolean | false | Wrap in a transaction |
| `options` | array | null | Custom COPY options (e.g., `["FORMAT csv", "DELIMITER ','"]`) |
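Since `keyFields` enables upsert for MongoDB, a MongoDB destination could be sketched like this (the database and collection names are illustrative):

```json
"database": {
  "dbName": "analytics",
  "table": "sales",
  "keyFields": ["order_id"],
  "useMongoDB": true
}
```

With `keyFields` set, rows sharing an `order_id` replace earlier documents rather than accumulating duplicates.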

Destination > Kafka

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `topic` | string | Required | Kafka topic name |
| `keyField` | string | null | Column to use as message key |
| `overrideBootstrapServers` | string | null | Custom bootstrap servers |
| `timeoutMs` | int | 10000 | Producer timeout (milliseconds) |

Destination > ActiveMQ

| Field | Type | Description |
| --- | --- | --- |
| `queueName` | string | ActiveMQ queue name |

Destination > REST Endpoint

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| `endpoint` | string | Required | URL to POST data to |
| `async` | boolean | false | If true, doesn't wait for response |
| `bearerToken` | string | null | Authorization token |
| `timeoutMs` | int | 300000 | Request timeout (milliseconds) |
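A minimal fire-and-forget REST destination, assuming a hypothetical downstream service URL:

```json
"restEndpoint": {
  "endpoint": "http://downstream-service:8080/ingest",
  "async": true,
  "timeoutMs": 60000
}
```

With `async` set to true, the pipeline does not wait for the service's response before completing.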

Destination > Schema Properties (Optional)

Define a separate destination schema if column mapping differs from the source:
```json
"destination": {
  "schemaProperties": {
    "dbName": "analytics",
    "fields": [
      {"name": "order_id", "type": "int"},
      {"name": "customer", "type": "string"}
    ]
  }
}
```

Multiple Destinations

A single pipeline can write to multiple destinations; all of them execute in parallel:

```json
"destination": {
  "objectStore": { ... },
  "database": { ... },
  "kafka": { ... },
  "activeMQ": { ... },
  "restEndpoint": { ... }
}
```
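For instance, the full configuration example at the top of this page fans out to object store, database, and Kafka at once; a smaller two-destination sketch using the same values would be:

```json
"destination": {
  "objectStore": {
    "prefixKey": "sales/daily",
    "fileFormat": "parquet"
  },
  "kafka": {
    "topic": "sales-events",
    "keyField": "order_id"
  }
}
```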