Beta. The Taps API is stable but may add fields or refine response shapes in upcoming releases. Existing endpoints will remain backward-compatible where possible.
List Taps
Returns all registered taps.
Response: JSON array of tap configurations.
Get Tap
GET /api/v1/tap?name={tapName}
Returns a single tap configuration plus the Python script body from MinIO.
Parameters:
| Parameter | Type | Description |
|---|---|---|
| name | query | Tap name (required) |
Response (success):
{
"name": "stock-prices-tap",
"description": "Fetch daily stock prices from yfinance",
"scriptPath": "tap-scripts/stock-prices-tap_a1b2c3d4.py",
"targetPipeline": "stock-prices-pipeline",
"script": "def fetch():\n ...",
"packages": ["yfinance"],
"enabled": true,
"tapType": "structured"
}
Response when the stored script is missing from object storage:
{
"name": "stock-prices-tap",
"scriptPath": "tap-scripts/stock-prices-tap_a1b2c3d4.py",
"script": null,
"scriptMissing": true,
...
}
scriptMissing appears only when scriptPath is set but the object is absent, typically after an interrupted edit or re-generation wiped the old script before the new one landed. Distinguish this from the “never generated” state (scriptPath null, script null, no scriptMissing key). The Edit Tap UI uses this flag to render a recoverable-state banner instead of the generic “Generate a script first” validation.
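The three script states described above can be told apart with a small helper. This is a minimal sketch that works on an already-parsed Get Tap response; the function name and the returned labels ("missing", "never_generated", "present") are hypothetical and not part of the API.

```python
from typing import Any, Mapping


def classify_script_state(tap: Mapping[str, Any]) -> str:
    """Classify a parsed Get Tap response by the state of its script.

    The returned labels are illustrative only; the API itself exposes
    just scriptPath, script, and the optional scriptMissing flag.
    """
    if tap.get("scriptMissing") is True:
        # scriptPath is set but the object is absent from object storage.
        return "missing"
    if tap.get("scriptPath") is None and tap.get("script") is None:
        # No script was ever generated for this tap.
        return "never_generated"
    return "present"
```

A caller could use the "missing" result to offer a re-generate action rather than treating the tap as brand new.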
Create or Update Tap
POST /api/v1/tap
Content-Type: application/json
Body:
{
"name": "stock-prices-tap",
"description": "Fetch daily stock prices from yfinance",
"scriptPath": "tap-scripts/stock-prices-tap_a1b2c3d4.py",
"targetPipeline": "stock-prices-pipeline",
"packages": ["yfinance"],
"secretName": "my-api-creds",
"cronExpression": "0 0 0 * * ?",
"enabled": true,
"tapType": "structured"
}
| Field | Type | Default | Description |
|---|---|---|---|
| tapType | string | "structured" | "structured" or "document". Document taps return {uri, filename, content} dicts and require a targetPipeline with an unstructuredAttributes source and a vector-store destination. Cannot be changed once set. |
| catalog | string | null | Free-form label that groups related taps and pipelines in the Data Catalog. Empty or null = Uncataloged. |
Document-tap compatibility rule. When tapType == "document" and targetPipeline is non-empty, the server validates that the pipeline has:
- source.fileAttributes.unstructuredAttributes != null
- A vector-store destination (one of qdrant, pgvector, weaviate, milvus, chroma)
Mismatches return HTTP 400 with a message naming the violation. The same rule is enforced at run time in TapRunner.feedDocumentPipeline, in case the pipeline is reshaped after the tap is saved.
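A client can pre-check the compatibility rule before saving a tap. The sketch below is an approximation of the server-side check, not its actual implementation. It assumes the pipeline config has been parsed into a dict, and because the shape of the destination section is not described here, the caller passes the destination kinds as a plain list (a hypothetical convention). The function name and violation messages are also hypothetical.

```python
from typing import Any, Iterable, List, Mapping

# Vector-store names listed in the compatibility rule.
VECTOR_STORES = {"qdrant", "pgvector", "weaviate", "milvus", "chroma"}


def document_tap_violations(
    pipeline: Mapping[str, Any], destination_kinds: Iterable[str]
) -> List[str]:
    """Return human-readable violations; an empty list means compatible."""
    violations = []
    file_attrs = (pipeline.get("source") or {}).get("fileAttributes") or {}
    if file_attrs.get("unstructuredAttributes") is None:
        violations.append("pipeline source has no unstructuredAttributes")
    if not VECTOR_STORES.intersection(destination_kinds):
        violations.append("pipeline has no vector-store destination")
    return violations
```

The server remains the authority: a passing client-side check does not guarantee the POST will succeed, since the pipeline can change between the check and the save.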
Delete Tap
DELETE /api/v1/tap?name={tapName}
Deletes the tap configuration and its script from MinIO. For document taps, also clears the ledger entries and deletes all staged MinIO objects for that tap.
Brainstorm (AI Chat)
POST /api/v1/tap/brainstorm
Content-Type: application/json
Multi-turn conversational endpoint that helps the user refine a vague tap idea into a clear instruction. The UI calls this on every message in the brainstorm chat panel of the tap creation wizard. The AI:
- Asks one focused clarifying question at a time
- Suggests specific data sources (e.g., yfinance, Alpha Vantage, Open-Meteo)
- Recognizes Datris platform tables and uses the metadata/query endpoints
- Returns an updated instruction draft on every turn so the UI can keep the instruction box in sync
- Returns suggestedEnvVars when an external API requires authentication
Body:
{
"messages": [
{"role": "user", "content": "I'm looking for company earnings data"},
{"role": "assistant", "content": "Which companies?..."},
{"role": "user", "content": "Tickers from the consumer_discretionary_earnings table on Datris"}
],
"currentDescription": "Optional: the current draft instruction so the AI has context",
"tapType": "structured"
}
| Field | Type | Default | Description |
|---|---|---|---|
| tapType | string | "structured" | Selects the system prompt. "document" uses a document-tap-aware prompt that asks about document sources (SharePoint, S3, URL listings, etc.) and never asks about chunking, embedding, or vector-store details (those are pipeline concerns). |
Response:
{
"reply": "Got it. Which earnings data source — yfinance (free), Alpha Vantage, or Polygon.io?",
"description": "Query the consumer_discretionary_earnings table via /api/v1/query/postgres to get the ticker list. (Source TBD.)",
"suggestedEnvVars": []
}
After the user picks Alpha Vantage, a subsequent call returns:
{
"reply": "Good choice. The script will need an Alpha Vantage API key — please create a tap secret with ALPHA_VANTAGE_API_KEY in the credentials section below.",
"description": "Query the consumer_discretionary_earnings table via /api/v1/query/postgres to get tickers, then for each ticker call the Alpha Vantage EARNINGS endpoint and return historical earnings records. Use os.environ.get('ALPHA_VANTAGE_API_KEY') for authentication.",
"suggestedEnvVars": ["ALPHA_VANTAGE_API_KEY"]
}
The AI never suggests DATRIS_POSTGRES_DATABASE, DATRIS_MONGODB_DATABASE, DATRIS_PLATFORM_HOST, or DATRIS_PLATFORM_PORT in suggestedEnvVars — those are always injected by the platform.
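Because the endpoint is multi-turn, the caller has to carry the message history and the latest description draft between calls. The following sketch shows one way to do that bookkeeping without any HTTP code; the helper names are hypothetical, and only the field names (messages, currentDescription, tapType, reply, description, suggestedEnvVars) come from the request and response shapes above.

```python
from typing import Any, Dict, List, Tuple

Message = Dict[str, str]


def build_brainstorm_body(
    history: List[Message],
    user_message: str,
    current_description: str = "",
    tap_type: str = "structured",
) -> Dict[str, Any]:
    """Build the request body for the next turn; the last message is the user's."""
    return {
        "messages": history + [{"role": "user", "content": user_message}],
        "currentDescription": current_description,
        "tapType": tap_type,
    }


def absorb_reply(
    body: Dict[str, Any], response: Dict[str, Any]
) -> Tuple[List[Message], str, List[str]]:
    """Fold a brainstorm response into the state for the following turn."""
    history = body["messages"] + [
        {"role": "assistant", "content": response["reply"]}
    ]
    env_vars = list(response.get("suggestedEnvVars", []))
    return history, response["description"], env_vars
```

Each turn then becomes: build the body, POST it to /api/v1/tap/brainstorm, and pass the parsed response to absorb_reply to get the history and description for the next call.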
Generate Script (AI)
POST /api/v1/tap/generate
Content-Type: application/json
Uses AI to generate a Python fetch() script from a plain-English description. The system prompt branches on tapType — document taps get instructions to return {uri, filename, content} dicts and a different set of rules (no chunking, no embedding, no local-filesystem fallback).
Body:
{
"description": "Discover all PDFs in the S3 bucket 'contracts' under prefix 2026/",
"tapName": "contracts-tap",
"oldScriptPath": "tap-scripts/old_script.py",
"secretName": "aws-creds",
"tapType": "document"
}
| Field | Type | Default | Description |
|---|---|---|---|
| tapType | string | "structured" | Selects the system prompt. Set to "document" when generating a document-tap script. |
Response:
{
"script": "import requests\n\ndef fetch():\n ...",
"packages": ["yfinance"],
"scriptPath": "tap-scripts/stock-prices-tap_a1b2c3d4.py"
}
Fix Script (AI Diagnosis)
POST /api/v1/tap/fix
Content-Type: application/json
Uses AI to fix a script based on a diagnosis of what went wrong.
Body:
{
"tapName": "stock-prices-tap",
"script": "current script content...",
"diagnosis": "The API endpoint is wrong...",
"logs": "script output logs...",
"error": "error message...",
"oldScriptPath": "tap-scripts/old_script.py"
}
Response: Same format as Generate Script.
Test Tap
POST /api/v1/tap/test?testLimit={N}
Content-Type: application/json
Executes the tap script without sending data to a pipeline. Returns results, logs, and an AI diagnosis if issues are detected.
Query parameters:
| Name | Type | Description |
|---|---|---|
| testLimit | integer (optional) | When > 0, the runner injects DATRIS_TAP_TEST_LIMIT=<N> into the script process. Well-written tap scripts honor this env var to cap their source reads and iteration loops, making tests faster and lower-cost. Cron and manual runs (via /tap/run) never set this; production runs always read every row. The UI’s “Limit test sample to 20 records” checkbox in Create Tap → step 2 sends testLimit=20. |
Body: A TapConfig JSON object (same as Create).
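To illustrate what honoring DATRIS_TAP_TEST_LIMIT might look like inside a tap script, here is a minimal sketch. The ticker list is a stand-in for a real source read, the helper read_test_limit is hypothetical, and returning a list of dicts from fetch() is an assumption based on the structured response example rather than a documented contract.

```python
import os
from typing import Dict, List, Optional


def read_test_limit() -> Optional[int]:
    """Return the test limit as a positive int, or None when unset or invalid."""
    raw = os.environ.get("DATRIS_TAP_TEST_LIMIT", "")
    try:
        limit = int(raw)
    except ValueError:
        return None
    return limit if limit > 0 else None


def fetch() -> List[Dict[str, str]]:
    # Stand-in for a real source query; a real script would read from an API.
    tickers = ["AAPL", "MSFT", "GOOG", "AMZN"]
    limit = read_test_limit()
    if limit is not None:
        # Cap the work before any expensive per-item calls happen.
        tickers = tickers[:limit]
    return [{"ticker": t} for t in tickers]
```

Applying the cap before the per-item loop, rather than slicing the final result, is what makes limited tests faster and cheaper.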
Structured response (dataType: "csv" shown):
{
"records": [{"ticker": "AAPL", "close": 255.92}, ...],
"recordCount": 503,
"dataType": "csv",
"columns": ["ticker", "date", "open", "high", "low", "close", "volume"],
"logs": "Retrieved 503 tickers...\nBatch 1/11...",
"error": null,
"aiExplanation": null,
"durationMs": 12400
}
Document-tap response (dataType: "document"):
{
"records": [
{
"uri": "s3://contracts/2026/acme-msa.pdf",
"filename": "acme-msa.pdf",
"content": "JVBERi0xLjQKJe…",
"content_hash": "2f1c…",
"metadata": {"source": "s3"}
}
],
"recordCount": 1,
"dataType": "document",
"columns": null,
"logs": "Listed 1 object in s3://contracts/2026/",
"error": null,
"durationMs": 3100
}
If errors or 0 records are detected, aiExplanation contains an AI-generated diagnosis. durationMs is the end-to-end test wall time in milliseconds — this is what the Optimize Script endpoint uses as the baseline timing for its perf rewrite.
For dataType: "csv", column names in columns and the keys inside each record in records are normalized by the platform: lowercase, [a-z0-9_] only, with % rewritten to percent. Source data with names like EPS Estimate or Surprise(%) will appear as eps_estimate and surprise_percent. JSON/XML results destined for MongoDB are not normalized. See Schema Definition → Column Naming Rules. Document-tap records pass through unchanged — the platform never rewrites URI, filename, content, or metadata keys.
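The normalization can be approximated client-side when you need to predict column names ahead of a run. The sketch below reproduces the two examples given above; collapsing repeated underscores and trimming leading or trailing ones is an assumption made so that Surprise(%) yields surprise_percent, and the platform's actual rules (see Schema Definition → Column Naming Rules) may differ in edge cases.

```python
import re


def normalize_column(name: str) -> str:
    """Approximate the platform's CSV column-name normalization."""
    lowered = name.lower().replace("%", "percent")
    # Replace every run of characters outside [a-z0-9_] with one underscore.
    cleaned = re.sub(r"[^a-z0-9_]+", "_", lowered)
    # Assumption: collapse repeats and trim underscores at the edges.
    return re.sub(r"_+", "_", cleaned).strip("_")
```

With this sketch, normalize_column("EPS Estimate") gives "eps_estimate" and normalize_column("Surprise(%)") gives "surprise_percent".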
Review Script (Post-Run)
POST /api/v1/tap/review
Content-Type: application/json
After a successful test, asks the LLM to scan the script’s captured stderr/stdout for signals that the script itself should change — not for performance. The reviewer looks for rate-limit / throttle / burst warnings, deprecation hints, pagination / partial-response cues, and schema-drift / auth warnings. When a signal is found, the script is regenerated with the appropriate fix (add time.sleep, switch to the recommended endpoint, add pagination, update parsing for renamed fields) and persisted to MinIO as a new version. When no signal is found, the script is returned unchanged.
The UI invokes this automatically before Optimize Script — if the reviewer rewrites the script, the optimizer is skipped on that pass (correctness from output outranks speed). Callers integrating directly should follow the same order: call /tap/review first, re-test on rewritten=true, and only call /tap/optimize when rewritten=false.
Body:
{
"tapName": "stock-prices-tap",
"script": "current working script content...",
"recordCount": 503,
"durationMs": 298000,
"logs": "Retrieved 503 tickers...\nSource warning: Burst pattern detected...",
"oldScriptPath": "tap-scripts/stock-prices-tap_v3.py"
}
| Field | Type | Description |
|---|---|---|
| tapName | string | Tap identifier. Used for script path versioning. |
| script | string | The current working script that passed its test. |
| recordCount | integer | How many records the last test returned. |
| durationMs | integer | Wall time of the last test run in ms (from /tap/test response). |
| logs | string | Captured script stderr/stdout; the reviewer matches signals liberally against this text. |
| oldScriptPath | string | MinIO path of the current script. If rewritten, the new script is stored as a new version and the old path is retained for rollback. |
Response:
{
"script": "regenerated Python 3 script...",
"packages": ["extra-package"],
"scriptPath": "tap-scripts/stock-prices-tap_v4.py",
"changes": [
"Added 0.25s sleep between API calls — source reported burst pattern (5 rps limit)",
"Lowered max_workers from 10 to 3"
],
"rewritten": true
}
When rewritten is false, script / scriptPath match the input and changes is empty — the reviewer found nothing in the output worth acting on. Prompt fragments configured via Tap Prompt Fragments are auto-injected into the reviewer’s system prompt when their key or any alias appears in the script.
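For callers integrating directly, the review-then-optimize ordering reduces to a single branch on the rewritten flag. A minimal sketch follows; the function name and the returned step labels are hypothetical.

```python
from typing import Any, Mapping


def next_step_after_review(review_response: Mapping[str, Any]) -> str:
    """Pick the next call after /tap/review, following the documented order."""
    if review_response.get("rewritten"):
        # The reviewer changed the script: re-test it and skip the optimizer
        # on this pass.
        return "retest"
    # Nothing in the output called for a change: optimizing is safe to try.
    return "optimize"
```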
Optimize Script
POST /api/v1/tap/optimize
Content-Type: application/json
After a successful test, asks the LLM to restructure a working script for performance — e.g. swap serial HTTP calls for a ThreadPoolExecutor, reuse a requests.Session(), or drop unnecessary time.sleep() — while preserving correctness (fetch() signature, DATRIS_TAP_TEST_LIMIT handling, Vault env-var reads, retry/backoff behavior, and raise_for_status calls).
The UI invokes this automatically after a green test in the Create Tap wizard, with a regression guard that auto-reverts if the rewrite runs ≥20% slower. Callers integrating this endpoint directly should implement the same re-test + revert pattern.
Body:
{
"tapName": "stock-prices-tap",
"script": "current working script content...",
"recordCount": 503,
"durationMs": 298000,
"logs": "Retrieved 503 tickers...",
"oldScriptPath": "tap-scripts/stock-prices-tap_v3.py"
}
| Field | Type | Description |
|---|---|---|
| tapName | string | Tap identifier. Used for script path versioning. |
| script | string | The current working script that passed its test. |
| recordCount | integer | How many records the last test returned. Used in the user prompt so the LLM can reason about per-record timing. |
| durationMs | integer | Wall time of the last test run in ms (from /tap/test response). |
| logs | string | Recent script stderr output (last ~40 lines used). |
| oldScriptPath | string | MinIO path of the current script. The optimized script is stored as a new version; the old path is retained so the caller can revert. |
Response:
{
"script": "optimized Python 3 script...",
"packages": ["extra-package"],
"scriptPath": "tap-scripts/stock-prices-tap_v4.py",
"changes": [
"Parallelized ticker fetches with ThreadPoolExecutor(10)",
"Removed per-item 0.25s sleep"
]
}
If the LLM concludes the script is already well-optimized, changes is an empty array and script / scriptPath match the input. In that case no re-test is needed.
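The re-test and revert pattern can be sketched as two small predicates. This is not the UI's actual code: the helper names are hypothetical, and "≥20% slower" is interpreted here as the re-test taking at least 1.2 times the baseline durationMs, which is an assumption. Integer arithmetic avoids floating-point rounding at the boundary.

```python
from typing import Any, Mapping


def needs_retest(optimize_response: Mapping[str, Any]) -> bool:
    """An empty changes array means the script was returned unchanged."""
    return bool(optimize_response.get("changes"))


def should_revert(
    baseline_ms: int, optimized_ms: int, threshold_percent: int = 20
) -> bool:
    """True when the optimized run is at least threshold_percent slower."""
    return optimized_ms * 100 >= baseline_ms * (100 + threshold_percent)
```

When should_revert returns true, the caller would go back to the script stored at oldScriptPath.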
Run Tap
POST /api/v1/tap/run
Content-Type: application/json
Executes a saved tap. Optionally sends data to the configured pipeline.
Body:
{
"name": "stock-prices-tap",
"mode": "run"
}
Response:
{
"tap": "stock-prices-tap",
"description": "Fetch daily stock prices...",
"status": "success",
"mode": "run",
"targetPipeline": "stock-prices-pipeline",
"persisted": true,
"publisherToken": "5b2f4a1d-8c7e-4f0a-9b3d-6e1c2a4f8b9e",
"pipelineTokens": ["a1b2c3d4-..."],
"recordCount": 503,
"dataType": "csv",
"logs": "...",
"error": null
}
mode=run does not return the records themselves — the data is in transit to targetPipeline, and the agent / caller should verify completion via get_pipeline_status, not read from the response body. recordCount summarizes how many records were submitted. To preview what a script produces, call the same endpoint with mode=test, which returns up to 20 sample rows in a records array (with recordsTruncated: true set when the script produced more than that).
persisted: true means the records were submitted to targetPipeline. When persisted: false, a persistedReason field names the cause — one of test_mode, run_error, no_records, no_target_pipeline, or debounced. On persisted runs, publisherToken groups every ingestion job this run submitted; pipelineTokens lists each. Document taps fan out to many pipelineTokens but share one publisherToken.
Run debounce (mode=run only)
/tap/run debounces mode=run requests per tap on a 5-second window to suppress accidental duplicates — agents that emit parallel tool_use blocks, UI buttons that get double-clicked, or transport-level retries. If a second run hits within the window, the response is HTTP 200 with status: "skipped", persisted: false, persistedReason: "debounced", and an error string explaining how long ago the previous run started. The previous run keeps executing — pivot via get_tap_logs and get_pipeline_status to track its outcome. mode=test is read-only and is never debounced.
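A caller can branch on persisted and persistedReason to decide what to do after a /tap/run call. The sketch below works on a parsed response; the function name and the action labels it returns are hypothetical, while the field names and reason values come from the description above.

```python
from typing import Any, Mapping, Optional, Tuple


def next_action(run_response: Mapping[str, Any]) -> Tuple[str, Optional[str]]:
    """Map a /tap/run response to a follow-up action and its argument."""
    if run_response.get("persisted"):
        # Records were handed off: poll the pipeline with the publisher token.
        return ("poll_pipeline", run_response["publisherToken"])
    reason = run_response.get("persistedReason")
    if reason == "debounced":
        # An earlier run is still executing; track that one instead of retrying.
        return ("track_previous_run", None)
    # test_mode, run_error, no_records, or no_target_pipeline.
    return ("inspect", reason)
```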
Watching a tap run
A /tap/run response comes back as soon as records are handed off to the async ingestion pipeline — the actual load is still in progress. Use publisherToken to poll:
GET /api/v1/pipeline/status?publishertoken={publisherToken}&withrollup=true
Returns a {rollup, events} wrapper covering every ingestion job this tap run submitted (for structured taps that’s one job; for document taps it’s one per submitted document). Poll until rollup.allDone is true, then read rollup.status for the outcome and rollup.jobs[].lastError for any failure. See the Pipeline Status API → Rollup Response for the full shape.
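The polling loop can be sketched independently of any HTTP client by injecting a callable that performs the GET and returns the parsed {rollup, events} wrapper. The function name, the polling interval, and the polling budget are hypothetical choices, not platform defaults; only allDone, status, and jobs[].lastError come from the description above.

```python
import time
from typing import Any, Callable, Dict, List


def wait_for_rollup(
    fetch_status: Callable[[], Dict[str, Any]],
    interval_s: float = 2.0,
    max_polls: int = 150,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until rollup.allDone is true, then return status and job errors."""
    for _ in range(max_polls):
        rollup = fetch_status()["rollup"]
        if rollup.get("allDone"):
            errors: List[str] = [
                job["lastError"]
                for job in rollup.get("jobs", [])
                if job.get("lastError")
            ]
            return {"status": rollup.get("status"), "errors": errors}
        sleep(interval_s)
    raise TimeoutError("rollup did not finish within the polling budget")
```

In practice fetch_status would wrap a GET to /api/v1/pipeline/status with the publishertoken and withrollup=true query parameters; passing it in keeps the loop easy to test with canned responses.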
Generate CRON Expression (AI)
POST /api/v1/tap/cron
Content-Type: application/json
Converts a plain-English schedule description to a Quartz CRON expression.
Body:
{
"description": "Every weekday at 4pm ET"
}
Response:
{
"cronExpression": "0 0 16 ? * MON-FRI"
}
Run History
GET /api/v1/tap/logs?name={tapName}
Returns the 50 most recent run log entries for a tap, newest first.
Response:
[
{
"tapName": "stock-prices-tap",
"runTime": "2026-04-04T20:23:22Z",
"status": "success",
"recordCount": 503,
"dataType": "csv",
"logs": "Retrieved 503 tickers...",
"error": null,
"mode": "test",
"durationMs": 45000
}
]
Document Ledger
Document taps track which files they’ve already processed in a ledger — a MongoDB collection keyed by {tapName}|{uri}. See Document Taps → The Document Ledger for the concept; these endpoints manage it.
Read ledger
GET /api/v1/tap/ledger?name={tapName}
Returns every ledger entry owned by the tap.
Response:
[
{
"uri": "s3://contracts/2026/acme-msa.pdf",
"tapName": "contracts-tap",
"stagedPath": "tap-docs/contracts-tap/a1b2c3d4_acme-msa.pdf",
"filename": "acme-msa.pdf",
"contentHash": "2f1c…",
"firstSeenAt": "2026-04-18 13:42:38 EDT",
"lastSeenAt": "2026-04-19 07:00:11 EDT",
"status": "processed",
"metadata": {"source": "s3", "region": "EU"}
}
]
Delete one entry (force re-process)
DELETE /api/v1/tap/ledger?name={tapName}&uri={uri}
Removes a single entry and its staged MinIO object. The next tap run will re-ingest that specific document from source.
Clear the entire ledger (force full re-scan)
DELETE /api/v1/tap/ledger?name={tapName}
Removes every entry for the tap plus all staged MinIO objects. The next run re-ingests every document the source exposes.
Also triggered automatically when:
- The tap is deleted (DELETE /api/v1/tap).
- The target pipeline’s data is cleared (DELETE /api/v1/pipeline?pipeline=X&deleteData=true). Without this, the tap would skip docs it already “processed” and the pipeline would stay empty.
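Document URIs contain characters such as : and / that need percent-encoding when placed in a query string. The sketch below builds the ledger URLs with the standard library; the helper name and the base URL in the usage note are hypothetical, and it assumes the server decodes the uri parameter in the usual way.

```python
from typing import Optional
from urllib.parse import urlencode


def ledger_url(base_url: str, tap_name: str, uri: Optional[str] = None) -> str:
    """Build the ledger URL for a tap, optionally scoped to one document URI."""
    params = {"name": tap_name}
    if uri is not None:
        # urlencode percent-encodes ':' and '/' inside the document URI.
        params["uri"] = uri
    return f"{base_url}/api/v1/tap/ledger?{urlencode(params)}"
```

Calling ledger_url("http://localhost:8080", "contracts-tap") yields the URL for reading or clearing the whole ledger; passing uri as well yields the single-entry form used with DELETE.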
Available Vector Stores
GET /api/v1/vector-stores/available
Returns the subset of [qdrant, weaviate, pgvector, milvus, chroma] whose Vault secret is present and whose service is currently reachable. Used by the document-tap pipeline wizard to drive the store picker. Secret presence alone isn’t sufficient — the dev stack seeds placeholder secrets for every store, so the endpoint actually probes each service.
Response:
Probes use the same logic as /api/v1/health/services, with a 2-second timeout per store (~10 s worst case if everything is unreachable).
Tap Prompt Fragments
User-configured system prompt fragments that auto-inject into tap AI flows when the user’s text matches the fragment’s key or any alias. See Tap Prompt Fragments for the full concept; this section covers the REST endpoints.
Fragments are per-tenant, stored in MongoDB at {env}-tap-prompt, and matched case-insensitively with word boundaries.
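The matching rule (case-insensitive, word boundaries, key or any alias) can be approximated with a regular expression. This is a sketch of the described behavior, not the platform's implementation; the function name is hypothetical, and treating a missing enabled field as true is an assumption.

```python
import re
from typing import Any, Mapping


def fragment_matches(fragment: Mapping[str, Any], text: str) -> bool:
    """Return True when the fragment's key or any alias appears in text."""
    if not fragment.get("enabled", True):
        # Disabled fragments stay in storage but never match.
        return False
    keywords = [fragment["key"], *fragment.get("aliases", [])]
    return any(
        re.search(rf"\b{re.escape(keyword)}\b", text, re.IGNORECASE)
        for keyword in keywords
    )
```

Word boundaries keep a key like AWS from firing on unrelated words that merely contain those letters, such as "paws".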
List Fragments
Response: Array of TapPromptFragment objects.
[
{
"key": "AWS",
"aliases": ["S3", "EC2", "Lambda", "boto3"],
"content": "When fetching data from AWS services, prefer boto3 with IAM credentials...",
"enabled": true,
"createdAt": "2026-04-20 14:05:00 UTC",
"updatedAt": "2026-04-20 14:05:00 UTC"
}
]
Get Fragment
GET /api/v1/tap-prompts/{key}
Returns a single fragment or HTTP 404 if {key} is not found. {key} should be URL-encoded.
Create or Update Fragment
POST /api/v1/tap-prompts
Content-Type: application/json
Create-or-update semantics — if a fragment with the same key already exists, it is overwritten. createdAt is preserved across updates; updatedAt is refreshed on every write. Writing through this endpoint invalidates the in-process fragment cache immediately, so the next tap generation / fix / optimize / brainstorm call sees the change without restart.
Body:
{
"key": "Polygon",
"aliases": ["polygon.io"],
"content": "Use the requests library with os.environ.get(\"POLYGON_API_KEY\") and pass auth via the Authorization: Bearer {key} header (query param ?apiKey= also works but prefer the header).",
"enabled": true
}
| Field | Type | Required | Description |
|---|---|---|---|
| key | string | Yes | Primary match keyword. Unique per tenant. Matched case-insensitively with \b word boundaries. |
| aliases | string[] | No | Additional match keywords. Same matching rules as key. |
| content | string | Yes | System-prompt text appended under a ## User-provided context block when a match fires. |
| enabled | boolean | No (default true) | When false, the fragment stays in storage but is excluded from matching. |
Response: {"status": "ok"} on success.
Delete Fragment
DELETE /api/v1/tap-prompts/{key}
Deletes the fragment and invalidates the cache. URL-encode {key}. Returns {"status": "ok"}.
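Since {key} sits in the URL path, keys containing spaces or slashes need percent-encoding. A minimal sketch using the standard library follows; the helper name and the base URL in the usage note are hypothetical.

```python
from urllib.parse import quote


def fragment_url(base_url: str, key: str) -> str:
    """Build the Get/Delete Fragment URL with the key percent-encoded."""
    # safe="" also encodes '/', which would otherwise split the path segment.
    return f"{base_url}/api/v1/tap-prompts/{quote(key, safe='')}"
```

For example, fragment_url("http://localhost:8080", "Alpha Vantage/v2") encodes the space as %20 and the slash as %2F.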
Suggest Fragment Content
POST /api/v1/tap-prompts/suggest
Content-Type: application/json
Asks the LLM to draft a content body from the fragment’s key and aliases. Useful as a starting point when creating a fragment — the UI exposes this via the Suggest button next to the Content field. If content is non-empty in the request, the LLM is instructed to refine/expand it rather than replace.
Body:
{
"key": "Stripe",
"aliases": ["stripe.com"],
"content": ""
}
Response:
{
"content": "Use the stripe Python SDK (pip install stripe). Set stripe.api_key = os.environ.get(\"STRIPE_API_KEY\"). Use auto_paging_iter() for list endpoints to transparently handle pagination..."
}
The returned content value is plain text; the UI drops it into the Content textarea for the user to review before saving.
Authentication
All endpoints require the x-api-key header if API key authentication is enabled.
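A small helper can keep the header handling in one place. This sketch assumes the caller knows whether API key authentication is enabled and passes None when it is not; the function name is hypothetical, and the Content-Type value mirrors the one shown on the POST endpoints.

```python
from typing import Dict, Optional


def build_headers(api_key: Optional[str] = None) -> Dict[str, str]:
    """Return request headers, adding x-api-key only when a key is supplied."""
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["x-api-key"] = api_key
    return headers
```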