Documentation Index
Fetch the complete documentation index at: https://docs.datris.ai/llms.txt
Use this file to discover all available pages before exploring further.
Pipelines are configured entirely through JSON. Each pipeline defines a source, optional processing steps (preprocessing, data quality, transformation), and one or more destinations.
You don’t need to write JSON by hand — once you have the Datris Data Platform up and running, the Datris UI provides a step-by-step pipeline wizard that builds the full configuration for you. Just pick your source, destination, and any processing options, and the platform generates the JSON behind the scenes.
Full Configuration Example
```json
{
  "name": "sales_data",
  "source": {
    "schemaProperties": {
      "fields": [
        {"name": "order_id", "type": "int"},
        {"name": "customer_name", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "order_date", "type": "date"},
        {"name": "region", "type": "string"}
      ]
    },
    "fileAttributes": {
      "csvAttributes": {
        "delimiter": ",",
        "header": true,
        "encoding": "UTF-8"
      }
    }
  },
  "preprocessor": {
    "endpoint": "http://my-service:8080/preprocess",
    "async": false,
    "bearerToken": "my-token",
    "timeoutMs": 300000
  },
  "dataQuality": {
    "validateFileHeader": true,
    "aiRule": {
      "instruction": "Amount must be a positive number, all dates must be in YYYY-MM-DD format",
      "onFailureIsError": true
    }
  },
  "transformation": {
    "aiTransformation": {
      "instruction": "Convert all date values to YYYY-MM-DD format. Trim whitespace from all columns. Remove duplicate rows."
    }
  },
  "destination": {
    "objectStore": {
      "prefixKey": "sales/daily",
      "fileFormat": "parquet",
      "partitionBy": ["region"],
      "deleteBeforeWrite": false,
      "writeMode": "append"
    },
    "database": {
      "dbName": "analytics",
      "schema": "public",
      "table": "sales",
      "keyFields": ["order_id"],
      "usePostgres": true,
      "truncateBeforeWrite": false,
      "useTransaction": true
    },
    "kafka": {
      "topic": "sales-events",
      "keyField": "order_id"
    }
  }
}
```
Configuration Fields
Top Level
| Field | Type | Required | Description |
|---|---|---|---|
| name | string | Yes | Pipeline name (max 80 characters) |
| source | object | Yes | Source configuration |
| preprocessor | object | No | Optional REST endpoint called before processing |
| dataQuality | object | No | Data validation rules |
| transformation | object | No | Data transformation settings |
| destination | object | Yes | One or more output destinations |
| catalog | string | No | Free-form label that groups related pipelines and taps in the Data Catalog. Empty or null = Uncataloged. |
Source
| Field | Type | Required | Description |
|---|---|---|---|
| schemaProperties | object | Yes* | Schema definition with field names and types. *Required for structured/semi-structured data |
| fileAttributes | object | No | File format configuration (CSV, JSON, XML, XLS, unstructured) |
| databaseAttributes | object | No | Database pull configuration |
| streamAttributes | object | No | Stream source configuration (Kafka) |
Source > Schema Properties
| Field | Type | Description |
|---|---|---|
| dbName | string | Database name associated with this schema (used by some destinations) |
| fields | array | List of {"name": "...", "type": "..."} objects |
Supported types: boolean, int, tinyint, smallint, bigint, float, double, decimal(p,s), string, varchar(n), char(n), date, timestamp
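For instance, a schema mixing plain and parameterized types might look like this (the field names are illustrative):

```json
"schemaProperties": {
  "fields": [
    {"name": "sku", "type": "varchar(32)"},
    {"name": "price", "type": "decimal(10,2)"},
    {"name": "in_stock", "type": "boolean"},
    {"name": "updated_at", "type": "timestamp"}
  ]
}
```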
Source > File Attributes
CSV:
```json
"csvAttributes": {
  "delimiter": ",",
  "header": true,
  "encoding": "UTF-8"
}
```
JSON:
```json
"jsonAttributes": {
  "everyRowContainsObject": true,
  "encoding": "UTF-8"
}
```
XML:
```json
"xmlAttributes": {
  "everyRowContainsObject": true,
  "encoding": "UTF-8"
}
```
Excel:
```json
"xlsAttributes": {
  "worksheet": 0,
  "tempCsvFileDelimiter": ","
}
```
Unstructured:
```json
"unstructuredAttributes": {
  "fileExtension": "pdf",
  "preserveFilename": true
}
```
Source > Database Attributes
| Field | Type | Required | Description |
|---|---|---|---|
| type | string | Yes | postgres, mysql, or mssql |
| postgresSecretsName | string | Conditional | Vault secret name for Postgres credentials |
| mssqlSecretsName | string | Conditional | Vault secret name for MSSQL credentials |
| mysqlSecretsName | string | Conditional | Vault secret name for MySQL credentials |
| cronExpression | string | Yes | Cron schedule for polling (e.g., 0 */5 * * * ?) |
| database | string | No | Database name |
| schema | string | No | Schema name |
| table | string | Yes* | Table to query (*unless sqlOverride is set) |
| includeFields | array | No | Column whitelist |
| timestampFieldName | string | Yes* | Column for incremental pulls (*unless sqlOverride is set) |
| sqlOverride | string | No | Custom SELECT query (replaces auto-generated query) |
| outputDelimiter | string | No | Delimiter for CSV output (default ,) |
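As a sketch, a Postgres source polling every five minutes with incremental pulls could be configured like this (the secret name, database, and column names are illustrative):

```json
"databaseAttributes": {
  "type": "postgres",
  "postgresSecretsName": "orders-db-creds",
  "cronExpression": "0 */5 * * * ?",
  "database": "shop",
  "schema": "public",
  "table": "orders",
  "timestampFieldName": "updated_at"
}
```

Because timestampFieldName is set, each poll fetches only rows changed since the previous run; setting sqlOverride instead would replace the auto-generated query entirely.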
Preprocessor
| Field | Type | Default | Description |
|---|---|---|---|
| endpoint | string | Required | URL of the preprocessing service |
| async | boolean | false | If true, returns immediately |
| bearerToken | string | null | Authorization bearer token |
| timeoutMs | int | 300000 | Request timeout (milliseconds) |
Data Quality
See Data Quality for detailed documentation.
| Field | Type | Description |
|---|---|---|
| aiRule | object | AI rule: plain-English validation instruction |
| validateFileHeader | boolean | Validate CSV header matches schema field order |
| validationSchema | string | Path to JSON Schema file for JSON/XML validation |
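For a JSON or XML source, validation against a schema file could be combined with an AI rule like this (the schema path and instruction are illustrative):

```json
"dataQuality": {
  "validationSchema": "schemas/orders.schema.json",
  "aiRule": {
    "instruction": "customer_id must not be empty",
    "onFailureIsError": false
  }
}
```

With onFailureIsError set to false, rows failing the AI rule are reported as warnings rather than stopping the pipeline.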
Transformation
See AI Transformation for detailed documentation.
| Field | Type | Description |
|---|---|---|
| aiTransformation | object | AI transformation: plain-English transformation instruction |
Destination > Object Store
| Field | Type | Default | Description |
|---|---|---|---|
| prefixKey | string | Required | MinIO (S3) path prefix for output files |
| fileFormat | string | parquet | Output format: parquet or orc |
| partitionBy | array | null | Column names for partitioning |
| destinationBucketOverride | string | null | Custom bucket (default: {environment}-data) |
| deleteBeforeWrite | boolean | false | Delete existing data at path before writing |
| writeToTemporaryLocation | boolean | false | Write to temp location first |
| writeMode | string | append | append, overwrite, ignore, or errorifexists |
Destination > Database
| Field | Type | Default | Description |
|---|---|---|---|
| dbName | string | Required | Database name |
| schema | string | Required | Schema name |
| table | string | Required | Table name |
| keyFields | array | null | Primary key columns (enables upsert for MongoDB) |
| usePostgres | boolean | false | Write to PostgreSQL |
| useMongoDB | boolean | false | Write to MongoDB |
| manageTableManually | boolean | false | If false, auto-creates tables |
| truncateBeforeWrite | boolean | false | Truncate table before loading |
| useTransaction | boolean | false | Wrap in a transaction |
| options | array | null | Custom COPY options (e.g., ["FORMAT csv", "DELIMITER ','"]) |
Destination > Kafka
| Field | Type | Default | Description |
|---|---|---|---|
| topic | string | Required | Kafka topic name |
| keyField | string | null | Column to use as message key |
| overrideBootstrapServers | string | null | Custom bootstrap servers |
| timeoutMs | int | 10000 | Producer timeout |
Destination > ActiveMQ
| Field | Type | Description |
|---|---|---|
| queueName | string | ActiveMQ queue name |
Destination > REST Endpoint
| Field | Type | Default | Description |
|---|---|---|---|
| endpoint | string | Required | URL to POST data to |
| async | boolean | false | If true, doesn't wait for response |
| bearerToken | string | null | Authorization token |
| timeoutMs | int | 300000 | Request timeout (milliseconds) |
Destination > Qdrant
| Field | Type | Description |
|---|---|---|
| collectionName | string | Qdrant collection name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| qdrantSecretName | string | Vault secret for Qdrant connection |
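A vector-store destination could be sketched like this, assuming the destination block is keyed qdrant in line with objectStore, database, and kafka above (collection, secret, and metadata values are illustrative):

```json
"qdrant": {
  "collectionName": "contracts",
  "chunking": {
    "strategy": "paragraph",
    "chunkSize": 800,
    "chunkOverlap": 100
  },
  "metadata": {"department": "legal"},
  "qdrantSecretName": "qdrant-creds"
}
```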
Destination > Weaviate
| Field | Type | Description |
|---|---|---|
| className | string | Weaviate class name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| weaviateSecretName | string | Vault secret for Weaviate connection |
Destination > pgvector
| Field | Type | Description |
|---|---|---|
| tableName | string | PostgreSQL table name |
| schemaName | string | PostgreSQL schema name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| postgresSecretName | string | Vault secret for PostgreSQL connection |
Destination > Milvus
| Field | Type | Description |
|---|---|---|
| collectionName | string | Milvus collection name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| milvusSecretName | string | Vault secret for Milvus connection |
Destination > Chroma
| Field | Type | Description |
|---|---|---|
| collectionName | string | Chroma collection name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| chromaSecretName | string | Vault secret for Chroma connection |
Chunking Config
Used by all vector store destinations.
| Field | Type | Default | Description |
|---|---|---|---|
| strategy | string | recursive | Chunking strategy: fixed, sentence, paragraph, recursive |
| chunkSize | int | 500 | Maximum characters per chunk |
| chunkOverlap | int | 50 | Overlap between consecutive chunks |
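As an illustrative sketch, a chunking block that splits on sentence boundaries with larger chunks than the defaults might look like:

```json
"chunking": {
  "strategy": "sentence",
  "chunkSize": 1000,
  "chunkOverlap": 100
}
```

The overlap carries the last 100 characters of each chunk into the next, which helps preserve context across chunk boundaries at the cost of some duplicated text.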
Destination > Schema Properties (Optional)
Define a separate destination schema if column mapping differs from the source:
```json
"destination": {
  "schemaProperties": {
    "dbName": "analytics",
    "fields": [
      {"name": "order_id", "type": "int"},
      {"name": "customer", "type": "string"}
    ]
  }
}
```
Multiple Destinations
A single pipeline can write to multiple destinations simultaneously. All destinations execute in parallel:
```json
"destination": {
  "objectStore": { ... },
  "database": { ... },
  "kafka": { ... },
  "activeMQ": { ... },
  "restEndpoint": { ... }
}
```