Pipelines are configured entirely through JSON. Each pipeline defines a source, optional processing steps (preprocessing, data quality, transformation), and one or more destinations. You don’t need to write JSON by hand: once the Datris Data Platform is up and running, the Datris UI provides a step-by-step pipeline wizard that builds the full configuration for you. Pick your source, destination, and any processing options, and the platform generates the JSON behind the scenes.

Full Configuration Example

{
  "name": "sales_data",
  "source": {
    "schemaProperties": {
      "fields": [
        {"name": "order_id", "type": "int"},
        {"name": "customer_name", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "order_date", "type": "date"},
        {"name": "region", "type": "string"}
      ]
    },
    "fileAttributes": {
      "csvAttributes": {
        "delimiter": ",",
        "header": true,
        "encoding": "UTF-8"
      }
    }
  },
  "preprocessor": {
    "endpoint": "http://my-service:8080/preprocess",
    "async": false,
    "bearerToken": "my-token",
    "timeoutMs": 300000
  },
  "dataQuality": {
    "validateFileHeader": true,
    "aiRule": {
      "instruction": "Amount must be a positive number, all dates must be in YYYY-MM-DD format",
      "onFailureIsError": true
    }
  },
  "transformation": {
    "aiTransformation": {
      "instruction": "convert all date values to YYYY-MM-DD format. Trim whitespace from all columns. Remove duplicate rows."
    }
  },
  "destination": {
    "objectStore": {
      "prefixKey": "sales/daily",
      "fileFormat": "parquet",
      "partitionBy": ["region"],
      "deleteBeforeWrite": false,
      "writeMode": "append"
    },
    "database": {
      "dbName": "analytics",
      "schema": "public",
      "table": "sales",
      "keyFields": ["order_id"],
      "usePostgres": true,
      "truncateBeforeWrite": false,
      "useTransaction": true
    },
    "kafka": {
      "topic": "sales-events",
      "keyField": "order_id"
    }
  }
}

Configuration Fields

Top Level

Field | Type | Required | Description
name | string | Yes | Pipeline name (max 80 characters)
source | object | Yes | Source configuration
preprocessor | object | No | Optional REST endpoint called before processing
dataQuality | object | No | Data validation rules
transformation | object | No | Data transformation settings
destination | object | Yes | One or more output destinations
catalog | string | No | Free-form label that groups related pipelines and taps in the Data Catalog. Empty or null = Uncataloged.

Source

Field | Type | Required | Description
schemaProperties | object | Yes* | Schema definition with field names and types. *Required for structured/semi-structured data
fileAttributes | object | No | File format configuration (CSV, JSON, XML, XLS, unstructured)
databaseAttributes | object | No | Database pull configuration
streamAttributes | object | No | Stream source configuration (Kafka)

Source > Schema Properties

Field | Type | Description
dbName | string | Database name associated with this schema (used by some destinations)
fields | array | List of {"name": "...", "type": "..."} objects
schemaVersion | int | Schema version (default 1)
Supported types: boolean, int, tinyint, smallint, bigint, float, double, decimal(p,s), string, varchar(n), char(n), date, timestamp
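
As an illustration of the parameterized types, a schema for a hypothetical product table might look like the sketch below. The field names, precision, and lengths are invented for the example, and it assumes the parameters are written inline in the type string exactly as listed above (for instance decimal(10,2)).

```json
{
  "schemaProperties": {
    "fields": [
      {"name": "order_id", "type": "bigint"},
      {"name": "sku", "type": "varchar(32)"},
      {"name": "currency", "type": "char(3)"},
      {"name": "unit_price", "type": "decimal(10,2)"},
      {"name": "in_stock", "type": "boolean"},
      {"name": "updated_at", "type": "timestamp"}
    ],
    "schemaVersion": 1
  }
}
```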

Source > File Attributes

CSV:
"csvAttributes": {
  "delimiter": ",",
  "header": true,
  "encoding": "UTF-8"
}
JSON:
"jsonAttributes": {
  "everyRowContainsObject": true,
  "encoding": "UTF-8"
}
XML:
"xmlAttributes": {
  "everyRowContainsObject": true,
  "encoding": "UTF-8"
}
Excel:
"xlsAttributes": {
  "worksheet": 0,
  "tempCsvFileDelimiter": ","
}
Unstructured:
"unstructuredAttributes": {
  "fileExtension": "pdf",
  "preserveFilename": true
}

Source > Database Attributes

Field | Type | Required | Description
type | string | Yes | postgres, mysql, or mssql
postgresSecretsName | string | Conditional | Vault secret name for Postgres credentials
mssqlSecretsName | string | Conditional | Vault secret name for MSSQL credentials
mysqlSecretsName | string | Conditional | Vault secret name for MySQL credentials
cronExpression | string | Yes | Cron schedule for polling (e.g., 0 */5 * * * ?)
database | string | No | Database name
schema | string | No | Schema name
table | string | Yes* | Table to query (*unless sqlOverride is set)
includeFields | array | No | Column whitelist
timestampFieldName | string | Yes* | Column for incremental pulls (*unless sqlOverride is set)
sqlOverride | string | No | Custom SELECT query (replaces auto-generated query)
outputDelimiter | string | No | Delimiter for CSV output (default ,)
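
A minimal sketch of a database pull from Postgres, built only from the fields in the table above. The secret name, database, table, and column names are placeholders, and the cron expression is the one quoted in the table. Only the source part of the pipeline is shown.

```json
{
  "source": {
    "schemaProperties": {
      "fields": [
        {"name": "order_id", "type": "int"},
        {"name": "amount", "type": "double"},
        {"name": "updated_at", "type": "timestamp"}
      ]
    },
    "databaseAttributes": {
      "type": "postgres",
      "postgresSecretsName": "orders-db-credentials",
      "cronExpression": "0 */5 * * * ?",
      "database": "shop",
      "schema": "public",
      "table": "orders",
      "includeFields": ["order_id", "amount", "updated_at"],
      "timestampFieldName": "updated_at",
      "outputDelimiter": ","
    }
  }
}
```

Per the table, table and timestampFieldName can be left out when sqlOverride supplies a custom SELECT query instead.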

Preprocessor

Field | Type | Default | Description
endpoint | string | Required | URL of the preprocessing service
async | boolean | false | If true, returns immediately
bearerToken | string | null | Authorization bearer token
timeoutMs | int | 300000 | Request timeout (milliseconds)

Data Quality

See Data Quality for detailed documentation.
Field | Type | Description
aiRule | object | AI rule: plain-English validation instruction
validateFileHeader | boolean | Validate CSV header matches schema field order
validationSchema | string | Path to JSON Schema file for JSON/XML validation

Transformation

See AI Transformation for detailed documentation.
Field | Type | Description
aiTransformation | object | AI transformation: plain-English transformation instruction

Destination > Object Store

The object store destination writes Parquet or ORC files to either the built-in MinIO (default) or to AWS S3. The provider is selected by the provider field; everything else is shared across both.
Field | Type | Default | Description
prefixKey | string | Required | Path prefix for output files under the bucket
fileFormat | string | parquet | Output format: parquet or orc
partitionBy | array | null | Column names for partitioning
destinationBucketOverride | string | null | Custom bucket (default: {environment}-data for MinIO; required for S3)
deleteBeforeWrite | boolean | false | Delete existing data at path before writing
writeToTemporaryLocation | boolean | false | Write to temp location first
writeMode | string | append | append, overwrite, ignore, or errorifexists
provider | string | minio | minio (built-in, default) or s3 (AWS S3)
endpoint | string | null | S3 endpoint URL override. Must use https://. Leave unset for the AWS regional default. Ignored for minio.
credentialsSecret | string | null | For provider=s3: name of a Platform-tab Vault secret containing accessKey, secretKey, region (and optionally sessionToken). Leave unset only when Datris runs on an AWS instance role.
For S3-specific behavior — credentials secret format, per-bucket credential isolation, multiple S3 destinations in one deployment — see the S3 Destination page.
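
As a rough sketch, an object store destination that targets AWS S3 instead of the built-in MinIO might look like this. The bucket and secret names are placeholders; the field names come from the table above. The endpoint field is left unset here so the AWS regional default applies.

```json
{
  "destination": {
    "objectStore": {
      "provider": "s3",
      "destinationBucketOverride": "acme-analytics-lake",
      "credentialsSecret": "s3-analytics-writer",
      "prefixKey": "sales/daily",
      "fileFormat": "parquet",
      "partitionBy": ["region"],
      "writeMode": "append"
    }
  }
}
```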

Destination > Database

Field | Type | Default | Description
dbName | string | Required | Database name
schema | string | Required | Schema name
table | string | Required | Table name
keyFields | array | null | Primary key columns (enables upsert for MongoDB)
usePostgres | boolean | false | Write to PostgreSQL
useMongoDB | boolean | false | Write to MongoDB
manageTableManually | boolean | false | If false, auto-creates tables
truncateBeforeWrite | boolean | false | Truncate table before loading
useTransaction | boolean | true | Wrap in a transaction
options | array | null | Custom COPY options (e.g., ["FORMAT csv", "DELIMITER ','"])
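
For illustration, a full-refresh load into an existing PostgreSQL table could be sketched as follows. The table name is a placeholder, and the COPY options are the ones quoted in the table above. Because manageTableManually is true, this sketch assumes the table has already been created outside the platform.

```json
{
  "destination": {
    "database": {
      "dbName": "analytics",
      "schema": "public",
      "table": "sales_snapshot",
      "usePostgres": true,
      "manageTableManually": true,
      "truncateBeforeWrite": true,
      "useTransaction": true,
      "options": ["FORMAT csv", "DELIMITER ','"]
    }
  }
}
```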

Destination > Kafka

Field | Type | Default | Description
topic | string | Required | Kafka topic name
keyField | string | null | Column to use as message key
overrideBootstrapServers | string | null | Custom bootstrap servers
timeoutMs | int | 10000 | Producer timeout

Destination > ActiveMQ

Field | Type | Description
queueName | string | ActiveMQ queue name
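
A minimal ActiveMQ destination might look like the sketch below. The queue name is a placeholder; the activeMQ key is the one used in the Multiple Destinations example.

```json
{
  "destination": {
    "activeMQ": {
      "queueName": "sales-events"
    }
  }
}
```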

Destination > REST Endpoint

Field | Type | Default | Description
endpoint | string | Required | URL to POST data to
async | boolean | false | If true, doesn’t wait for response
bearerToken | string | null | Authorization token
apiKey | string | null | API key for authentication
timeoutSeconds | int | 0 | Request timeout (seconds); 0 means unset
timeoutMs | int | 300000 | Request timeout (milliseconds)
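
A REST endpoint destination could be sketched as follows. The URL and token are placeholders, and the restEndpoint key is the one used in the Multiple Destinations example. Only timeoutMs is set here; how the platform resolves timeoutSeconds and timeoutMs when both are given is not covered by this page.

```json
{
  "destination": {
    "restEndpoint": {
      "endpoint": "https://hooks.example.com/sales",
      "async": false,
      "bearerToken": "my-token",
      "timeoutMs": 60000
    }
  }
}
```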

Destination > Qdrant

Field | Type | Description
collectionName | string | Qdrant collection name
chunking | object | Chunking config (see below)
metadata | object | Key-value metadata to attach to each chunk
embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName.
qdrantSecretName | string | Vault secret for Qdrant connection

Destination > Weaviate

Field | Type | Description
className | string | Weaviate class name
chunking | object | Chunking config (see below)
metadata | object | Key-value metadata to attach to each chunk
embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName.
weaviateSecretName | string | Vault secret for Weaviate connection

Destination > pgvector

Field | Type | Description
tableName | string | PostgreSQL table name
schemaName | string | PostgreSQL schema name
chunking | object | Chunking config (see below)
metadata | object | Key-value metadata to attach to each chunk
embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName.
postgresSecretName | string | Vault secret for PostgreSQL connection

Destination > Milvus

Field | Type | Description
collectionName | string | Milvus collection name
chunking | object | Chunking config (see below)
metadata | object | Key-value metadata to attach to each chunk
embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName.
milvusSecretName | string | Vault secret for Milvus connection

Destination > Chroma

Field | Type | Description
collectionName | string | Chroma collection name
chunking | object | Chunking config (see below)
metadata | object | Key-value metadata to attach to each chunk
embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName.
chromaSecretName | string | Vault secret for Chroma connection

Chunking Config

Used by all vector store destinations.
Field | Type | Default | Description
strategy | string | recursive | Chunking strategy: fixed, sentence, paragraph, recursive
chunkSize | int | 500 | Maximum characters per chunk
chunkOverlap | int | 50 | Overlap between consecutive chunks
maxChunkTokens | int | 0 (off) | Optional token-count cap. When set, the chunker refuses to emit any chunk over this estimate, splitting oversized output before it ever reaches the embedding API. Recommended: ~80% of the embedding model’s input cap (e.g. 6500 for text-embedding-3-small). Without it, the server-side embedding guard catches oversized chunks as a fallback.
tokensPerCharRatio | float | 2.0 | Chars-per-token ratio for the chunker’s heuristic counter. Lower is more conservative (more splits); raise to ~3.5 for predictable Latin prose.
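
A chunking object that keeps the documented defaults and adds the token cap suggested above for text-embedding-3-small might look like this sketch. It would sit inside a vector store destination alongside that destination's other fields.

```json
{
  "chunking": {
    "strategy": "recursive",
    "chunkSize": 500,
    "chunkOverlap": 50,
    "maxChunkTokens": 6500,
    "tokensPerCharRatio": 2.0
  }
}
```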

Destination > Schema Properties (Optional)

Define a separate destination schema if column mapping differs from the source:
"destination": {
  "schemaProperties": {
    "dbName": "analytics",
    "fields": [
      {"name": "order_id", "type": "int"},
      {"name": "customer", "type": "string"}
    ]
  }
}

Multiple Destinations

A single pipeline can write to multiple destinations simultaneously. All destinations execute in parallel:
"destination": {
  "objectStore": { ... },
  "database": { ... },
  "kafka": { ... },
  "activeMQ": { ... },
  "restEndpoint": { ... }
}