Pipelines are configured entirely through JSON. Each pipeline defines a source, optional processing steps (preprocessing, data quality, transformation), and one or more destinations. You don’t need to write JSON by hand — once you have the Datris Data Platform up and running, the Datris UI provides a step-by-step pipeline wizard that builds the full configuration for you. Just pick your source, destination, and any processing options, and the platform generates the JSON behind the scenes.

Full Configuration Example

{
  "name": "sales_data",
  "source": {
    "schemaProperties": {
      "fields": [
        {"name": "order_id", "type": "int"},
        {"name": "customer_name", "type": "string"},
        {"name": "amount", "type": "double"},
        {"name": "order_date", "type": "date"},
        {"name": "region", "type": "string"}
      ]
    },
    "fileAttributes": {
      "csvAttributes": {
        "delimiter": ",",
        "header": true,
        "encoding": "UTF-8"
      }
    }
  },
  "preprocessor": {
    "endpoint": "http://my-service:8080/preprocess",
    "async": false,
    "bearerToken": "my-token",
    "timeoutMs": 300000
  },
  "dataQuality": {
    "validateFileHeader": true,
    "aiRule": {
      "instruction": "Amount must be a positive number, all dates must be in YYYY-MM-DD format",
      "onFailureIsError": true
    }
  },
  "transformation": {
    "aiTransformation": {
      "instruction": "convert all date values to YYYY-MM-DD format. Trim whitespace from all columns. Remove duplicate rows."
    }
  },
  "destination": {
    "objectStore": {
      "prefixKey": "sales/daily",
      "fileFormat": "parquet",
      "partitionBy": ["region"],
      "deleteBeforeWrite": false,
      "writeMode": "append"
    },
    "database": {
      "dbName": "analytics",
      "schema": "public",
      "table": "sales",
      "keyFields": ["order_id"],
      "usePostgres": true,
      "truncateBeforeWrite": false,
      "useTransaction": true
    },
    "kafka": {
      "topic": "sales-events",
      "keyField": "order_id"
    }
  }
}

Configuration Fields

Top Level

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| name | string | Yes | Pipeline name (max 80 characters) |
| source | object | Yes | Source configuration |
| preprocessor | object | No | Optional REST endpoint called before processing |
| dataQuality | object | No | Data validation rules |
| transformation | object | No | Data transformation settings |
| destination | object | Yes | One or more output destinations |
| catalog | string | No | Free-form label that groups related pipelines and taps in the Data Catalog. Empty or null = Uncataloged. |

Source

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| schemaProperties | object | Yes* | Schema definition with field names and types. *Required for structured/semi-structured data |
| fileAttributes | object | No | File format configuration (CSV, JSON, XML, XLS, unstructured) |
| databaseAttributes | object | No | Database pull configuration |
| streamAttributes | object | No | Stream source configuration (Kafka) |

Source > Schema Properties

| Field | Type | Description |
| --- | --- | --- |
| dbName | string | Database name associated with this schema (used by some destinations) |
| fields | array | List of {"name": "...", "type": "..."} objects |

Supported types: boolean, int, tinyint, smallint, bigint, float, double, decimal(p,s), string, varchar(n), char(n), date, timestamp
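
The fields list accepts parameterized types alongside simple ones. An illustrative sketch (the field names here are placeholders, not part of the product):

"schemaProperties": {
  "fields": [
    {"name": "sku", "type": "varchar(32)"},
    {"name": "unit_price", "type": "decimal(10,2)"},
    {"name": "in_stock", "type": "boolean"},
    {"name": "last_updated", "type": "timestamp"}
  ]
}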

Source > File Attributes

CSV:
"csvAttributes": {
  "delimiter": ",",
  "header": true,
  "encoding": "UTF-8"
}
JSON:
"jsonAttributes": {
  "everyRowContainsObject": true,
  "encoding": "UTF-8"
}
XML:
"xmlAttributes": {
  "everyRowContainsObject": true,
  "encoding": "UTF-8"
}
Excel:
"xlsAttributes": {
  "worksheet": 0,
  "tempCsvFileDelimiter": ","
}
Unstructured:
"unstructuredAttributes": {
  "fileExtension": "pdf",
  "preserveFilename": true
}

Source > Database Attributes

| Field | Type | Required | Description |
| --- | --- | --- | --- |
| type | string | Yes | postgres, mysql, or mssql |
| postgresSecretsName | string | Conditional | Vault secret name for Postgres credentials |
| mssqlSecretsName | string | Conditional | Vault secret name for MSSQL credentials |
| mysqlSecretsName | string | Conditional | Vault secret name for MySQL credentials |
| cronExpression | string | Yes | Cron schedule for polling (e.g., 0 */5 * * * ?) |
| database | string | No | Database name |
| schema | string | No | Schema name |
| table | string | Yes* | Table to query (*unless sqlOverride is set) |
| includeFields | array | No | Column whitelist |
| timestampFieldName | string | Yes* | Column for incremental pulls (*unless sqlOverride is set) |
| sqlOverride | string | No | Custom SELECT query (replaces auto-generated query) |
| outputDelimiter | string | No | Delimiter for CSV output (default ,) |
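
Putting these fields together, a databaseAttributes block for an incremental Postgres pull might look like the sketch below (the secret name, table, and column names are placeholders; the matching *SecretsName field for the chosen type is what satisfies the "Conditional" requirement):

"databaseAttributes": {
  "type": "postgres",
  "postgresSecretsName": "orders-db-creds",
  "cronExpression": "0 */5 * * * ?",
  "database": "shop",
  "schema": "public",
  "table": "orders",
  "includeFields": ["order_id", "amount", "updated_at"],
  "timestampFieldName": "updated_at"
}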

Preprocessor

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| endpoint | string | Required | URL of the preprocessing service |
| async | boolean | false | If true, returns immediately |
| bearerToken | string | null | Authorization bearer token |
| timeoutMs | int | 300000 | Request timeout (milliseconds) |

Data Quality

See Data Quality for detailed documentation.
| Field | Type | Description |
| --- | --- | --- |
| aiRule | object | AI rule — plain-English validation instruction |
| validateFileHeader | boolean | Validate CSV header matches schema field order |
| validationSchema | string | Path to JSON Schema file for JSON/XML validation |
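
The full configuration example earlier on this page uses aiRule and validateFileHeader; for JSON or XML sources, validationSchema can point at a JSON Schema file instead. A minimal sketch (the file path is illustrative):

"dataQuality": {
  "validationSchema": "schemas/sales.schema.json"
}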

Transformation

See AI Transformation for detailed documentation.
| Field | Type | Description |
| --- | --- | --- |
| aiTransformation | object | AI transformation — plain-English transformation instruction |

Destination > Object Store

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| prefixKey | string | Required | MinIO (S3) path prefix for output files |
| fileFormat | string | parquet | Output format: parquet or orc |
| partitionBy | array | null | Column names for partitioning |
| destinationBucketOverride | string | null | Custom bucket (default: {environment}-data) |
| deleteBeforeWrite | boolean | false | Delete existing data at path before writing |
| writeToTemporaryLocation | boolean | false | Write to temp location first |
| writeMode | string | append | append, overwrite, ignore, or errorifexists |

Destination > Database

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| dbName | string | Required | Database name |
| schema | string | Required | Schema name |
| table | string | Required | Table name |
| keyFields | array | null | Primary key columns (enables upsert for MongoDB) |
| usePostgres | boolean | false | Write to PostgreSQL |
| useMongoDB | boolean | false | Write to MongoDB |
| manageTableManually | boolean | false | If false, auto-creates tables |
| truncateBeforeWrite | boolean | false | Truncate table before loading |
| useTransaction | boolean | false | Wrap in a transaction |
| options | array | null | Custom COPY options (e.g., ["FORMAT csv", "DELIMITER ','"]) |

Destination > Kafka

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| topic | string | Required | Kafka topic name |
| keyField | string | null | Column to use as message key |
| overrideBootstrapServers | string | null | Custom bootstrap servers |
| timeoutMs | int | 10000 | Producer timeout |

Destination > ActiveMQ

| Field | Type | Description |
| --- | --- | --- |
| queueName | string | ActiveMQ queue name |

Destination > REST Endpoint

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| endpoint | string | Required | URL to POST data to |
| async | boolean | false | If true, doesn’t wait for response |
| bearerToken | string | null | Authorization token |
| timeoutMs | int | 300000 | Request timeout (milliseconds) |
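
The multiple-destinations example at the end of this page uses the key restEndpoint for this destination. An illustrative block (the URL and token are placeholders):

"destination": {
  "restEndpoint": {
    "endpoint": "https://my-service:8443/ingest",
    "async": false,
    "bearerToken": "my-token",
    "timeoutMs": 60000
  }
}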

Destination > Qdrant

| Field | Type | Description |
| --- | --- | --- |
| collectionName | string | Qdrant collection name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| qdrantSecretName | string | Vault secret for Qdrant connection |

Destination > Weaviate

| Field | Type | Description |
| --- | --- | --- |
| className | string | Weaviate class name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| weaviateSecretName | string | Vault secret for Weaviate connection |

Destination > pgvector

| Field | Type | Description |
| --- | --- | --- |
| tableName | string | PostgreSQL table name |
| schemaName | string | PostgreSQL schema name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| postgresSecretName | string | Vault secret for PostgreSQL connection |

Destination > Milvus

| Field | Type | Description |
| --- | --- | --- |
| collectionName | string | Milvus collection name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| milvusSecretName | string | Vault secret for Milvus connection |

Destination > Chroma

| Field | Type | Description |
| --- | --- | --- |
| collectionName | string | Chroma collection name |
| chunking | object | Chunking config (see below) |
| metadata | object | Key-value metadata to attach to each chunk |
| embeddingSecretName | string | Optional override of the embedding Vault secret. Defaults to the server-level ai.embedding.secretName. |
| chromaSecretName | string | Vault secret for Chroma connection |

Chunking Config

Used by all vector store destinations.
| Field | Type | Default | Description |
| --- | --- | --- | --- |
| strategy | string | recursive | Chunking strategy: fixed, sentence, paragraph, recursive |
| chunkSize | int | 500 | Maximum characters per chunk |
| chunkOverlap | int | 50 | Overlap between consecutive chunks |
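
Combining the Qdrant fields with a chunking block, a vector store destination might look like the sketch below. Note the destination key name (qdrant) is an assumption inferred from the naming pattern of the other destination keys (objectStore, kafka, restEndpoint); the collection and secret names are placeholders:

"destination": {
  "qdrant": {
    "collectionName": "support_docs",
    "chunking": {
      "strategy": "recursive",
      "chunkSize": 500,
      "chunkOverlap": 50
    },
    "metadata": {"source": "helpdesk"},
    "qdrantSecretName": "qdrant-creds"
  }
}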

Destination > Schema Properties (Optional)

Define a separate destination schema if column mapping differs from the source:
"destination": {
  "schemaProperties": {
    "dbName": "analytics",
    "fields": [
      {"name": "order_id", "type": "int"},
      {"name": "customer", "type": "string"}
    ]
  }
}

Multiple Destinations

A single pipeline can write to several destinations at once; all configured destinations execute in parallel:
"destination": {
  "objectStore": { ... },
  "database": { ... },
  "kafka": { ... },
  "activeMQ": { ... },
  "restEndpoint": { ... }
}