

The pipeline includes a built-in MCP (Model Context Protocol) server that lets AI agents interact with the platform natively. Any MCP-compatible agent (Claude Desktop, Claude Code, Cursor, or a custom agentic framework) can discover database metadata, create and manage pipelines, upload data and configuration files, monitor jobs, profile data, search vector databases, query structured data, and answer questions with AI-powered RAG, all without custom integration code. The MCP server is a lightweight Python service that routes all operations through the pipeline’s REST API. It runs alongside the pipeline in Docker, or locally for development.

Resources

The MCP server exposes resources that agents can read on demand for detailed documentation.
| Resource URI | Description |
| --- | --- |
| datris://pipeline-config-reference | Complete reference for building pipeline configurations — all source types, data quality rules, transformations, and destination types with JSON examples |
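
Agents fetch these with a standard MCP resource read. A minimal sketch using the MCP Python SDK, assuming an already-initialized ClientSession (see Setup below) and text content in the result:

async def read_config_reference(session):
    # Standard MCP resource read; the URI comes from the table above.
    result = await session.read_resource("datris://pipeline-config-reference")
    return result.contents[0].text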

Available Tools

Pipeline Management

| Tool | Description |
| --- | --- |
| list_pipelines | List all registered pipeline configurations |
| get_pipeline | Get a specific pipeline configuration by name |
| create_pipeline | Create a pipeline from sample data (base64-encoded). Schema is auto-detected. Specify destination type. Optional catalog to group related pipelines. |
| delete_pipeline | Delete a pipeline and its destination data |
| upload_data | Upload data (base64-encoded) to a pipeline for processing. New CSV columns are auto-added to the schema (schema evolution). Returns pipeline token. |
| get_job_status | Get job status. Pass pipeline_token for a {rollup, events} response — poll rollup.allDone, then read rollup.status (success / warning / error) and per-job lastError. Pass pipeline_name for a paginated summary across recent jobs. |
| set_catalog | Set or clear the catalog grouping label on an existing pipeline or tap. Pass exactly one of pipeline or tap. |
| kill_job | Kill a running job by pipeline token |
| profile_data | AI-profile data (base64-encoded) with summary stats and suggested DQ rules |
| get_version | Get pipeline server version |
| check_service_health | Check which backend services are up, down, or not configured (slow — use for diagnostics only) |
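
The get_job_status contract above is the one agents exercise most. A minimal polling sketch in Python, assuming an initialized MCP ClientSession (see Setup below) and that the tool returns its {rollup, events} payload as JSON text; both are reasonable but unverified assumptions:

import asyncio
import json

async def wait_for_pipeline(session, pipeline_token: str) -> dict:
    # Poll the {rollup, events} response until rollup.allDone flips to true.
    while True:
        result = await session.call_tool(
            "get_job_status", arguments={"pipeline_token": pipeline_token}
        )
        rollup = json.loads(result.content[0].text)["rollup"]
        if rollup["allDone"]:
            # rollup.status is success / warning / error; lastError is per job.
            return rollup
        await asyncio.sleep(2)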

Vector Search

Semantic search across any of the pipeline’s supported vector databases. Each tool takes a natural language query and returns the most similar document chunks with scores and metadata.

| Tool | Description |
| --- | --- |
| search_qdrant | Search a Qdrant collection |
| search_weaviate | Search a Weaviate class |
| search_milvus | Search a Milvus collection |
| search_chroma | Search a Chroma collection |
| search_pgvector | Search a pgvector PostgreSQL table |

Database Queries

Read-only queries against the pipeline’s backend databases.
| Tool | Description |
| --- | --- |
| query_postgres | Execute a read-only SQL SELECT query against PostgreSQL |
| query_mongodb | Query a MongoDB collection with filter and projection |
| query_natural | Ask a question in natural language — AI generates and executes SQL |

Metadata Discovery

Explore the structure of PostgreSQL, MongoDB, and vector databases managed by the platform. Use these tools to understand what data is available before writing queries or running searches.
| Tool | Description |
| --- | --- |
| list_postgres_databases | List all PostgreSQL databases |
| list_postgres_schemas | List schemas in a PostgreSQL database |
| list_postgres_tables | List tables in a schema (supports vector-only filter) |
| list_postgres_columns | List columns and types for a specific table |
| list_mongodb_databases | List all MongoDB databases |
| list_mongodb_collections | List collections (optionally filtered by database) |
| list_qdrant_collections | List all collections in Qdrant |
| list_weaviate_classes | List all classes in Weaviate |
| list_milvus_collections | List all collections in Milvus |
| list_chroma_collections | List all collections in Chroma |
| list_pgvector_collections | List all pgvector tables in PostgreSQL |

AI

| Tool | Description |
| --- | --- |
| ai_answer | Answer a question using AI based on provided context (RAG) |

Taps

| Tool | Description |
| --- | --- |
| create_tap | Create a tap from an instruction (AI generates script), a user-provided script, or config only. Optionally set target pipeline, CRON schedule, secret name, and tap_type (structured or document) |
| list_taps | List all taps with status, tapType, target pipeline, schedule, and last run info |
| get_tap | Get full details of a single tap including its Python script source |
| run_tap | Execute a tap and push to the target pipeline. Response carries persisted + persistedReason, recordCount, publisherToken, and pipelineTokens for watching async loads. Records themselves are not returned — use test_tap to preview what a script produces |
| test_tap | Test-run a tap without pushing data to the pipeline |
| update_tap | Update a tap’s config (enabled, schedule, pipeline, description) without regenerating the script |
| get_pipeline_status | Watch ingestion progress after run_tap. Pass the response’s publisherToken (covers every job the run submitted — structured = 1, document = N) or a single pipelineToken. Returns a rollup (with allDone, status, per-job lastError) plus the raw events; poll until rollup.allDone is true |
| get_tap_logs | Get run history for a tap (last 50 entries with status, duration, errors, and publisherToken for each run that submitted records). Pivot from a scheduled-run entry to get_pipeline_status(publisher_token=...) to confirm the run actually landed in the destination |
| get_tap_ledger | Document taps only: read the ledger of discovered documents (URI, filename, status, hashes, timestamps). Pass clear_uri to force-reprocess one file, or clear_all=true to force a full re-scan |
| create_tap_secret | Create or update a Vault secret for a tap to read as env vars (tagged _type=tap). Fails on collision unless overwrite=true. Reserved AI-provider names are blocked |
| delete_tap_secret | Delete a tap secret. Only _type=tap secrets can be removed — human-owned secrets must be deleted from the Secrets tab |
| delete_tap | Delete a tap and its stored script (also clears the ledger and staged MinIO objects for document taps) |
For a document tap, create_tap requires the target_pipeline to have an unstructuredAttributes source and a vector-store destination (qdrant, pgvector, weaviate, milvus, or chroma). The server rejects mismatched pairings with HTTP 400.
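
For hand-written taps, the script is plain Python centered on a fetch() function (get_tap returns the source; create_tap accepts one directly). A sketch of a minimal structured tap, assuming fetch() takes no arguments and returns a list of flat records; the endpoint URL and secret field name here are hypothetical:

import json
import os
import urllib.request

def fetch():
    # Credentials created with create_tap_secret surface as env vars.
    api_key = os.environ.get("EXAMPLE_API_KEY", "")
    req = urllib.request.Request(
        "https://api.example.com/v1/trades",  # hypothetical source endpoint
        headers={"Authorization": f"Bearer {api_key}"},
    )
    with urllib.request.urlopen(req) as resp:
        payload = json.load(resp)
    # One flat dict per record to push into the target pipeline.
    return [{"symbol": t["symbol"], "price": t["price"]} for t in payload["trades"]]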

Discovery

| Tool | Description |
| --- | --- |
| discover_source | Discover available datasets from any data source — Python package, API, website, or database. Chat with the AI to identify a source, then enumerate every dataset it exposes with parameters, auth requirements, and tapInstruction templates ready to feed into create_tap. Powers the same flow as the Discovery wizard |

Configuration

| Tool | Description |
| --- | --- |
| upload_config | Upload a JSON Schema config file (base64-encoded content) |
| update_secret | Update an AI provider secret (anthropic, openai, ollama, embedding) to configure API keys |

Managed Service

Tools for signing up, upgrading, and monitoring hosted Datris instances. Available on the remote MCP endpoint at mcp.trial.datris.ai.
| Tool | Description |
| --- | --- |
| signup_trial | Sign up for a free 14-day trial. Returns an API key and MCP endpoint URL. No authentication required. |
| upgrade_to_dedicated | Upgrade from shared trial to a dedicated instance. Returns a Stripe checkout URL. |
| check_upgrade_status | Check provisioning status. Returns none, provisioning, or active with the new MCP endpoint. |

Monitoring Active Agents

The Datris UI includes an Agents tab that shows a live view of every agent currently connected to the MCP server and a streaming log of the tool calls each one is making. Agents are labeled using their MCP clientInfo.name — so Claude Desktop appears as claude-ai, Claude Code as claude-code, Cursor as cursor, and so on — with graceful fallbacks to tenant name, API-key name, or session id for clients that don’t supply one. See Monitoring → Agent Monitor for details.

Setup

Docker (automatic)

The MCP server starts automatically with docker-compose up, running in SSE mode on port 3000. No additional setup is required.

Local (for Claude Desktop / Claude Code)

The MCP server is published on PyPI. Use uvx to run it directly:
uvx datris-mcp-server

Transport Modes

| Mode | Use Case | Command |
| --- | --- | --- |
| stdio | Claude Desktop, Claude Code, local agents | python server.py |
| SSE | Docker, remote agents, web clients | python server.py --sse --port 3000 |
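
For local agents, stdio means the client spawns the server itself as a subprocess. A sketch with the MCP Python SDK, using the PyPI package from above; the SDK calls are standard, but treat the snippet as illustrative rather than the documented setup:

import asyncio
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client

async def main():
    # Spawn the PyPI-published server over stdio, as a local agent would.
    params = StdioServerParameters(command="uvx", args=["datris-mcp-server"])
    async with stdio_client(params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            tools = await session.list_tools()
            print([tool.name for tool in tools.tools])

asyncio.run(main())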

Configuring Claude

Client-side setup for Claude Desktop and Claude Code — including recommended and alternative transports, and example first prompts — lives on its own page: Configuring Claude.

Environment Variables

| Variable | Default | Description |
| --- | --- | --- |
| DATRIS_API_URL | http://localhost:8080 | Datris REST API server URL |
| REQUIRE_API_KEY | false | Reject SSE/HTTP sessions that connect without x-api-key |

All database connections, vector search, and embedding are handled by the pipeline server. The MCP server only needs the pipeline URL.

Authentication

The MCP server has no API key of its own. Each connecting agent sends an x-api-key header per session and the MCP server forwards it as-is to the Datris REST API on every tool call. Manage accepted keys in the Configuration UI’s Secrets tab — oss/api-keys (single-tenant) or api-key-mappings (multi-tenant).
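
Client-side, that means supplying the header when the session is opened. A sketch with the MCP Python SDK over SSE; the headers parameter is the SDK's own, while the /sse path and the key value are assumptions:

import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client

async def main():
    # The x-api-key sent here is forwarded verbatim to the Datris REST API.
    async with sse_client(
        "http://localhost:3000/sse",  # assumed SSE endpoint path
        headers={"x-api-key": "YOUR_KEY"},
    ) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            print(await session.call_tool("get_version"))

asyncio.run(main())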

Example Agent Workflows

The platform’s canonical workflow is delivered to every connected agent through the MCP instructions field — agents don’t need to memorize it. These examples show the same flow applied to common tasks. Core rules the agent receives on connect:

  • Check what exists before creating (list_pipelines, list_taps).
  • Keep pipeline configs simple (source + destination only — never pass profile_data output into a pipeline config).
  • After run_tap, read persisted and persistedReason before doing anything else.
  • Verify real completion via get_pipeline_status(publisher_token=...), not by the response body.

Ingest a CSV file directly

  1. Check for an existing pipeline — list_pipelines. If one already fits the data, skip to step 3.
  2. Create a pipeline — create_pipeline with the CSV as sample data. Schema is auto-detected. Keep the config simple; only pass codegen_rule / codegen_transform if the user explicitly asks for validation or transformation.
  3. Upload — upload_data with the CSV content (base64).
  4. Verify it landed — get_job_status with the pipelineToken returned from upload_data. Poll until rollup.allDone is true, then read rollup.status (success, warning, error). On failure, rollup.jobs[].lastError carries the failing process and message. The sketch after this list walks through the full sequence.
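
A sketch of steps 2 through 4 end to end, reusing wait_for_pipeline from the Pipeline Management section above. The create_pipeline and upload_data argument names are assumptions for illustration; an agent should read each tool's input schema instead:

import base64
import json

async def ingest_csv(session, path: str) -> str:
    with open(path, "rb") as f:
        sample = base64.b64encode(f.read()).decode("ascii")
    # Argument names below are illustrative, not the documented schema.
    await session.call_tool("create_pipeline", arguments={
        "name": "sales", "content": sample, "destination": "postgres"})
    upload = await session.call_tool("upload_data", arguments={
        "pipeline_name": "sales", "content": sample})
    token = json.loads(upload.content[0].text)["pipelineToken"]
    rollup = await wait_for_pipeline(session, token)  # poll until allDone
    return rollup["status"]  # success / warning / error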

Onboard a new external data source via a tap

  1. Check existing taps — list_taps. If a tap already covers this source, run or test it directly.
  2. Discover (optional) — discover_source with a natural-language question like “what datasets are available in yfinance?”. Returns a structured catalog with a tapInstruction template per dataset.
  3. Create credentials — if the source needs an API key, call create_tap_secret with the credential fields (they become env vars inside the script).
  4. Create the tap — create_tap with an instruction (AI generates the script) or with your own Python fetch() function (faster and more reliable). Pass secret_name to bind the credentials from step 3. For PDFs/Word/HTML into a vector-store pipeline, pass tap_type="document".
  5. Test — test_tap. If it fails, read the error, fix the script, and call create_tap again. Repeat until the test passes.
  6. Run — run_tap. Read the response’s persisted field first (this branch is sketched after this list):
    • persisted: true → capture publisherToken and continue to step 7.
    • persisted: false → read persistedReason (no_target_pipeline, test_mode, run_error, no_records, debounced), tell the user exactly why, and stop. (run_tap does not return records — call test_tap if you need to preview what the script produces.) debounced means the same tap was triggered within the last 5 seconds; the earlier run is still executing — do not retry, look it up via get_tap_logs and poll get_pipeline_status instead.
  7. Verify it landed — get_pipeline_status(publisher_token=...) and poll until rollup.allDone is true. Then rollup.status is the outcome (success, warning, error) and rollup.jobs[].lastError carries any failure detail. The tap log only tells you the script ran; the publisher token is how you confirm records actually reached the destination.
  8. Schedule (optional) — update_tap with a cron_expression. For each scheduled run, pick the corresponding entry from get_tap_logs to get its publisherToken, then verify with step 7.
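
A sketch of the persisted check in step 6, assuming run_tap returns its response as JSON text and that a tap is addressed by a tap argument (that argument name is a guess; the persisted, persistedReason, and publisherToken fields come from the tap reference above):

import json

async def run_and_verify(session, tap_name: str) -> str:
    result = await session.call_tool("run_tap", arguments={"tap": tap_name})
    run = json.loads(result.content[0].text)
    if not run["persisted"]:
        # no_target_pipeline / test_mode / run_error / no_records / debounced
        raise RuntimeError(f"tap did not persist: {run['persistedReason']}")
    # publisherToken covers every job this run submitted (structured = 1).
    return run["publisherToken"]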

Build and query a RAG knowledge base from external documents

  1. Create a vector-store pipeline — create_pipeline with a Qdrant / Weaviate / Milvus / pgvector / Chroma destination and an unstructuredAttributes source.
  2. Create a document tap — create_tap with tap_type="document" and an instruction describing the document source (e.g., “ingest every PDF under legal-contracts/2026/ in S3”). If the source needs credentials, call create_tap_secret first.
  3. Test and run — test_tap, then run_tap. Read persisted / persistedReason exactly as above.
  4. Verify every document landed — get_pipeline_status(publisher_token=...). Document taps fan out to N jobs (one per file) under one publisher token; poll until rollup.allDone is true. rollup.jobs[] has one entry per file with its own status and lastError.
  5. Inspect the ledger (optional) — get_tap_ledger shows which files were discovered and processed. On subsequent runs, unchanged files are skipped automatically; pass clear_uri or clear_all=true to force a re-scan.
  6. Search — search_qdrant (or the matching tool for your destination) to retrieve relevant chunks.
  7. Answer — ai_answer with the retrieved chunks as context and the user’s question; see the sketch after this list.
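
A sketch of steps 6 and 7, retrieving chunks and passing them to ai_answer as context. Only the tool names and the query-to-chunks-to-answer flow come from this page; the argument names and the shape of the search results are assumptions:

import json

async def answer_from_kb(session, question: str) -> str:
    # Argument names are illustrative; check each tool's input schema.
    hits = await session.call_tool("search_qdrant", arguments={
        "collection": "legal_contracts", "query": question})
    chunks = json.loads(hits.content[0].text)
    answer = await session.call_tool("ai_answer", arguments={
        "question": question, "context": chunks})
    return answer.content[0].text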

Discover and query existing data

  1. List databases — list_postgres_databases.
  2. List schemas — list_postgres_schemas for the target database.
  3. List tables — list_postgres_tables (supports a vector-only filter).
  4. Inspect columns — list_postgres_columns to understand structure.
  5. Query — query_postgres with a SELECT, or query_natural to ask a plain-English question and have the AI generate + run the SQL.

Cross-modal analysis (structured + vector)

  1. Vector search — search_pgvector (or another vector tool) for relevant document chunks.
  2. Structured query — query_postgres or query_natural for related metrics.
  3. Combine — merge the two result sets in the response.

Automated quality monitoring of scheduled taps

  1. List taps — list_taps to find the tap of interest.
  2. Read recent runs — get_tap_logs returns the last 50 entries with status, record count, duration, errors, and publisherToken for each run that submitted records.
  3. Verify completion — for any run of interest, call get_pipeline_status(publisher_token=...) and read rollup.status to confirm records actually landed in the destination. The tap log tells you the script ran; the publisher token tells you the load finished.
  4. Diagnose failures — on an errored job, read rollup.jobs[].lastError (processName + description) and either fix the tap (create_tap again with a corrected script) or retarget the pipeline (update_tap).

CLI Examples

The Datris CLI connects to the MCP server and provides the same capabilities from the terminal. See CLI for the full reference.
# Install
brew tap datris/tap
brew install datris

# Ingest a CSV into PostgreSQL
datris ingest sales-data.csv --dest postgres

# Ingest with AI validation and transformation
datris ingest trades.csv --dest postgres \
  --ai-validate "all prices must be positive and dates must be YYYY-MM-DD" \
  --ai-transform "convert dates to YYYY/MM/DD and uppercase all ticker symbols"

# Ingest into a vector store for RAG
datris ingest manual.pdf --dest pgvector

# Ingest + analyze in one command
datris ingest trades.csv --dest postgres --ai-analyze "What are the top 5 stocks by volume?"
datris ingest report.pdf --dest pgvector --ai-analyze "What was the company's revenue?"

# Analyze existing data (PostgreSQL, MongoDB, or vector stores)
datris analyze "top 5 stocks by volume" --table trades
datris analyze "What is the return policy?" --table support_docs --dest pgvector

# Query PostgreSQL directly
datris query "SELECT * FROM public.sales LIMIT 10"

# Semantic search (raw results)
datris search "quarterly revenue" --store pgvector --collection financial_docs

# List pipelines, check health, get status
datris pipelines
datris health
datris status my_pipeline