Skip to main content
Beta. The Taps API is stable but may add fields or refine response shapes in upcoming releases. Existing endpoints will remain backward-compatible where possible.

List Taps

GET /api/v1/taps
Returns all registered taps. Response: JSON array of tap configurations.

Get Tap

GET /api/v1/tap?name={tapName}
Returns a single tap configuration plus the Python script body from MinIO. Parameters:
ParameterTypeDescription
namequeryTap name (required)
Response (success):
{
  "name": "stock-prices-tap",
  "description": "Fetch daily stock prices from yfinance",
  "scriptPath": "tap-scripts/stock-prices-tap_a1b2c3d4.py",
  "targetPipeline": "stock-prices-pipeline",
  "script": "def fetch():\n    ...",
  "packages": ["yfinance"],
  "enabled": true,
  "tapType": "structured"
}
Response when the stored script is missing from object storage:
{
  "name": "stock-prices-tap",
  "scriptPath": "tap-scripts/stock-prices-tap_a1b2c3d4.py",
  "script": null,
  "scriptMissing": true,
  ...
}
scriptMissing only appears when scriptPath is set but the object is absent — typically after an interrupted edit or re-generation wiped the old script before the new one landed. Distinguish from “never generated” (scriptPath null, script null, no scriptMissing key). The Edit Tap UI uses this flag to render a recoverable-state banner instead of the generic “Generate a script first” validation.

Create or Update Tap

POST /api/v1/tap
Content-Type: application/json
Body:
{
  "name": "stock-prices-tap",
  "description": "Fetch daily stock prices from yfinance",
  "scriptPath": "tap-scripts/stock-prices-tap_a1b2c3d4.py",
  "targetPipeline": "stock-prices-pipeline",
  "packages": ["yfinance"],
  "secretName": "my-api-creds",
  "cronExpression": "0 0 0 * * ?",
  "enabled": true,
  "tapType": "structured"
}
FieldTypeDefaultDescription
tapTypestring"structured""structured" or "document". Document taps return {uri, filename, content} dicts and require a targetPipeline with an unstructuredAttributes source and a vector-store destination. Cannot be changed once set.
catalogstringnullFree-form label that groups related taps and pipelines in the Data Catalog. Empty or null = Uncataloged.
Document-tap compatibility rule. When tapType == "document" and targetPipeline is non-empty, the server validates that the pipeline has:
  • source.fileAttributes.unstructuredAttributes != null
  • A vector-store destination (one of qdrant, pgvector, weaviate, milvus, chroma)
Mismatches return HTTP 400 with a message naming the violation. Also enforced at run time in TapRunner.feedDocumentPipeline in case the pipeline is reshaped after the tap is saved.

Delete Tap

DELETE /api/v1/tap?name={tapName}
Deletes the tap configuration and its script from MinIO. For document taps, also clears the ledger entries and deletes all staged MinIO objects for that tap.

Brainstorm (AI Chat)

POST /api/v1/tap/brainstorm
Content-Type: application/json
Multi-turn conversational endpoint that helps the user refine a vague tap idea into a clear instruction. The UI calls this on every message in the brainstorm chat panel of the tap creation wizard. The AI:
  • Asks one focused clarifying question at a time
  • Suggests specific data sources (e.g., yfinance, Alpha Vantage, Open-Meteo)
  • Recognizes Datris platform tables and uses the metadata/query endpoints
  • Returns an updated instruction draft on every turn so the UI can keep the instruction box in sync
  • Returns suggestedEnvVars when an external API requires authentication
Body:
{
  "messages": [
    {"role": "user", "content": "I'm looking for company earnings data"},
    {"role": "assistant", "content": "Which companies?..."},
    {"role": "user", "content": "Tickers from the consumer_discretionary_earnings table on Datris"}
  ],
  "currentDescription": "Optional: the current draft instruction so the AI has context",
  "tapType": "structured"
}
FieldTypeDefaultDescription
tapTypestring"structured"Selects the system prompt. "document" uses a document-tap-aware prompt that asks about document sources (SharePoint, S3, URL listings, etc.) and never asks about chunking, embedding, or vector-store details (those are pipeline concerns).
Response:
{
  "reply": "Got it. Which earnings data source — yfinance (free), Alpha Vantage, or Polygon.io?",
  "description": "Query the consumer_discretionary_earnings table via /api/v1/query/postgres to get the ticker list. (Source TBD.)",
  "suggestedEnvVars": []
}
After the user picks Alpha Vantage, a subsequent call returns:
{
  "reply": "Good choice. The script will need an Alpha Vantage API key — please create a tap secret with ALPHA_VANTAGE_API_KEY in the credentials section below.",
  "description": "Query the consumer_discretionary_earnings table via /api/v1/query/postgres to get tickers, then for each ticker call the Alpha Vantage EARNINGS endpoint and return historical earnings records. Use os.environ.get('ALPHA_VANTAGE_API_KEY') for authentication.",
  "suggestedEnvVars": ["ALPHA_VANTAGE_API_KEY"]
}
The AI never suggests DATRIS_POSTGRES_DATABASE, DATRIS_MONGODB_DATABASE, DATRIS_PLATFORM_HOST, or DATRIS_PLATFORM_PORT in suggestedEnvVars — those are always injected by the platform.

Generate Script (AI)

POST /api/v1/tap/generate
Content-Type: application/json
Uses AI to generate a Python fetch() script from a plain-English description. The system prompt branches on tapType — document taps get instructions to return {uri, filename, content} dicts and a different set of rules (no chunking, no embedding, no local-filesystem fallback). Body:
{
  "description": "Discover all PDFs in the S3 bucket 'contracts' under prefix 2026/",
  "tapName": "contracts-tap",
  "oldScriptPath": "tap-scripts/old_script.py",
  "secretName": "aws-creds",
  "tapType": "document"
}
FieldTypeDefaultDescription
tapTypestring"structured"Selects the system prompt. Set to "document" when generating a document-tap script.
Response:
{
  "script": "import requests\n\ndef fetch():\n    ...",
  "packages": ["yfinance"],
  "scriptPath": "tap-scripts/stock-prices-tap_a1b2c3d4.py"
}

Fix Script (AI Diagnosis)

POST /api/v1/tap/fix
Content-Type: application/json
Uses AI to fix a script based on a diagnosis of what went wrong. Body:
{
  "tapName": "stock-prices-tap",
  "script": "current script content...",
  "diagnosis": "The API endpoint is wrong...",
  "logs": "script output logs...",
  "error": "error message...",
  "oldScriptPath": "tap-scripts/old_script.py"
}
Response: Same format as Generate Script.

Test Tap

POST /api/v1/tap/test?testLimit={N}
Content-Type: application/json
Executes the tap script without sending data to a pipeline. Returns results, logs, and AI diagnosis if issues detected. Query parameters:
NameTypeDescription
testLimitinteger (optional)When > 0, the runner injects DATRIS_TAP_TEST_LIMIT=<N> into the script process. Well-written tap scripts honor this env var to cap their source reads and iteration loops, making tests faster and lower-cost. Cron and manual runs (via /tap/run) never set this — production runs always read every row. The UI’s “Limit test sample to 20 records” checkbox in Create Tap → step 2 sends testLimit=20.
Body: A TapConfig JSON object (same as Create). Structured response (dataType: "csv" shown):
{
  "records": [{"ticker": "AAPL", "close": 255.92}, ...],
  "recordCount": 503,
  "dataType": "csv",
  "columns": ["ticker", "date", "open", "high", "low", "close", "volume"],
  "logs": "Retrieved 503 tickers...\nBatch 1/11...",
  "error": null,
  "aiExplanation": null,
  "durationMs": 12400
}
Document-tap response (dataType: "document"):
{
  "records": [
    {
      "uri": "s3://contracts/2026/acme-msa.pdf",
      "filename": "acme-msa.pdf",
      "content": "JVBERi0xLjQKJe…",
      "content_hash": "2f1c…",
      "metadata": {"source": "s3"}
    }
  ],
  "recordCount": 1,
  "dataType": "document",
  "columns": null,
  "logs": "Listed 1 object in s3://contracts/2026/",
  "error": null,
  "durationMs": 3100
}
If errors or 0 records are detected, aiExplanation contains an AI-generated diagnosis. durationMs is the end-to-end test wall time in milliseconds — this is what the Optimize Script endpoint uses as the baseline timing for its perf rewrite. For dataType: "csv", column names in columns and the keys inside each record in records are normalized by the platform: lowercase, [a-z0-9_] only, with % rewritten to percent. Source data with names like EPS Estimate or Surprise(%) will appear as eps_estimate and surprise_percent. JSON/XML results destined for MongoDB are not normalized. See Schema Definition → Column Naming Rules. Document-tap records pass through unchanged — the platform never rewrites URI, filename, content, or metadata keys.

Review Script (Post-Run)

POST /api/v1/tap/review
Content-Type: application/json
After a successful test, asks the LLM to scan the script’s captured stderr/stdout for signals that the script itself should change — not for performance. The reviewer looks for rate-limit / throttle / burst warnings, deprecation hints, pagination / partial-response cues, and schema-drift / auth warnings. When a signal is found, the script is regenerated with the appropriate fix (add time.sleep, switch to the recommended endpoint, add pagination, update parsing for renamed fields) and persisted to MinIO as a new version. When no signal is found, the script is returned unchanged. The UI invokes this automatically before Optimize Script — if the reviewer rewrites the script, the optimizer is skipped on that pass (correctness from output outranks speed). Callers integrating directly should follow the same order: call /tap/review first, re-test on rewritten=true, and only call /tap/optimize when rewritten=false. Body:
{
  "tapName": "stock-prices-tap",
  "script": "current working script content...",
  "recordCount": 503,
  "durationMs": 298000,
  "logs": "Retrieved 503 tickers...\nSource warning: Burst pattern detected...",
  "oldScriptPath": "tap-scripts/stock-prices-tap_v3.py"
}
FieldTypeDescription
tapNamestringTap identifier. Used for script path versioning.
scriptstringThe current working script that passed its test.
recordCountintegerHow many records the last test returned.
durationMsintegerWall time of the last test run in ms (from /tap/test response).
logsstringCaptured script stderr/stdout — the reviewer matches signals liberally against this text.
oldScriptPathstringMinIO path of the current script. If rewritten, the new script is stored as a new version and the old path is retained for rollback.
Response:
{
  "script": "regenerated Python 3 script...",
  "packages": ["extra-package"],
  "scriptPath": "tap-scripts/stock-prices-tap_v4.py",
  "changes": [
    "Added 0.25s sleep between API calls — source reported burst pattern (5 rps limit)",
    "Lowered max_workers from 10 to 3"
  ],
  "rewritten": true
}
When rewritten is false, script / scriptPath match the input and changes is empty — the reviewer found nothing in the output worth acting on. Prompt fragments configured via Tap Prompt Fragments are auto-injected into the reviewer’s system prompt when their key or any alias appears in the script.

Optimize Script

POST /api/v1/tap/optimize
Content-Type: application/json
After a successful test, asks the LLM to restructure a working script for performance — e.g. swap serial HTTP calls for a ThreadPoolExecutor, reuse a requests.Session(), or drop unnecessary time.sleep() — while preserving correctness (fetch() signature, DATRIS_TAP_TEST_LIMIT handling, Vault env-var reads, retry/backoff behavior, and raise_for_status calls). The UI invokes this automatically after a green test in the Create Tap wizard, with a regression guard that auto-reverts if the rewrite runs ≥20% slower. Callers integrating this endpoint directly should implement the same re-test + revert pattern. Body:
{
  "tapName": "stock-prices-tap",
  "script": "current working script content...",
  "recordCount": 503,
  "durationMs": 298000,
  "logs": "Retrieved 503 tickers...",
  "oldScriptPath": "tap-scripts/stock-prices-tap_v3.py"
}
FieldTypeDescription
tapNamestringTap identifier. Used for script path versioning.
scriptstringThe current working script that passed its test.
recordCountintegerHow many records the last test returned. Used in the user prompt so the LLM can reason about per-record timing.
durationMsintegerWall time of the last test run in ms (from /tap/test response).
logsstringRecent script stderr output (last ~40 lines used).
oldScriptPathstringMinIO path of the current script. The optimized script is stored as a new version; the old path is retained so the caller can revert.
Response:
{
  "script": "optimized Python 3 script...",
  "packages": ["extra-package"],
  "scriptPath": "tap-scripts/stock-prices-tap_v4.py",
  "changes": [
    "Parallelized ticker fetches with ThreadPoolExecutor(10)",
    "Removed per-item 0.25s sleep"
  ]
}
If the LLM concludes the script is already well-optimized, changes is an empty array and script / scriptPath match the input. In that case no re-test is needed.

Run Tap

POST /api/v1/tap/run
Content-Type: application/json
Executes a saved tap. Optionally sends data to the configured pipeline. Body:
{
  "name": "stock-prices-tap",
  "mode": "run"
}
Response:
{
  "tap": "stock-prices-tap",
  "description": "Fetch daily stock prices...",
  "status": "success",
  "mode": "run",
  "targetPipeline": "stock-prices",
  "persisted": true,
  "publisherToken": "5b2f4a1d-8c7e-4f0a-9b3d-6e1c2a4f8b9e",
  "pipelineTokens": ["a1b2c3d4-..."],
  "recordCount": 503,
  "dataType": "csv",
  "logs": "...",
  "error": null
}
mode=run does not return the records themselves — the data is in transit to targetPipeline, and the agent / caller should verify completion via get_pipeline_status, not read from the response body. recordCount summarizes how many records were submitted. To preview what a script produces, call the same endpoint with mode=test, which returns up to 20 sample rows in a records array (with recordsTruncated: true set when the script produced more than that). persisted: true means the records were submitted to targetPipeline. When persisted: false, a persistedReason field names the cause — one of test_mode, run_error, no_records, no_target_pipeline, or debounced. On persisted runs, publisherToken groups every ingestion job this run submitted; pipelineTokens lists each. Document taps fan out to many pipelineTokens but share one publisherToken.

Run debounce (mode=run only)

/tap/run debounces mode=run requests per tap on a 5-second window to suppress accidental duplicates — agents that emit parallel tool_use blocks, UI buttons that get double-clicked, or transport-level retries. If a second run hits within the window, the response is HTTP 200 with status: "skipped", persisted: false, persistedReason: "debounced", and an error string explaining how long ago the previous run started. The previous run keeps executing — pivot via get_tap_logs and get_pipeline_status to track its outcome. mode=test is read-only and is never debounced.

Watching a tap run

A /tap/run response comes back as soon as records are handed off to the async ingestion pipeline — the actual load is still in progress. Use publisherToken to poll:
GET /api/v1/pipeline/status?publishertoken={publisherToken}&withrollup=true
Returns a {rollup, events} wrapper covering every ingestion job this tap run submitted (for structured taps that’s one job; for document taps it’s one per submitted document). Poll until rollup.allDone is true, then read rollup.status for the outcome and rollup.jobs[].lastError for any failure. See the Pipeline Status API → Rollup Response for the full shape.

Generate CRON Expression (AI)

POST /api/v1/tap/cron
Content-Type: application/json
Converts a plain-English schedule description to a Quartz CRON expression. Body:
{
  "description": "Every weekday at 4pm ET"
}
Response:
{
  "cronExpression": "0 0 16 ? * MON-FRI"
}

Run History

GET /api/v1/tap/logs?name={tapName}
Returns the last 50 run log entries for a tap, sorted by most recent first. Response:
[
  {
    "tapName": "stock-prices-tap",
    "runTime": "2026-04-04T20:23:22Z",
    "status": "success",
    "recordCount": 503,
    "dataType": "csv",
    "logs": "Retrieved 503 tickers...",
    "error": null,
    "mode": "test",
    "durationMs": 45000
  }
]

Document Ledger

Document taps track which files they’ve already processed in a ledger — a MongoDB collection keyed by {tapName}|{uri}. See Document Taps → The Document Ledger for the concept; these endpoints manage it.

Read ledger

GET /api/v1/tap/ledger?name={tapName}
Returns every ledger entry owned by the tap. Response:
[
  {
    "uri": "s3://contracts/2026/acme-msa.pdf",
    "tapName": "contracts-tap",
    "stagedPath": "tap-docs/contracts-tap/a1b2c3d4_acme-msa.pdf",
    "filename": "acme-msa.pdf",
    "contentHash": "2f1c…",
    "firstSeenAt": "2026-04-18 13:42:38 EDT",
    "lastSeenAt": "2026-04-19 07:00:11 EDT",
    "status": "processed",
    "metadata": {"source": "s3", "region": "EU"}
  }
]

Delete one entry (force re-process)

DELETE /api/v1/tap/ledger?name={tapName}&uri={uri}
Removes a single entry and its staged MinIO object. The next tap run will re-ingest that specific document from source.

Clear the entire ledger (force full re-scan)

DELETE /api/v1/tap/ledger?name={tapName}
Removes every entry for the tap plus all staged MinIO objects. The next run re-ingests every document the source exposes. Also triggered automatically when:
  • The tap is deleted (DELETE /api/v1/tap).
  • The target pipeline’s data is cleared (DELETE /api/v1/pipeline?pipeline=X&deleteData=true). Without this, the tap would skip docs it already “processed” and the pipeline would stay empty.

Available Vector Stores

GET /api/v1/vector-stores/available
Returns the subset of [qdrant, weaviate, pgvector, milvus, chroma] whose Vault secret is present and whose service is currently reachable. Used by the document-tap pipeline wizard to drive the store picker. Secret presence alone isn’t sufficient — the dev stack seeds placeholder secrets for every store, so the endpoint actually probes each service. Response:
["pgvector"]
Probes use the same logic as /api/v1/health/services, with a 2-second timeout per store (~10 s worst case if everything is unreachable).

Tap Prompt Fragments

User-configured system prompt fragments that auto-inject into tap AI flows when the user’s text matches the fragment’s key or any alias. See Tap Prompt Fragments for the full concept; this section covers the REST endpoints. Fragments are per-tenant, stored in MongoDB at {env}-tap-prompt, and matched case-insensitively with word boundaries.

List Fragments

GET /api/v1/tap-prompts
Response: Array of TapPromptFragment objects.
[
  {
    "key": "AWS",
    "aliases": ["S3", "EC2", "Lambda", "boto3"],
    "content": "When fetching data from AWS services, prefer boto3 with IAM credentials...",
    "enabled": true,
    "createdAt": "2026-04-20 14:05:00 UTC",
    "updatedAt": "2026-04-20 14:05:00 UTC"
  }
]

Get Fragment

GET /api/v1/tap-prompts/{key}
Returns a single fragment or HTTP 404 if {key} is not found. {key} should be URL-encoded.

Create or Update Fragment

POST /api/v1/tap-prompts
Content-Type: application/json
Create-or-update semantics — if a fragment with the same key already exists, it is overwritten. createdAt is preserved across updates; updatedAt is refreshed on every write. Writing through this endpoint invalidates the in-process fragment cache immediately, so the next tap generation / fix / optimize / brainstorm call sees the change without restart. Body:
{
  "key": "Polygon",
  "aliases": ["polygon.io"],
  "content": "Use the requests library with os.environ.get(\"POLYGON_API_KEY\") and pass auth via the Authorization: Bearer {key} header (query param ?apiKey= also works but prefer the header).",
  "enabled": true
}
FieldTypeRequiredDescription
keystringYesPrimary match keyword. Unique per tenant. Matched case-insensitively with \b word boundaries.
aliasesstring[]NoAdditional match keywords. Same matching rules as key.
contentstringYesSystem-prompt text appended under a ## User-provided context block when a match fires.
enabledbooleanNo (default true)When false, the fragment stays in storage but is excluded from matching.
Response: {"status": "ok"} on success.

Delete Fragment

DELETE /api/v1/tap-prompts/{key}
Deletes the fragment and invalidates the cache. URL-encode {key}. Returns {"status": "ok"}.

Suggest Fragment Content

POST /api/v1/tap-prompts/suggest
Content-Type: application/json
Asks the LLM to draft a content body from the fragment’s key and aliases. Useful as a starting point when creating a fragment — the UI exposes this via the Suggest button next to the Content field. If content is non-empty in the request, the LLM is instructed to refine/expand it rather than replace. Body:
{
  "key": "Stripe",
  "aliases": ["stripe.com"],
  "content": ""
}
Response:
{
  "content": "Use the stripe Python SDK (pip install stripe). Set stripe.api_key = os.environ.get(\"STRIPE_API_KEY\"). Use auto_paging_iter() for list endpoints to transparently handle pagination..."
}
The response is plain text — the UI drops it into the Content textarea for the user to review before saving.

Authentication

All endpoints require the x-api-key header if API key authentication is enabled.