The pipeline includes a built-in MCP (Model Context Protocol) server that lets AI agents interact with the platform natively. Any MCP-compatible agent — Claude Desktop, Claude Code, Cursor, or custom agentic frameworks — can discover database metadata, create and manage pipelines, upload files, monitor jobs, profile data, search vector databases, query structured data, answer questions with AI-powered RAG, and upload configuration files — all without custom integration code. The MCP server is a lightweight Python service that routes all operations through the pipeline’s REST API. It runs alongside the pipeline in Docker or locally for development.

Resources

The MCP server exposes resources that agents can read on demand for detailed documentation.

Resource URI | Description
--- | ---
datris://pipeline-config-reference | Complete reference for building pipeline configurations: all source types, data quality rules, transformations, and destination types, with JSON examples
datris://tap-workflow-reference | Canonical reference for tap work: creation (instruction vs. script), per-run params with the script-side env-var contract, the scheduling rule with a Quartz CRON cookbook, the test-before-run validation discipline, the params-vs-secrets table, run flow and persistedReason handling, the document-tap ledger, and outcome verification via publisherToken + get_tap_logs. Agents can re-read it on demand mid-session whenever they need to verify their mental model.

Available Tools

Pipeline Management

Tool | Description
--- | ---
list_pipelines | List all registered pipeline configurations
get_pipeline | Get a specific pipeline configuration by name
create_pipeline | Create a pipeline from sample data (base64-encoded). The schema is auto-detected. Supports three destination categories: structured (postgres, mongodb), object store (Parquet or ORC to MinIO / AWS S3), and vector (pgvector, qdrant, weaviate, milvus, chroma). For S3, set provider: "s3" and reference a Platform-tab credentialsSecret. An optional catalog groups related pipelines.
delete_pipeline | Delete a pipeline and its destination data
list_pipeline_versions | List a pipeline’s definition-change history (newest first): version, createdAt, createdBy, changeNote. An empty result means the pipeline hasn’t been edited since versioning was enabled, not that it has no version; the current number is the version field on list_pipelines / get_pipeline (≥ 1 for every pipeline)
get_pipeline_version | View one historical snapshot of a pipeline’s definition: the full config as it was at that version. Read-only
diff_pipeline_versions | Compare two pipeline versions with a server-computed, field-by-field config diff. version is the selected/newer snapshot, compared against the baseline
restore_pipeline_version | Roll a pipeline back (or forward) to a prior version. Append-only: writes the chosen snapshot as a new latest version; nothing is overwritten. “Rolling forward” is the same call with a higher version
upload_data | Upload data (base64-encoded) to a pipeline for processing. New CSV columns are auto-added to the schema (schema evolution). Returns a pipeline token.
get_job_status | Get job status. Pass pipeline_token for a {rollup, events} response: poll rollup.allDone, then read rollup.status (success / warning / error) and per-job lastError. Pass pipeline_name for a paginated summary across recent jobs.
set_catalog | Set or clear the catalog grouping label on an existing pipeline or tap. Pass exactly one of pipeline or tap.
kill_job | Kill a running job by pipeline token
profile_data | AI-profile data (base64-encoded), returning summary stats and suggested DQ rules
get_version | Get the pipeline server version
check_service_health | Check which backend services are up, down, or not configured (slow; use for diagnostics only)

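
Several tools above (create_pipeline, upload_data, profile_data) take file content as a base64 string. Here is a minimal standard-library sketch of producing that string in Python. The keys in upload_args (pipeline, data) are hypothetical placeholders, so take the real parameter names from the tool schema the MCP server advertises.

```python
import base64
from pathlib import Path


def encode_file(path: str) -> str:
    """Read a file and return its bytes as a base64 ASCII string."""
    return base64.b64encode(Path(path).read_bytes()).decode("ascii")


def encode_text(text: str, encoding: str = "utf-8") -> str:
    """Base64-encode in-memory text, such as CSV generated on the fly."""
    return base64.b64encode(text.encode(encoding)).decode("ascii")


csv_text = "ticker,price\nAAPL,190.5\nMSFT,410.2\n"
# Hypothetical argument names; the real ones come from the upload_data schema.
upload_args = {"pipeline": "trades", "data": encode_text(csv_text)}
```
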

Vector Search

Semantic search across any of the pipeline’s supported vector databases. Each tool takes a natural-language query and returns the most similar document chunks with scores and metadata.

Tool | Description
--- | ---
search_qdrant | Search a Qdrant collection
search_weaviate | Search a Weaviate class
search_milvus | Search a Milvus collection
search_chroma | Search a Chroma collection
search_pgvector | Search a pgvector PostgreSQL table
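
Each vector store has its own search tool and its own listing tool. Client code that knows a pipeline’s destination type therefore has to pick the matching tool names. The small lookup sketch below uses the tool names listed on this page. The helper name and the lowercase destination labels are assumptions for illustration.

```python
from __future__ import annotations

# destination type -> (metadata tool that lists collections, search tool)
VECTOR_TOOLS = {
    "qdrant": ("list_qdrant_collections", "search_qdrant"),
    "weaviate": ("list_weaviate_classes", "search_weaviate"),
    "milvus": ("list_milvus_collections", "search_milvus"),
    "chroma": ("list_chroma_collections", "search_chroma"),
    "pgvector": ("list_pgvector_collections", "search_pgvector"),
}


def vector_tools(destination: str) -> tuple[str, str] | None:
    """Return (list_tool, search_tool) for a vector destination, else None."""
    return VECTOR_TOOLS.get(destination.strip().lower())
```
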

Database Queries

Read-only queries against the pipeline’s backend databases.

Tool | Description
--- | ---
query_postgres | Execute a read-only SQL SELECT query against PostgreSQL
query_mongodb | Query a MongoDB collection with a filter and projection
query_objectstore | Read rows from a pipeline’s Object Store destination (Parquet or ORC files in MinIO or AWS S3). Pass the pipeline name; the server resolves bucket, prefix, format, and credentials from the pipeline config.
query_natural | Ask a question in natural language; the AI generates and executes the SQL

Metadata Discovery

Explore the structure of PostgreSQL, MongoDB, and vector databases managed by the platform. Use these tools to understand what data is available before writing queries or running searches.

Tool | Description
--- | ---
list_postgres_databases | List all PostgreSQL databases
list_postgres_schemas | List schemas in a PostgreSQL database
list_postgres_tables | List tables in a schema (supports a vector-only filter)
list_postgres_columns | List columns and types for a specific table
list_mongodb_databases | List all MongoDB databases
list_mongodb_collections | List collections (optionally filtered by database)
list_qdrant_collections | List all collections in Qdrant
list_weaviate_classes | List all classes in Weaviate
list_milvus_collections | List all collections in Milvus
list_chroma_collections | List all collections in Chroma
list_pgvector_collections | List all pgvector tables in PostgreSQL

AI

Tool | Description
--- | ---
ai_answer | Answer a question using AI based on provided context (RAG)

Taps

Tool | Description
--- | ---
create_tap | Create a tap from an instruction (the AI generates the script), a user-provided script, or config only. Optionally set the target pipeline, a CRON schedule, a secret name, and tap_type (structured or document)
list_taps | List all taps with status, tapType, target pipeline, schedule, and last-run info
get_tap | Get full details of a single tap, including its Python script source
run_tap | Execute a tap and push its output to the target pipeline. Accepts an optional params object: per-call values (date windows, ticker lists, page cursors, etc.) are injected as DATRIS_TAP_PARAM_<key> env vars that the script can read for that one run only. The response carries persisted + persistedReason (success, no_records, run_error, no_target_pipeline, test_mode, debounced), recordCount, publisherToken, and pipelineTokens for watching async loads. Records themselves are not returned; use test_tap to preview what a script produces. Output exceeding tapMaxOutputMB (default 100MB) fails fast with an actionable error
test_tap | Test-run a tap without pushing data to the pipeline. Call this before the first run_tap of any newly-created or just-updated script, and before setting a cron_expression on a never-validated script; setting a cron on an untested script ships a guaranteed-bad nightly run
update_tap | Update a tap’s CONFIG (enabled, schedule, pipeline, description) without touching the script. To replace the SCRIPT itself, call create_tap again with the same name and a new script or instruction; create_tap upserts by name
get_pipeline_status | Watch ingestion progress after run_tap. Pass the response’s publisherToken (covers every job the run submitted: 1 for a structured tap, N for a document tap) or a single pipelineToken. Returns a rollup (with allDone, status, and per-job lastError) plus the raw events; poll until rollup.allDone is true
wait_seconds | Sleep 1–120 seconds, then return. Use it to pace polling of long-running pipeline work without burning tool calls. The typical pattern is run_tap → poll → wait_seconds(5) → poll → wait_seconds(10) → poll → …, with exponential backoff capped at 60s (120s only if progress is genuinely glacial). Reset to a short wait whenever a poll shows new jobs flipped to a terminal state
get_tap_logs | Get run history for a tap (last 50 entries with status, duration, errors, and a publisherToken for each run that submitted records). Pivot from a scheduled-run entry to get_pipeline_status(publisher_token=...) to confirm the run actually landed in the destination
get_tap_ledger | Document taps only: read the ledger of discovered documents (URI, filename, status, hashes, timestamps). Pass clear_uri to force-reprocess one file, or clear_all=true to force a full re-scan
create_tap_secret | Create or update a Vault secret for a tap to read as env vars (tagged _type=tap). Fails on collision unless overwrite=true. Reserved AI-provider names are blocked
delete_tap_secret | Delete a tap secret. Only _type=tap secrets can be removed; human-owned secrets must be deleted from Configuration → Secrets in the UI
list_tap_secrets | List the names of existing tap secrets (secrets tagged _type=tap). Call before create_tap_secret to reuse a suitable existing secret rather than asking the user for credentials again
get_tap_secret_fields | Return the field NAMES (keys only, never values) of an existing tap secret. Use after list_tap_secrets to verify a candidate secret has the keys the tap script will need
delete_tap | Delete a tap and its stored script (for document taps, also clears the ledger and staged MinIO objects)
list_tap_versions | List a tap’s definition-change history (newest first): version, createdAt, createdBy, changeNote. The platform snapshots a tap’s config + script on every create/update. An empty result means the tap hasn’t been edited since versioning was enabled, not that it has no version; the current number is the version field on list_taps / get_tap (≥ 1 for every tap)
get_tap_version | View one historical snapshot of a tap’s definition: the full config and the pinned Python script as they were at that version. Read-only
diff_tap_versions | Compare two tap versions: a server-computed field-by-field config diff plus a line-level script diff. version is the selected/newer snapshot, compared against the baseline
restore_tap_version | Roll a tap back (or forward) to a prior version. Append-only: writes the chosen snapshot as a new latest version (config + that version’s script); nothing is overwritten. “Rolling forward” is the same call with a higher version. Does not run the tap

For a document tap, create_tap requires the target_pipeline to have an unstructuredAttributes source and a vector-store destination (qdrant, pgvector, weaviate, milvus, or chroma). The server rejects mismatched pairings with HTTP 400.
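
The run_tap row above says per-call params reach the script as DATRIS_TAP_PARAM_<key> environment variables, and create_tap_secret fields also become env vars. The sketch below shows a user-provided tap script that reads both. It assumes fetch() takes no arguments and returns a list of row dicts. The param keys (start_date, tickers), the secret field API_KEY, and the exact key casing are hypothetical. See datris://tap-workflow-reference for the actual script-side contract.

```python
from __future__ import annotations

import os
from datetime import date, timedelta

PARAM_PREFIX = "DATRIS_TAP_PARAM_"


def tap_param(key: str, default: str | None = None) -> str | None:
    """Read a per-run value supplied as run_tap(params={key: value})."""
    return os.environ.get(PARAM_PREFIX + key, default)


def fetch() -> list[dict]:
    # Hypothetical params. A run may pass none, so the defaults must be safe.
    start = tap_param("start_date") or (date.today() - timedelta(days=1)).isoformat()
    raw = tap_param("tickers") or "AAPL"
    tickers = [t.strip() for t in raw.split(",") if t.strip()]
    # Hypothetical secret field, exposed as an env var when secret_name is bound.
    api_key = os.environ.get("API_KEY", "")
    # A real tap would call the upstream API here; this sketch echoes its inputs.
    return [{"ticker": t, "start_date": start, "has_key": bool(api_key)} for t in tickers]
```
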

Configuration

Tool | Description
--- | ---
upload_config | Upload a JSON Schema config file (base64-encoded content)
update_secret | Update an AI provider secret (anthropic, openai, ollama, embedding) to configure API keys
list_platform_secrets | List the names of human-owned platform secrets (the Platform tab in Configuration → Secrets). These hold destination credentials and infrastructure connections. Read-only: agents cannot create, modify, or delete them.
get_platform_secret_fields | Return the field NAMES (no values) of a platform secret. Use after list_platform_secrets to verify that a candidate has the keys a destination requires; e.g. an S3 credentialsSecret must contain accessKey, secretKey, and region.
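
list_platform_secrets and get_platform_secret_fields work together. Once a client has collected each candidate secret’s field names, choosing one for an S3 destination is a set check. The sketch below assumes the field lists have already been fetched. Only the accessKey/secretKey/region requirement comes from this page; the helper names are hypothetical.

```python
from __future__ import annotations

# Keys an S3 credentialsSecret must contain, per get_platform_secret_fields above.
S3_REQUIRED = frozenset({"accessKey", "secretKey", "region"})


def missing_fields(fields: list[str], required: frozenset[str] = S3_REQUIRED) -> list[str]:
    """Return the required keys absent from a secret's field names, sorted."""
    return sorted(required - set(fields))


def pick_secret(candidates: dict[str, list[str]],
                required: frozenset[str] = S3_REQUIRED) -> str | None:
    """Return the alphabetically first secret whose fields cover `required`."""
    for name in sorted(candidates):
        if not missing_fields(candidates[name], required):
            return name
    return None
```
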

Monitoring Active Agents

The Datris UI includes an Agent Monitor tab that shows a live view of every agent currently connected to the MCP server and a streaming log of the tool calls each one is making. Agents are labeled using their MCP clientInfo.name — so Claude Desktop appears as claude-ai, Claude Code as claude-code, Cursor as cursor, and so on — with graceful fallbacks to tenant name, API-key name, or session id for clients that don’t supply one. See Monitoring → Agent Monitor for details.
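
The labeling rule above is a first-non-blank fallback. The sketch below shows that order, which may help when correlating Agent Monitor entries with your own client settings. It illustrates the documented order and is not the server’s code. How blank names are treated is an assumption.

```python
from __future__ import annotations


def agent_label(client_name: str | None, tenant: str | None,
                api_key_name: str | None, session_id: str) -> str:
    """Return the first non-blank identifier in the documented fallback order."""
    for candidate in (client_name, tenant, api_key_name):
        if candidate and candidate.strip():
            return candidate.strip()
    return session_id
```
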

Setup

Docker (automatic)

The MCP server starts automatically with docker-compose up in SSE mode on port 3000. No additional setup required.

Local (for Claude Desktop / Claude Code)

The MCP server is published on PyPI. Use uvx to run it directly:
```shell
uvx datris-mcp-server
```

Transport Modes

Mode | Use Case | Command
--- | --- | ---
stdio | Claude Desktop, Claude Code, local agents | python server.py
SSE | Docker, remote agents, web clients | python server.py --sse --port 3000
Streamable HTTP | Remote agents using the streamable HTTP transport | python server.py --streamable-http --port 3000

Configuring Claude

Client-side setup for Claude Desktop and Claude Code — including recommended and alternative transports, and example first prompts — lives on its own page: Configuring Claude.

Environment Variables

Variable | Default | Description
--- | --- | ---
DATRIS_API_URL | http://localhost:8080 | Datris REST API server URL
REQUIRE_API_KEY | false | Reject SSE/HTTP sessions that connect without an x-api-key header

All database connections, vector search, and embedding are handled by the pipeline server. The MCP server only needs the pipeline URL.
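
The MCP server reads only these two variables. Deployment tooling that wants to mirror its effective settings can therefore resolve them the same way. The sketch below applies the documented defaults. Accepting only a case-insensitive "true" for REQUIRE_API_KEY is an assumption, and McpSettings / load_settings are hypothetical names.

```python
from __future__ import annotations

import os
from collections.abc import Mapping
from dataclasses import dataclass


@dataclass(frozen=True)
class McpSettings:
    api_url: str
    require_api_key: bool


def load_settings(env: Mapping[str, str] | None = None) -> McpSettings:
    """Resolve the two documented variables, falling back to the documented defaults."""
    env = os.environ if env is None else env
    api_url = env.get("DATRIS_API_URL", "http://localhost:8080").rstrip("/")
    # Assumption: only an explicit "true" (any case) enables the check.
    require = env.get("REQUIRE_API_KEY", "false").strip().lower() == "true"
    return McpSettings(api_url=api_url, require_api_key=require)
```
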

Authentication

The MCP server has no API key of its own. Each connecting agent sends an x-api-key header per session and the MCP server forwards it as-is to the Datris REST API on every tool call. Issue and manage agent keys from Configuration → API-Keys in the UI. When the Datris platform has USE_API_KEYS=true, set REQUIRE_API_KEY=true on the MCP server too — otherwise the MCP layer accepts unauthenticated connections that then fail with 401 at the REST tier. The pairing is intentional: turn both on, or neither. Each agent should connect with its own labeled key (e.g. claude-desktop, cursor, prod-agent) so its traffic appears under a distinct identity in the request logs and can be revoked independently. Issue from the UI:
  1. Configuration → API-Keys → Issue new key
  2. Label the key for that agent (claude-desktop, cursor, etc.)
  3. Pick a capability template (read-only, rag-builder, ops) or build a custom scope
  4. Copy the value once at issue time and paste it into the agent’s MCP client config
The platform’s capability framework enforces what each key is allowed to do — a read-only agent can’t create_tap even if its system prompt tells it to try. See API Keys for the full per-client pattern.
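
The pairing rule follows from how a session is handled. The key is forwarded untouched, and REQUIRE_API_KEY only decides whether a keyless session is refused up front. The hypothetical sketch below models that decision; it is not the MCP server’s source. It shows why leaving REQUIRE_API_KEY=false on a USE_API_KEYS=true platform produces sessions that connect but fail at the REST tier.

```python
from __future__ import annotations

from collections.abc import Mapping


def session_headers(headers: Mapping[str, str], require_api_key: bool) -> dict[str, str] | None:
    """Return headers to forward to the REST API, or None to refuse the session.

    Header names are matched case-insensitively, as HTTP requires.
    """
    key = next((v for k, v in headers.items() if k.lower() == "x-api-key"), "")
    if key:
        return {"x-api-key": key}  # forwarded unchanged on every tool call
    if require_api_key:
        return None  # refused at connect time
    # Accepted, but every REST call will 401 if the platform has USE_API_KEYS=true.
    return {}
```
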

Example Agent Workflows

The platform’s canonical workflow is delivered to every connected agent through the MCP instructions field, so agents don’t need to memorize it. The examples below apply the same flow to common tasks.

Core rules the agent receives on connect:
  • Check what exists before creating (list_pipelines, list_taps).
  • Keep pipeline configs simple: source + destination only. Never pass profile_data output into a pipeline config.
  • After run_tap, read persisted and persistedReason before doing anything else.
  • Verify real completion via get_pipeline_status(publisher_token=...), not from the response body.
  • When the user describes a recurrence, set cron_expression on the tap. Never propose external schedulers or shell snippets.
  • Call test_tap before run_tap, and before setting a cron, on any newly-created or newly-updated script.
  • Only narrate work that actually completed in the current turn. If the tool calls didn’t fire, say so honestly rather than confabulating success.

For long-form tap workflow guidance (params, scheduling, validation, error handling, the Quartz CRON cookbook), agents can re-read the datris://tap-workflow-reference resource on demand at any point in a session.

Ingest a CSV file directly

  1. Check for an existing pipeline: list_pipelines. If one already fits the data, skip to step 3.
  2. Create a pipeline: create_pipeline with the CSV as sample data. The schema is auto-detected. Keep the config simple; only pass codegen_rule / codegen_transform if the user explicitly asks for validation or transformation.
  3. Upload: upload_data with the CSV content (base64).
  4. Verify it landed: get_job_status with the pipelineToken returned from upload_data. Poll until rollup.allDone is true, then read rollup.status (success, warning, error). On failure, rollup.jobs[].lastError carries the failing process and message.
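
Steps 3 and 4 can be scripted against any MCP client. The sketch below assumes call_tool(name, arguments) returns the tool’s decoded JSON result. It also assumes that each finished rollup.jobs[] entry has a status of success, warning, or error. The upload_data argument names are hypothetical. Polling follows the backoff described for wait_seconds: 5s first, then doubling to a 60s cap, with a reset after progress. A plain sleep stands in for the wait_seconds tool an agent would call.

```python
from __future__ import annotations

import base64
import time
from typing import Any, Callable

# call_tool(name, arguments) -> the tool's decoded JSON result. Supply it from an
# MCP SDK session, a CLI wrapper, or a test stub.
CallTool = Callable[[str, dict], Any]

TERMINAL = {"success", "warning", "error"}  # assumed per-job terminal statuses


def next_wait(current: float, progressed: bool, start: float = 5, cap: float = 60) -> float:
    """Double the wait up to `cap`, or drop back to `start` after progress."""
    return start if progressed else min(current * 2, cap)


def ingest_csv(call_tool: CallTool, pipeline: str, csv_text: str,
               sleep: Callable[[float], None] = time.sleep) -> dict:
    """Upload CSV to an existing pipeline, then poll get_job_status until allDone."""
    data = base64.b64encode(csv_text.encode("utf-8")).decode("ascii")
    # Hypothetical argument names; read the real ones from the upload_data schema.
    token = call_tool("upload_data", {"pipeline_name": pipeline, "data": data})["pipelineToken"]

    wait: float | None = None
    finished = 0
    while True:
        rollup = call_tool("get_job_status", {"pipeline_token": token})["rollup"]
        if rollup["allDone"]:
            return rollup  # read rollup["status"] and jobs[].lastError next
        now = sum(1 for job in rollup.get("jobs", []) if job.get("status") in TERMINAL)
        wait = 5.0 if wait is None else next_wait(wait, progressed=now > finished)
        finished = now
        sleep(wait)
```

A production caller would also want an overall deadline; the sketch omits one for brevity.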

Onboard a new external data source via a tap

  1. Check existing taps: list_taps. If a tap already covers this source, test or run it directly.
  2. Create credentials: if the source needs an API key, call create_tap_secret with the credential fields (they become env vars inside the script).
  3. Create the tap: create_tap with an instruction (the AI generates the script) or with your own Python fetch() function (faster and more reliable). Pass secret_name to bind the credentials from step 2. For PDFs, Word documents, or HTML going into a vector-store pipeline, pass tap_type="document".
  4. Test: test_tap. If it fails, read the error, fix the script, and call create_tap again. Repeat until the test passes.
  5. Run: run_tap. Read the response’s persisted field first:
    • persisted: true → capture publisherToken and continue to step 6.
    • persisted: false → read persistedReason (no_target_pipeline, test_mode, run_error, no_records, debounced), tell the user exactly why, and stop. (run_tap does not return records; call test_tap if you need to preview what the script produces.) debounced means the same tap was triggered within the last 5 seconds and the earlier run is still executing. Do not retry; look it up via get_tap_logs and poll get_pipeline_status instead.
  6. Verify it landed: get_pipeline_status(publisher_token=...), polling until rollup.allDone is true. rollup.status is then the outcome (success, warning, error), and rollup.jobs[].lastError carries any failure detail. The tap log only tells you the script ran; the publisher token is how you confirm records actually reached the destination.
  7. Schedule (optional): update_tap with a cron_expression. For each scheduled run, take the matching entry from get_tap_logs to get its publisherToken, then verify as in step 6.
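
The branch in step 5 maps directly onto the response fields. The sketch below assumes run_tap’s result has already been decoded into a dict. The field names and reason codes come from this page, but the advice strings are paraphrases, not server messages.

```python
from __future__ import annotations

from typing import Any

# persistedReason values documented for persisted: false. Advice text is paraphrased.
REASON_ADVICE = {
    "no_target_pipeline": "The tap has no target pipeline; set one with update_tap.",
    "test_mode": "Test mode: nothing was pushed to the pipeline.",
    "run_error": "The script failed; check get_tap_logs, fix it, and re-run test_tap.",
    "no_records": "The script ran but produced no records.",
    "debounced": "A run started within the last 5 seconds; do not retry, find it in get_tap_logs.",
}


def handle_run_tap(response: dict[str, Any]) -> tuple[str | None, str]:
    """Return (publisherToken to poll, or None) and a message for the user."""
    if response.get("persisted"):
        token = response["publisherToken"]
        count = response.get("recordCount")
        return token, f"Submitted {count} records; verify with get_pipeline_status(publisher_token={token!r})."
    reason = response.get("persistedReason", "unknown")
    return None, REASON_ADVICE.get(reason, f"Not persisted (reason: {reason}).")
```
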

Build and query a RAG knowledge base from external documents

  1. Create a vector-store pipeline: create_pipeline with a Qdrant, Weaviate, Milvus, pgvector, or Chroma destination and an unstructuredAttributes source.
  2. Create a document tap: create_tap with tap_type="document" and an instruction describing the document source (e.g., “ingest every PDF under legal-contracts/2026/ in S3”). If the source needs credentials, call create_tap_secret first.
  3. Test and run: test_tap, then run_tap. Read persisted / persistedReason exactly as in the previous workflow.
  4. Verify every document landed: get_pipeline_status(publisher_token=...). Document taps fan out to N jobs (one per file) under one publisher token; poll until rollup.allDone is true. rollup.jobs[] has one entry per file, each with its own status and lastError.
  5. Inspect the ledger (optional): get_tap_ledger shows which files were discovered and processed. On subsequent runs, unchanged files are skipped automatically; pass clear_uri or clear_all=true to force a re-scan.
  6. Search: search_qdrant (or the matching tool for your destination) to retrieve relevant chunks.
  7. Answer: ai_answer with the retrieved chunks as context and the user’s question.
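
Steps 6 and 7 chain two tool calls: the search result becomes the context for ai_answer. The sketch below assumes each search hit is a dict with text and score keys. The argument names (collection, query, question, context) are hypothetical, so check the real tool schemas before relying on them. build_context keeps the highest-scoring chunks within a character budget so the prompt stays bounded.

```python
from __future__ import annotations

from typing import Any, Callable

CallTool = Callable[[str, dict], Any]


def build_context(hits: list[dict], min_score: float = 0.0, max_chars: int = 4000) -> str:
    """Join chunk texts, best score first, skipping any that would exceed max_chars.

    The budget counts chunk text only, not the blank-line separators.
    """
    parts, used = [], 0
    for hit in sorted(hits, key=lambda h: h.get("score", 0.0), reverse=True):
        if hit.get("score", 0.0) < min_score:
            continue
        text = hit.get("text", "").strip()
        if not text or used + len(text) > max_chars:
            continue
        parts.append(text)
        used += len(text)
    return "\n\n".join(parts)


def answer(call_tool: CallTool, collection: str, question: str) -> Any:
    # Hypothetical argument names and result shapes; check the tool schemas.
    hits = call_tool("search_qdrant", {"collection": collection, "query": question})
    return call_tool("ai_answer", {"question": question, "context": build_context(hits)})
```
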

Discover and query existing data

  1. List databases: list_postgres_databases.
  2. List schemas: list_postgres_schemas for the target database.
  3. List tables: list_postgres_tables (supports a vector-only filter).
  4. Inspect columns: list_postgres_columns to understand the structure.
  5. Query: query_postgres with a SELECT, or query_natural to ask a plain-English question and have the AI generate and run the SQL.
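
Steps 1 through 4 are a straightforward nested walk. The sketch below assumes each list_* tool returns a plain list of names. The argument names (database, schema, table) are hypothetical. On a platform with many tables this issues one list_postgres_columns call per table, so narrowing to a known database and schema first is cheaper.

```python
from __future__ import annotations

from typing import Any, Callable

CallTool = Callable[[str, dict], Any]


def catalog(call_tool: CallTool) -> dict[str, dict[str, dict[str, Any]]]:
    """Walk databases -> schemas -> tables -> columns and return a nested map."""
    result: dict[str, dict[str, dict[str, Any]]] = {}
    for db in call_tool("list_postgres_databases", {}):
        result[db] = {}
        for schema in call_tool("list_postgres_schemas", {"database": db}):
            tables = call_tool("list_postgres_tables", {"database": db, "schema": schema})
            result[db][schema] = {
                table: call_tool("list_postgres_columns",
                                 {"database": db, "schema": schema, "table": table})
                for table in tables
            }
    return result
```
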

Cross-modal analysis (structured + vector)

  1. Vector search: search_pgvector (or another vector tool) for relevant document chunks.
  2. Structured query: query_postgres or query_natural for related metrics.
  3. Combine: merge the two result sets in the response.
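
One way to do the combine step is to join on an identifier both sides share. The sketch below assumes vector hits carry a metadata dict and SQL rows are dicts. The join key (here ticker) is a hypothetical choice. Chunks with no matching rows are kept, with an empty list.

```python
from __future__ import annotations

from typing import Any


def merge_results(chunks: list[dict], rows: list[dict], key: str) -> list[dict]:
    """Attach to each vector chunk the SQL rows whose `key` matches its metadata."""
    rows_by_key: dict[Any, list[dict]] = {}
    for row in rows:
        if key in row:
            rows_by_key.setdefault(row[key], []).append(row)
    merged = []
    for chunk in chunks:
        meta = chunk.get("metadata", {})
        related = rows_by_key.get(meta[key], []) if key in meta else []
        merged.append({"text": chunk.get("text", ""), "score": chunk.get("score"),
                       "related_rows": related})
    return merged
```
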

Automated quality monitoring of scheduled taps

  1. List taps: list_taps to find the tap of interest.
  2. Read recent runs: get_tap_logs returns the last 50 entries with status, record count, duration, errors, and a publisherToken for each run that submitted records.
  3. Verify completion: for any run of interest, call get_pipeline_status(publisher_token=...) and read rollup.status to confirm records actually landed in the destination. The tap log tells you the script ran; the publisher token tells you the load finished.
  4. Diagnose failures: on an errored job, read rollup.jobs[].lastError (processName + description) and either fix the tap (create_tap again with a corrected script) or retarget the pipeline (update_tap).
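
Steps 2 through 4 can be automated as an audit pass over recent runs. The sketch below assumes get_tap_logs returns a newest-first list of entry dicts. The field and argument names status, error, and tap_name are hypothetical; only publisherToken and the rollup fields come from this page. Runs without a publisherToken submitted no records, so the sketch reports the log entry’s own status for them instead of querying get_pipeline_status.

```python
from __future__ import annotations

from typing import Any, Callable

CallTool = Callable[[str, dict], Any]


def audit_tap(call_tool: CallTool, tap: str, limit: int = 5) -> list[dict]:
    """Return one verdict per recent run: the load outcome, or the log status."""
    report = []
    for entry in call_tool("get_tap_logs", {"tap_name": tap})[:limit]:
        token = entry.get("publisherToken")
        if not token:
            # Nothing was submitted, so there is no load to verify.
            errors = [entry["error"]] if entry.get("error") else []
            report.append({"publisherToken": None,
                           "verdict": entry.get("status", "unknown"), "errors": errors})
            continue
        rollup = call_tool("get_pipeline_status", {"publisher_token": token})["rollup"]
        verdict = rollup["status"] if rollup["allDone"] else "still loading"
        errors = [job["lastError"] for job in rollup.get("jobs", []) if job.get("lastError")]
        report.append({"publisherToken": token, "verdict": verdict, "errors": errors})
    return report
```
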

CLI Examples

The Datris CLI connects to the MCP server and provides the same capabilities from the terminal. See CLI for the full reference.
```shell
# Install
brew tap datris/tap
brew install datris

# Ingest a CSV into PostgreSQL
datris ingest sales-data.csv --dest postgres

# Ingest with AI validation and transformation
datris ingest trades.csv --dest postgres \
  --ai-validate "all prices must be positive and dates must be YYYY-MM-DD" \
  --ai-transform "convert dates to YYYY/MM/DD and uppercase all ticker symbols"

# Ingest into a vector store for RAG
datris ingest manual.pdf --dest pgvector

# Ingest + analyze in one command
datris ingest trades.csv --dest postgres --ai-analyze "What are the top 5 stocks by volume?"
datris ingest report.pdf --dest pgvector --ai-analyze "What was the company's revenue?"

# Analyze existing data (PostgreSQL, MongoDB, or vector stores)
datris analyze "top 5 stocks by volume" --table trades
datris analyze "What is the return policy?" --table support_docs --dest pgvector

# Query PostgreSQL directly
datris query "SELECT * FROM public.sales LIMIT 10"

# Semantic search (raw results)
datris search "quarterly revenue" --store pgvector --collection financial_docs

# List pipelines, check health, get status
datris pipelines
datris health
datris status my_pipeline
```