Beta. Taps are a new feature and the API and UI may evolve based on user feedback. Core functionality is stable and production-ready, but we may add fields, rename properties, or adjust workflow steps in upcoming releases.
A Tap is an AI-generated Python script that fetches data from external sources — APIs, web scraping, databases, document repositories — and pushes it into a Datris pipeline. Describe what data you want in plain English, and Datris generates the script for you.
Taps can run on a schedule using CRON expressions, or be triggered manually. They support credentials management via Vault secrets and include AI-powered diagnosis when scripts fail, return zero records, or emit deprecation/warning output.
Tap Types
Every tap is one of three types. Choose the type when you create the tap; it can’t be changed later because the target pipeline shape differs.
- Structured/Semi-Structured — the script returns records that flow into a structured pipeline destination.
- Document Ingestion — the script returns {uri, filename, content} dicts for each file; the pipeline handles text extraction, chunking, and embedding.
- I Have My Own Code — skip the AI and paste your own Python fetch() script directly. The script still runs through the same test / auto-fix / post-run review / auto-optimize flow and can use any of the pre-installed packages or bring its own.
Pick Document Ingestion when the thing you want to ingest is a file (PDF, DOCX, HTML, etc.) and you want retrieval over its text. Pick Structured for everything else AI-generated. Pick I Have My Own Code when you already have a working Python script you want to schedule and feed a pipeline with.
| | Structured/Semi-Structured | Document |
|---|---|---|
| Returns | A list of records (rows or semi-structured dicts) | A list of {uri, filename, content, ...} dicts — raw file bytes base64-encoded |
| Target pipeline | CSV / JSON / XML source → Postgres, MongoDB, object store, Kafka, vector store, etc. | unstructuredAttributes source → vector store (Qdrant, pgvector, Weaviate, Milvus, Chroma) |
| What the pipeline does | Validates schema, applies data-quality/transformation rules, writes records | Extracts text from each document, chunks it, embeds it, upserts vectors |
| Deduplication | Pipeline-level (truncate-before-write, upserts) | Per-document ledger — each uri + content-hash is processed once. See Document Taps |
| Chunking / embedding | N/A | Configured on the pipeline, not the tap |
I Have My Own Code produces either Structured or Document output depending on what the pasted fetch() returns — the shape rules in Data Types are what determine downstream routing, not the radio button. In practice pick this when you already have Python code you trust; if you change your mind and want AI help later, create a new tap.
Creating a Tap
The Tap creation wizard walks you through 4 steps — or 5, if you link the tap to a pipeline (the optional 5th step lets you run the tap immediately and push data through the pipeline before leaving the wizard).
Step 1: Describe
Step 1 is where everything except testing happens — you name the tap, describe what you want, attach credentials, and generate or paste the script.
Tap Name — A short identifier for the tap (e.g., weather-data or contracts-sharepoint). If the name matches an existing tap (and you’re not in edit mode), an amber warning appears under the field letting you know that continuing will overwrite the existing tap’s configuration and script. The warning is non-blocking — overwriting is allowed if that’s what you intend.
Tap Type — Three radio options:
- Structured/Semi-Structured (default) — AI generates a script that returns records flowing into a structured pipeline destination.
- Document Ingestion — AI generates a script that returns {uri, filename, content} dicts for each file; the pipeline handles text extraction, chunking, and embedding.
- I Have My Own Code — skip the AI and paste your own Python fetch() script. The AI brainstorm, Instruction textarea, and Generate Script button are all hidden; you see a full-width code textarea and a Use My Code button that uploads the pasted script to MinIO. Once uploaded, Next is enabled and the rest of the wizard (Edit & Test, Schedule, Review & Save) behaves identically to an AI-authored tap. Editing the textarea after upload invalidates the upload — you’ll see the button flip to Re-upload My Code and Next will be disabled until you re-click it.
Pick this first — the brainstorm prompts, the suggested env vars, and the Step 4 pipeline wizard all branch on this choice. The selector is locked in edit mode (change the type by creating a new tap).
Brainstorm with AI (Optional) — Not sure how to describe what you want? Use the chat panel above the instruction box. Type a rough idea and the AI will:
- Ask one focused clarifying question at a time
- Suggest specific data sources (e.g., yfinance for stocks, Open-Meteo for weather, Alpha Vantage for fundamentals)
- Recognize when you reference a Datris table (it knows the platform’s metadata and query endpoints)
- Auto-update the Instruction box below on every turn as the conversation progresses
- Suggest the environment variable names you’ll need (e.g., ALPHA_VANTAGE_API_KEY) when an external API requires authentication
When the AI suggests environment variables, a panel appears below the chat with the variable names as chips and a + Create tap secret with these keys button that pre-fills the credentials section with those keys — you just paste in the values.
You can stop chatting at any time and edit the Instruction box directly, or skip the brainstorm entirely and write the instruction yourself.
Instruction — The technical directive used to generate the Python script. Filled in by the brainstorm chat or written directly.
Credentials (Optional) — Select an existing tap secret, edit one, or create a new one. Secret keys are injected as environment variables at runtime — the AI generates code that reads them via os.environ.get('KEY_NAME'). Never hardcode credentials in scripts.
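A minimal sketch of reading a tap secret inside a script, assuming the selected secret defines a key named API_KEY (the example key used under Credentials / Secrets). The helper name read_secret is hypothetical. It returns None and writes a stderr note naming the missing variable instead of raising, so a misconfigured secret is easy to spot in the run output.

```python
import os
import sys


def read_secret(name):
    """Return a Vault-injected secret by env var name, or None if it is missing.

    Hypothetical helper: prints a stderr note naming the variable rather than
    raising, so the run output points straight at the misconfigured secret.
    """
    value = os.environ.get(name)
    if not value:
        print(f"credential {name} is not set; check the tap secret", file=sys.stderr)
        return None
    return value


# Usage inside fetch():
#     api_key = read_secret("API_KEY")   # never hardcode the key itself
#     if api_key is None:
#         return []
```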
Generate Script — Click to have the AI produce a Python script with a fetch() function. The result appears below in a scrollable preview, along with any extra pip packages it needs.
The generated script can use these pre-installed packages:
- requests, beautifulsoup4, pandas, lxml, feedparser
- boto3 (AWS S3), google-cloud-storage (GCS), azure-storage-blob (Azure Blob)
- openpyxl (Excel), pyyaml (YAML), python-dateutil, pytz
- Additional packages can be specified and are installed at runtime via pip. Datris caches installed packages in Docker volumes so subsequent runs reuse them instantly.
Example instructions:
- “Fetch current weather data for New York from the Open-Meteo API”
- “Retrieve daily stock prices for all S&P 500 companies from yfinance”
- “Get the latest news headlines from the BBC RSS feed”
- “Query the consumer_discretionary_earnings table on Datris to get tickers, then fetch historical earnings from Alpha Vantage”
Step 2: Edit & Test
Review and edit the generated script. You can:
- Modify the Python code directly
- Add or remove pip packages
- Edit the instruction and click Regenerate to produce a new script
- Copy the script to your clipboard with the Copy button next to the editor
- Click Test Script to execute and preview results. Preview renders as JSON (up to 100 records) in a scrollable pane
- If the script fails, an AI Diagnosis explains what went wrong and is auto-applied — the platform fixes the script and retests, up to 3 attempts. If all retries fail, the diagnosis stays visible for you to review and apply manually
- Click Stop Test to cancel an in-flight test (also halts the auto-fix chain)
Limit test sample (checkbox, default on) with an inline record-count input (default 20, minimum 1): when enabled, the runner injects DATRIS_TAP_TEST_LIMIT=<n> into the script process. Well-written tap scripts honor this env var to cap both their /api/v1/query/* reads and per-item iteration loops, making tests fast and low-cost. Cron and manual runs (via Run Tap) never set this env var, so production runs always read every row from every source.
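The two behaviors described above, capping /api/v1/query/* reads and capping per-item loops, can be sketched with a pair of small helpers. The helper names are hypothetical, and appending LIMIT assumes a simple SELECT with no trailing semicolon or existing LIMIT clause. When the variable is unset, as on cron and manual runs, both helpers leave the work uncapped.

```python
import os


def test_limit():
    """Return the test-sample cap, or None when DATRIS_TAP_TEST_LIMIT is unset."""
    raw = os.environ.get("DATRIS_TAP_TEST_LIMIT")
    if not raw:
        return None
    try:
        return max(1, int(raw))
    except ValueError:
        return None


def limited_sql(sql, limit):
    """Append LIMIT only during tests (assumes no existing LIMIT or trailing ';')."""
    return f"{sql} LIMIT {limit}" if limit is not None else sql


def capped(items, limit):
    """Cap a per-item iteration loop during tests; pass everything through otherwise."""
    return items if limit is None else items[:limit]


# Usage inside fetch():
#     limit = test_limit()
#     sql = limited_sql("SELECT ticker FROM public.my_table", limit)
#     for ticker in capped(tickers, limit): ...
```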
Post-run script review
When a test passes, the platform first sends the working script and its captured stderr/stdout to the LLM for a functional review — not a performance pass. The reviewer looks for signals in the script’s own output that the script should behave differently:
- Rate-limit / throttle / burst warnings — add time.sleep, lower max_workers, add adaptive backoff on HTTP 429
- Deprecation / migration hints — switch to the recommended replacement endpoint, method, or field
- Pagination / partial-response hints (“truncated”, “page 1 of 5”, “next_cursor=…”) — add pagination so the tap keeps fetching until the source says no more
- Schema-drift / auth warnings — update parsing for renamed fields; for auth errors, add a clear stderr note naming the suspect env var
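One way to express the "adaptive backoff on HTTP 429" fix is a retry wrapper like the sketch below; the code the reviewer actually generates may differ. get_with_backoff is a hypothetical helper, get is any requests.get-compatible callable, and honoring a numeric Retry-After header is a common convention that not every source follows.

```python
import time


def get_with_backoff(get, url, max_retries=5, base_delay=1.0, sleep=time.sleep, **kwargs):
    """Call get(url, **kwargs), retrying on HTTP 429 with exponential backoff.

    `get` is any requests.get-compatible callable. A numeric Retry-After header,
    when present, overrides the computed delay. Returns the last response.
    """
    if max_retries < 0:
        raise ValueError("max_retries must be >= 0")
    delay = base_delay
    for attempt in range(max_retries + 1):
        resp = get(url, **kwargs)
        if resp.status_code != 429 or attempt == max_retries:
            return resp
        retry_after = resp.headers.get("Retry-After", "")
        sleep(float(retry_after) if retry_after.isdigit() else delay)
        delay *= 2  # double the fallback delay after each throttled attempt


# Usage: get_with_backoff(requests.get, url, timeout=30)
```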
If the reviewer finds a signal, it regenerates the script with the fix applied, the UI swaps it in, and the test auto-reruns. An amber banner then lists what the reviewer changed (e.g. Added 0.25s sleep between API calls — source reported burst pattern). When the reviewer rewrites the script, the perf optimizer below is skipped — correctness-from-output outranks speed on that pass. If the reviewer finds nothing worth changing, the flow falls through to the auto-optimizer.
This review step runs once per successful test, so the reviewer won’t loop on its own output. If you edit the script yourself and click Test again, the reviewer gets a fresh look at the new output.
Auto-optimize after a successful test
If the post-run review left the script unchanged, the platform sends the working script back to the LLM with the measured duration and record count, asks it to restructure for performance, and re-tests the rewrite:
- One pass per successful test. The optimizer is instructed to preserve fetch(), DATRIS_TAP_TEST_LIMIT handling, error handling, and Vault env-var reads — it only changes how the script fetches, not what it fetches
- If the optimized re-test runs ≥20% slower than the original, or fails, the platform auto-reverts to the original script with no user action
- On success, an amber banner shows "Auto-optimized: 12.4s → 1.1s (11.3×)" with a list of the changes the AI made (e.g. Parallelized ticker fetches with ThreadPoolExecutor(10), Removed per-item sleep) and a Revert to original link
- If you revert or edit the script, the auto-optimize pass won’t rerun until you click Generate again
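A sketch of the kind of rewrite the banner example describes (parallelizing per-ticker fetches with ThreadPoolExecutor(10)) while keeping what the optimizer is told to preserve: DATRIS_TAP_TEST_LIMIT handling and per-item error handling. fetch_one is a hypothetical stand-in for the real network call; actual optimizer output will differ.

```python
import os
import sys
from concurrent.futures import ThreadPoolExecutor


def fetch_one(ticker):
    """Hypothetical per-item fetch; a real tap would call the source API here
    (e.g. requests.get(..., timeout=30)) and return one record or None."""
    return {"ticker": ticker}


def safe_fetch_one(ticker):
    # Keep per-item error handling: one bad item must not sink the whole batch.
    try:
        return fetch_one(ticker)
    except Exception as exc:
        print(f"{ticker}: {exc}", file=sys.stderr)
        return None


def fetch_all(tickers, max_workers=10):
    # Preserve DATRIS_TAP_TEST_LIMIT handling when restructuring for speed.
    limit = os.environ.get("DATRIS_TAP_TEST_LIMIT")
    if limit:
        tickers = tickers[: int(limit)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order
        return [r for r in pool.map(safe_fetch_one, tickers) if r is not None]
```

A thread pool suits this case because each item spends most of its time waiting on the network; if the source throttles bursts, lowering max_workers is the usual counterweight.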
The test must return records successfully before you can proceed to step 3.
Step 3: Schedule (Optional)
Set up automatic runs with a CRON expression:
- Presets: Every Hour, Daily (Midnight), Weekdays (Midnight), Weekly (Monday)
- Custom: Describe your schedule in plain English (e.g., “Every weekday at 4pm ET”) and click Generate to create the CRON expression
The Schedule is active checkbox controls whether the cron actually fires. Uncheck it to pause a tap without losing its schedule definition — useful when you want to temporarily stop a tap but keep its CRON expression intact for later.
Step 4: Review & Save
Review all settings (including a human-readable description of the CRON schedule, e.g. 0 0 16 ? * MON-FRI — at 4:00 PM, on MON-FRI).
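The expressions in this section (0 0 16 ? * MON-FRI here, 0 0 * * * ? in the CLI examples) appear to use a Quartz-style layout: a leading seconds field, and ? meaning "no specific value". That reading is an inference from the examples, not a documented spec. The hypothetical helper below only labels each field, which can help sanity-check an AI-generated expression before saving.

```python
# Quartz-style field order, inferred from the examples in this document
FIELDS = ["seconds", "minutes", "hours", "day_of_month", "month", "day_of_week", "year"]


def label_cron_fields(expr):
    """Map each field of a 6- or 7-field cron expression to its name."""
    parts = expr.split()
    if len(parts) not in (6, 7):
        raise ValueError(f"expected 6 or 7 fields, got {len(parts)}: {expr!r}")
    return dict(zip(FIELDS, parts))  # zip stops after 6 fields if year is omitted
```

For 0 0 16 ? * MON-FRI this yields hours "16", day_of_month "?", and day_of_week "MON-FRI", matching the "at 4:00 PM, on MON-FRI" description.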
From step 4 you can also link the tap to a pipeline so its output flows through validation, transformation, and into your destination automatically. Two options — both branches change shape based on the tap’s type:
Structured taps
- Attach to Pipeline — pick an existing pipeline whose source columns match the tap’s test output exactly. If columns don’t match, you’ll see exactly which columns are missing or extra so you can fix the mismatch before attaching.
- Generate Pipeline — create a new pipeline pre-wired to the tap’s output. The new pipeline’s destination is Postgres (public.<tap_name>) for tabular data, or MongoDB for nested/document data. All columns are created as string for safe ingestion — tap output shape can vary across runs, and string columns ingest reliably regardless of what the script returns. You can promote individual columns to richer types in the pipeline editor once you’re confident about the data shape.
Document taps
- Attach to Pipeline — the picker is filtered to only compatible pipelines: those whose source is unstructuredAttributes and whose destination is a vector store (Qdrant, pgvector, Weaviate, Milvus, Chroma). Column matching doesn’t apply — every document has the same {uri, filename, content} shape. The selection card shows the destination (e.g. pgvector → public.contracts).
- Generate Pipeline — creates an unstructured → vector-store pipeline. You pick:
  - Vector store — only stores whose secret is configured and the service is reachable are listed (server probes each on modal open via /api/v1/vector-stores/available). If exactly one is available it’s preselected; if none are, the modal blocks with a link to Configuration.
  - Destination name — collection name (Qdrant / Milvus / Chroma), class name (Weaviate), or table + schema (pgvector).
  - Chunking strategy — recursive (default), fixed, sentence, paragraph, or none. You also set chunk size (default 500 characters) and overlap (default 50). These live on the pipeline, not the tap.
  - Embedding is inherited from the environment’s default embedding secret; there is currently no embedding picker.
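To make chunk size and overlap concrete, here is a sketch of the fixed strategy only, measured in characters to match the defaults. It illustrates the two parameters and is not the platform's implementation; the default recursive strategy presumably chooses split points differently. None of this runs in the tap.

```python
def fixed_chunks(text, size=500, overlap=50):
    """Split text into windows of `size` characters; each window shares
    its first `overlap` characters with the end of the previous one."""
    if not 0 <= overlap < size:
        raise ValueError("need 0 <= overlap < size")
    if not text:
        return []
    step = size - overlap
    # Stop once the remaining tail is already covered by the previous chunk's overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]
```

With the defaults, a 1,000-character document yields chunks starting at offsets 0, 450, and 900, of lengths 500, 500, and 100.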
The server rejects any save attempt that points a document tap at an incompatible pipeline (HTTP 400), regardless of where the save came from (UI, CLI, MCP, or raw API).
If a pipeline is linked when you click save, the wizard advances to step 5; otherwise it returns to the Taps list.
Step 5: Run the Tap (Optional)
Step 5 only appears when the tap is linked to a pipeline. It’s a one-time launch screen with two buttons:
- Run the Tap Now — runs the tap script immediately and pushes the records it returns into the linked pipeline. The pipeline applies any data quality and transformation rules and writes to its destination. After the run completes you’re returned to the Taps list.
- Done — skip the run and return to the Taps list. You can run the tap later from the Taps page or via the MCP run_tap tool.
Querying Datris Data
Tap scripts can query data already stored in the Datris platform. The following environment variables are always automatically injected at runtime — your script does not need to fall back to defaults:
| Variable | Description |
|---|---|
| DATRIS_POSTGRES_DATABASE | PostgreSQL database name (configured via postgres.database in application.yaml, or the tenant name in multi-tenant mode) |
| DATRIS_MONGODB_DATABASE | MongoDB database name (configured via mongodb.database in application.yaml, or the tenant name in multi-tenant mode) |
| DATRIS_PLATFORM_HOST | Hostname of the Datris platform (localhost inside the Docker container) |
| DATRIS_PLATFORM_PORT | Port of the Datris platform (8080 by default) |
In single-tenant deployments the postgres and mongo database names may differ (postgres defaults to datris, mongo defaults to oss); in multi-tenant mode both resolve to the same tenant name. Always use the variable that matches the backend you are querying.
Scripts can call the Datris API directly using these:
Discover tables (PostgreSQL):
```python
import os
import requests

# Always injected by the platform; no fallback defaults needed
host = os.environ['DATRIS_PLATFORM_HOST']
port = os.environ['DATRIS_PLATFORM_PORT']
pg_db = os.environ['DATRIS_POSTGRES_DATABASE']
base_url = f'http://{host}:{port}/api/v1'

# List all tables in the public schema
response = requests.get(f'{base_url}/metadata/postgres/tables',
                        params={'database': pg_db, 'schema': 'public'},
                        timeout=30)
response.raise_for_status()
tables = response.json()
```
Column names returned by /api/v1/metadata/postgres/columns reflect the live table schema, which always satisfies the platform’s [A-Za-z0-9_]+ rule (see Column Naming Rules). You can use them directly in SELECT clauses without quoting.
Query PostgreSQL data:
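```python
# Reuses requests, base_url, and pg_db from the previous snippet
response = requests.post(f'{base_url}/query/postgres', json={
    'sql': 'SELECT * FROM public.my_table',
    'database': pg_db,
}, timeout=30)
response.raise_for_status()
results = response.json()['results']
```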
Query MongoDB data:
```python
mongo_db = os.environ['DATRIS_MONGODB_DATABASE']

# List collections
response = requests.get(f'{base_url}/metadata/mongodb/collections',
                        params={'database': mongo_db}, timeout=30)
response.raise_for_status()
collections = response.json()

# Query a collection ('{}' is an empty filter, i.e. every document)
response = requests.post(f'{base_url}/query/mongodb', json={
    'database': mongo_db,
    'collection': 'my_collection',
    'query': '{}',
}, timeout=30)
response.raise_for_status()
results = response.json()['results']
```
The brainstorm AI knows about all of these endpoints, so if you mention “the X table on Datris” or “the Y collection on Datris” it will write the instruction to use the appropriate backend — no need to describe schema or column names; the script can introspect those at runtime via /api/v1/metadata/postgres/columns or /api/v1/metadata/mongodb/collections.
Credentials / Secrets
Tap secrets are stored in HashiCorp Vault and injected as environment variables when the script runs. They are managed directly in the tap wizard (Step 1).
- Select existing: Choose from previously created tap secrets
- Create new: Define key-value pairs inline (e.g., API_KEY=your-key)
- Edit existing: Modify an existing secret’s fields
- Use suggested keys: When the brainstorm AI suggests environment variables, click + Create tap secret with these keys to jump straight into the create form with the keys pre-filled
Tap secrets are tagged with _type=tap and only appear in the tap dropdown — not mixed with system secrets like database credentials.
Never hardcode credentials in scripts. Always use os.environ.get('KEY_NAME').
Running Taps
Test Run (from UI)
Click the play button on the taps list to open the Run page:
- Shows the tap instruction
- Send to pipeline checkbox (only if a target pipeline is configured)
- Displays script output, results table, and errors
- Run Again button for re-execution
CRON Schedule
Taps with a cronExpression run automatically. The scheduler checks every 30 seconds for taps that are due. A tap won’t fire if it’s already running. For a tap that has never run before, the scheduler anchors the “next valid cron time” to updatedAt or createdAt, so the first scheduled slot after the tap was saved fires on its own — no manual bootstrap run needed.
Sending Data to Pipelines
When “Send to pipeline” is checked:
- The tap executes the script
- Records are converted to the pipeline’s expected format (CSV or JSON)
- Data is fed through the pipeline’s processing chain (data quality, transformation, destinations)
Watching a Run
run_tap returns as soon as records are handed off — the pipeline load runs async after that. The response carries:
- persisted: true/false with persistedReason when false (no_target_pipeline, test_mode, no_records, run_error, debounced) so callers know whether the data actually landed. debounced means another run for this same tap was triggered within the last 5 seconds — that earlier run is still executing; do not retry, just track it via get_tap_logs / get_pipeline_status.
- publisherToken — a single ID covering every ingestion job this run submitted. Document taps fan out to many jobs per run; one publisherToken covers them all.
- pipelineTokens — the per-job IDs (length 1 for structured, N for document).
- recordCount — how many records the script produced.
run_tap does not return the records themselves. Use test_tap (or the Test Script button in the UI) when you need to preview what a script produces before pushing it — test_tap returns a sample of up to 20 rows with a recordsTruncated flag when trimmed.
Poll GET /api/v1/pipeline/status?publishertoken=...&withrollup=true (or the MCP get_pipeline_status tool, which sets withrollup=true automatically) until rollup.allDone is true, then read rollup.status for the outcome and rollup.jobs[].lastError for any failure detail.
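A polling loop for that endpoint might look like the sketch below. It assumes the JSON body carries a top-level rollup object with the allDone, status, and jobs[].lastError fields named in the text; the set of possible status values isn't listed here, so the sketch just returns whatever it finds. wait_for_run and http_status_fetcher are hypothetical helpers, and base_url is the .../api/v1 prefix built in the Querying Datris Data examples.

```python
import time


def wait_for_run(get_status, publisher_token, poll_seconds=5.0,
                 timeout_seconds=600.0, sleep=time.sleep):
    """Poll until rollup.allDone is true; return (rollup.status, [lastError, ...])."""
    waited = 0.0
    while True:
        rollup = get_status(publisher_token).get("rollup") or {}
        if rollup.get("allDone"):
            errors = [job["lastError"] for job in rollup.get("jobs", [])
                      if job.get("lastError")]
            return rollup.get("status"), errors
        if waited >= timeout_seconds:
            raise TimeoutError(f"run {publisher_token} not done after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


def http_status_fetcher(base_url):
    """Build a get_status callable backed by GET {base_url}/pipeline/status."""
    import requests  # deferred so wait_for_run can be exercised without requests

    def get_status(token):
        resp = requests.get(f"{base_url}/pipeline/status",
                            params={"publishertoken": token, "withrollup": "true"},
                            timeout=30)
        resp.raise_for_status()
        return resp.json()

    return get_status
```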
Per-run Parameters
run_tap accepts an optional params object — a map of caller-supplied values that get injected into the script as environment variables for that one run only. Use this for values that vary per call (date windows, ticker lists, page cursors, batch sizes, geographic regions). Each key/value becomes an env var the script reads:
```python
import json
import os

start_date = os.environ.get("DATRIS_TAP_PARAM_start_date", "2026-01-01")  # cron-safe default
end_date = os.environ.get("DATRIS_TAP_PARAM_end_date")
tickers = json.loads(os.environ.get("DATRIS_TAP_PARAM_tickers", "[]"))  # JSON-encoded list
```
- Key constraints: must match [A-Za-z_][A-Za-z0-9_]* so the keys map cleanly onto env var names. Anything else is rejected with an actionable error.
- Value handling: strings pass through; numbers/booleans are stringified; nested objects/arrays are JSON-encoded so the script can json.loads() them back.
- Scheduled runs supply no params — cron-triggered runs have an empty params bag. Scripts MUST apply sensible defaults when the env var is absent.
Use params for things that change between runs; use secret_name for credentials that don’t. Rewriting a tap secret on every run to smuggle per-call values through is an anti-pattern — it clobbers concurrent runs, pollutes audit history, and wastes Vault writes.
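Because every param arrives as a string (or JSON text) and cron runs supply none, small typed readers with defaults keep scripts tidy. This is a sketch with hypothetical helper names; the exact spelling of stringified booleans isn't specified here, so param_bool accepts common forms case-insensitively.

```python
import json
import os

PREFIX = "DATRIS_TAP_PARAM_"


def _raw(name):
    value = os.environ.get(PREFIX + name)
    return value if value not in (None, "") else None


def param_str(name, default=None):
    raw = _raw(name)
    return raw if raw is not None else default


def param_int(name, default):
    raw = _raw(name)
    return int(raw) if raw is not None else default


def param_bool(name, default):
    raw = _raw(name)
    if raw is None:
        return default
    # Boolean spelling after stringification isn't documented; accept common forms
    return raw.strip().lower() in ("true", "1", "yes")


def param_json(name, default):
    raw = _raw(name)  # nested objects/arrays arrive JSON-encoded
    return json.loads(raw) if raw is not None else default
```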
Run Status Outcomes
Each tap run produces one of three states on its lastRunStatus field and matching run log entry:
| Status | Meaning |
|---|---|
| success | Script ran, returned records, and the pipeline accepted them. The recordCount is non-zero. |
| no_records | Script ran cleanly but returned zero records. Legitimate for polling/incremental taps on a quiet day (no new data since last run, weekend with markets closed, dedup-already-loaded). NOT counted as a failure in the Failures pane on the Ops Activity dashboard. |
| failure | Script errored (Python exception, HTTP error from the source, timeout, output-size guard, etc.). The error field carries the message. |
Cron-scheduled runs that legitimately produce zero records (incremental tap that’s caught up, polling tap with no new entries) record no_records rather than failure and the Ops Activity dashboard surfaces them as healthy.
Output Size Guard
The platform caps tap-script output at tapMaxOutputMB (default 100 MB, settable via the TAP_MAX_OUTPUT_MB env var on the datris service). Runs that exceed the cap fail fast with an actionable error before the JSON is parsed — preventing the whole batch from buffering in JVM heap and OOM-killing the server.
If a backfill blows the cap, reduce the source range using run_tap params (shorter date window, smaller page, per-symbol chunks) and call again. Multiple smaller runs all land in the same destination pipeline; with keyFields configured on a Postgres or MongoDB pipeline, overlapping ranges upsert safely.
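A client-side sketch of that approach: split a long backfill into fixed-size date windows and build one params object per run_tap call. The helper names are hypothetical, and the start_date / end_date keys follow the Per-run Parameters example; use whatever keys your script actually reads.

```python
from datetime import date, timedelta


def date_windows(start, end, days):
    """Split the inclusive range start..end into consecutive windows of at most `days` days."""
    if days < 1:
        raise ValueError("days must be >= 1")
    cur = start
    while cur <= end:
        stop = min(cur + timedelta(days=days - 1), end)
        yield cur, stop
        cur = stop + timedelta(days=1)


def backfill_params(start, end, days):
    """One params dict per run_tap call, ISO dates as strings."""
    return [{"start_date": s.isoformat(), "end_date": e.isoformat()}
            for s, e in date_windows(start, end, days)]
```

For example, backfill_params(date(2026, 1, 1), date(2026, 1, 10), 4) produces three windows: Jan 1 to 4, Jan 5 to 8, and Jan 9 to 10.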
Data Types
The system automatically detects the data type from what fetch() returns:
| Return Value | Detected Type |
|---|---|
| List of dicts with uri and content keys | Document (triggers the document-tap path) |
| List of flat dicts (all scalar values) | CSV |
| List of dicts with nested objects | JSON |
| List of lists/tuples | CSV |
| XML string | XML |
| Other string | Text |
The uri + content detection takes priority — if your script returns records with those keys, the runner routes every element through the document pipeline path even if the tap’s type is structured. In practice the type radio and the return shape should always agree; this rule just makes misconfigurations fail loudly rather than silently corrupt a structured destination.
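The table's rules can be approximated in a few lines, which is handy for checking locally what shape a script returns. This is a sketch of the documented rules, not the runner's code; recognizing XML by a leading "<", treating lists as nested values, and requiring every element to carry uri and content are assumptions.

```python
def detect_type(result):
    """Approximate the documented routing rules for a fetch() return value."""
    if isinstance(result, str):
        # Assumption: an "XML string" is recognized by a leading "<"
        return "XML" if result.lstrip().startswith("<") else "Text"
    if not isinstance(result, list) or not result:
        return None  # empty or unsupported: nothing to route
    if all(isinstance(r, dict) for r in result):
        # uri + content takes priority over every other dict rule
        if all("uri" in r and "content" in r for r in result):
            return "Document"
        nested = any(isinstance(v, (dict, list)) for r in result for v in r.values())
        return "JSON" if nested else "CSV"
    if all(isinstance(r, (list, tuple)) for r in result):
        return "CSV"
    return None
```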
Pipelines and Taps
The Pipelines page shows a Tap column for each pipeline, indicating which tap (if any) feeds it. Click the tap name to jump straight to that tap’s edit page.
On the Pipeline creation wizard, select From Tap to auto-populate the pipeline’s source type and schema from a tap’s test results. The tap’s data type and columns are used to configure the pipeline automatically.
Run History
Click the history icon on the taps list to view a tap’s run history. Each entry shows:
- Status (success, no_records, or failure; see Run Status Outcomes)
- Timestamp and duration (formatted using the configured dateFormat and dateTimezone — see Configuration Reference)
- Record count
- Whether data was sent to a pipeline
- Expandable script output logs and error messages
AI agents can manage taps via MCP:
| Tool | Description |
|---|---|
| create_tap | Create a tap from a plain-English instruction (AI generates the script), a user-provided script, or config only |
| create_tap_secret | Create a tap secret the agent can then reference via secret_name on create_tap. Fails on name collision unless overwrite=true. Reserved AI-slot names are blocked. Agents can only overwrite secrets tagged _type=tap — human-owned secrets are refused. |
| delete_tap_secret | Delete a tap secret. Only _type=tap secrets can be deleted by an agent; human-owned secrets must be removed from Configuration → Secrets in the UI. |
| list_taps | List all taps with status, schedule, and last run info |
| get_tap | Get full details of a single tap including its Python script |
| run_tap | Execute a tap and push fetched data to the target pipeline. Response carries persisted, persistedReason, recordCount, publisherToken, pipelineTokens so the agent can confirm what landed. Records themselves are not returned — use test_tap to preview a script. |
| test_tap | Test-run a tap without pushing data to the pipeline |
| get_pipeline_status | Watch ingestion progress after a run_tap. Pass the response’s publisherToken to get status rows for every job this run submitted (one for structured taps, N for document taps). |
| update_tap | Update a tap’s config (enabled, schedule, pipeline, description) without regenerating the script |
| get_tap_logs | Get run history for a tap (last 50 entries). Each entry that submitted records includes its publisherToken, so an agent can pivot from a scheduled-run log entry to get_pipeline_status(publisher_token=...) and verify the run actually landed in the destination — not just that the script ran. |
| get_tap_ledger | Document taps only: read the ledger of discovered documents (URI, filename, status, hashes, first/last seen timestamps). Pass clear_uri to force one file to be re-processed on the next run, or clear_all=true to force a full re-scan. |
| delete_tap | Delete a tap and its stored script |
End-to-end agent flow when a tap needs credentials:
```
create_tap_secret(name="stripe-key", fields={"apiKey": "sk_..."})
create_tap(name="stripe-charges", script="...", secret_name="stripe-key")
```
CLI Commands
```shell
# List all taps
datris taps
# Create a tap with AI-generated script
datris tap create "Fetch weather data from Open-Meteo API" --pipeline weather --cron "0 0 * * * ?"
# Create a tap with your own script
datris tap create --script ./my_script.py --name my-tap --pipeline weather
# Create a tap config only (add script later)
datris tap create --name my-tap --pipeline weather
# Show tap details including script
datris tap show my-tap
# Test-run a tap (no data push)
datris tap test my-tap
# Run a tap manually
datris tap run my-tap
# View run history
datris tap logs my-tap
# Update a tap's schedule or config
datris tap update my-tap --cron "0 0 * * * ?" --enabled
# Disable a tap
datris tap update my-tap --disabled
# Delete a tap
datris tap delete my-tap
```
Configuration
| Setting | File | Default | Description |
|---|---|---|---|
| tapScriptTimeoutSeconds | application.yaml | 300 | Maximum script execution time in seconds |
| schedule.checkTapSchedules | application.yaml | 30000 | CRON scheduler poll interval in milliseconds |
| dateFormat | application.yaml | yyyy-MM-dd HH:mm:ss z | SimpleDateFormat pattern used for tap run timestamps |
| dateTimezone | application.yaml | UTC | IANA timezone (e.g., America/New_York) for displayed timestamps |
Document Taps
Document taps feed files (PDFs, DOCX, HTML, Markdown, etc.) into a vector-store pipeline. The pipeline owns text extraction, chunking, and embedding — the tap’s only job is discovery and retrieval.
Return shape
Each element of the list fetch() returns is a dict describing one document:
| Field | Required | Description |
|---|---|---|
| uri | ✓ | Unique source identifier (URL, S3 key, file path). Primary key for change detection. |
| filename | ✓ | Original filename with extension — the extension drives TextExtractorUtil’s parser choice (.pdf → PDFBox, .docx → POI, etc.). |
| content | ✓ | Raw file bytes, base64-encoded: base64.b64encode(raw_bytes).decode('ascii'). Must be the complete file, not a chunk. |
| content_hash | optional | Precomputed ETag or hash for change detection. If omitted, the platform computes SHA-256 of the decoded bytes. |
| metadata | optional | Arbitrary dict[str, str] attached to the ledger entry (e.g., {"author": "ACME", "region": "EU"}). |
Rules the generator and users must follow:
- Return one entry per source file. Never chunk, split, or segment. Never decode the bytes.
- Don’t generate embeddings, pick embedding models, choose a vector store, or create tables — those are pipeline concerns.
- Don’t invent metadata fields that describe pipeline behavior (chunk_size, embedding_model, target_table); those belong on the pipeline.
The AI brainstorm chat and script generator both enforce these rules: if you ask for chunking or embedding choices, the assistant redirects you to pipeline configuration.
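A minimal document-tap fetch() following the return shape and rules above. To keep the sketch self-contained, a local directory named by a hypothetical DOCS_DIR env var stands in for a real source such as S3 or SharePoint, and the supported-extension set is an illustrative choice. Each file is returned whole and base64-encoded; nothing is chunked, decoded, or embedded.

```python
import base64
import hashlib
import os
from pathlib import Path

# Illustrative choice of extensions to pick up
SUPPORTED = {".pdf", ".docx", ".html", ".md"}


def fetch():
    root = os.environ.get("DOCS_DIR")  # hypothetical env var, not a platform variable
    if not root or not Path(root).is_dir():
        return []
    docs = []
    for path in sorted(Path(root).rglob("*")):
        if not path.is_file() or path.suffix.lower() not in SUPPORTED:
            continue
        raw = path.read_bytes()  # the complete file: never chunk or decode it
        docs.append({
            "uri": path.resolve().as_uri(),    # stable unique ID for change detection
            "filename": path.name,             # extension picks the text extractor
            "content": base64.b64encode(raw).decode("ascii"),
            "content_hash": hashlib.sha256(raw).hexdigest(),  # optional
            "metadata": {"source_dir": root},  # optional dict[str, str]
        })
    return docs
```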
The Document Ledger
Every document the tap successfully stages is recorded in a ledger — a MongoDB collection keyed by {tapName}|{uri}. On each run, the tap:
- Loads uri → contentHash for every existing ledger entry owned by this tap.
- For each document returned by fetch(), computes (or reads) its content hash.
- Skips the document if the ledger already has an entry with the same hash (just refreshes lastSeenAt for operator visibility).
- Otherwise, stages the bytes to MinIO, writes a ledger entry (status: staged), submits to the pipeline, and marks the entry processed (or failed on error).
This is what makes document taps safe to run on a schedule: re-runs only process new or changed files.
Ledger fields: uri, tapName, stagedPath (MinIO key), filename, contentHash, firstSeenAt, lastSeenAt, status (staged/processed/failed), metadata.
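The skip-or-process decision in those steps can be sketched as a pure function over a uri → contentHash map. It mirrors only the comparison; staging to MinIO, writing ledger entries, and submitting to the pipeline happen server-side. plan_run is a hypothetical name, and hex-encoding the computed SHA-256 is an assumption.

```python
import base64
import hashlib


def plan_run(ledger, docs):
    """Split fetched docs into (to_process, unchanged_uris).

    ledger maps uri -> contentHash for this tap's existing entries.
    to_process holds (uri, hash) pairs for new or changed documents.
    """
    to_process, unchanged = [], []
    for doc in docs:
        # Use the precomputed hash if supplied, else SHA-256 of the decoded bytes
        content_hash = doc.get("content_hash") or hashlib.sha256(
            base64.b64decode(doc["content"])).hexdigest()
        if ledger.get(doc["uri"]) == content_hash:
            unchanged.append(doc["uri"])  # the platform only refreshes lastSeenAt
        else:
            to_process.append((doc["uri"], content_hash))  # stage, record, submit
    return to_process, unchanged
```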
Managing the ledger:
- UI — the document-ledger button (page icon) next to a document tap in the Taps list opens a modal listing every entry. Actions: Clear All (force full re-scan on next run), or delete a single entry (force that one file to re-process).
- REST — GET /api/v1/tap/ledger?name={tap}, DELETE /api/v1/tap/ledger?name={tap}&uri={uri} for a single entry, DELETE /api/v1/tap/ledger?name={tap} to clear all.
- MCP — get_tap_ledger with optional clear_uri / clear_all.
Automatic cleanup:
- Deleting a tap clears its ledger entries and deletes all staged MinIO objects.
- Deleting a pipeline’s data (from the Pipelines page → Delete Data Only, or DELETE /api/v1/pipeline?pipeline=X&deleteData=true) clears the ledger for every document tap pointed at that pipeline — so the next run re-ingests every file into the now-empty destination. Without this, taps would skip docs they already “processed” and the pipeline would stay empty.
Pipeline compatibility
The server validates at two points:
- On save (POST /tap) — if tapType == "document" and targetPipeline is set, it checks the pipeline’s shape. Rejects with HTTP 400 if the source isn’t unstructuredAttributes or the destination isn’t a vector store.
- On feed (TapRunner.feedDocumentPipeline) — re-checks before submitting bytes, so a pipeline that was reshaped after the tap was saved fails the run with a clear error instead of silently corrupting a destination.
Concurrency
Document taps feed many documents through StreamNotifier.process in rapid succession. Each becomes its own JobRunner, and all five vector-store loaders are now race-safe when multiple runners hit ensureCollection / ensureTable at the same time on a fresh pipeline:
- pgvector — transactional advisory lock plus CREATE TABLE IF NOT EXISTS; serializes concurrent sessions at the schema-qualified table name.
- Qdrant / Milvus / Weaviate — try-create; on error, re-check existence and swallow if a concurrent runner won the race.
- Chroma — sends get_or_create: true plus a re-GET fallback for older servers.
Script Requirements
The fetch() function must:
- Take no arguments
- Return a list of dictionaries (records) or a string (JSON/XML/text)
- Handle errors gracefully with try/except
- Include timeouts on network requests (30 seconds recommended)
- Return an empty list on failure rather than raising exceptions
- Use os.environ.get() for any credentials
- Read DATRIS_POSTGRES_DATABASE, DATRIS_MONGODB_DATABASE, DATRIS_PLATFORM_HOST, and DATRIS_PLATFORM_PORT directly without fallback defaults — they are always injected by the platform
- Column names are auto-normalized for tabular results (list of dicts). The platform converts each key to lowercase snake_case using only [a-z0-9_] so that downstream pipeline registration succeeds and SQL queries don’t need quoting. Examples: EPS Estimate → eps_estimate, Surprise(%) → surprise_percent. You can return raw source column names — the platform will clean them — but for clarity in the test preview, prefer to emit clean keys directly. JSON/XML results destined for MongoDB are not normalized (they go through as raw blobs in the _json field). See Schema Definition → Column Naming Rules for the underlying validator rule.
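A minimal fetch() skeleton that follows these requirements: no arguments, a 30-second timeout, credentials from the environment, an empty list instead of an exception on failure, and clean keys emitted directly. SOURCE_URL is a hypothetical env var, and clean_key only approximates the platform's normalization; it reproduces the two documented examples, but its camelCase splitting is this sketch's own choice.

```python
import os
import re
import sys


def clean_key(name):
    """Approximate the platform's key normalization: lowercase snake_case over [a-z0-9_]."""
    s = name.replace("%", " percent ")
    s = re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s)  # split camelCase (sketch's choice)
    s = re.sub(r"[^a-z0-9]+", "_", s.lower())
    return s.strip("_")


def fetch():
    # Takes no arguments; SOURCE_URL is a hypothetical secret-backed env var
    url = os.environ.get("SOURCE_URL")
    if not url:
        print("SOURCE_URL is not set; check the tap secret", file=sys.stderr)
        return []
    try:
        import requests  # pre-installed in the tap runtime

        resp = requests.get(url, timeout=30)
        resp.raise_for_status()
        rows = resp.json()
        if not isinstance(rows, list):
            return []
        return [{clean_key(k): v for k, v in row.items()}
                for row in rows if isinstance(row, dict)]
    except Exception as exc:  # return an empty list rather than raising
        print(f"fetch failed: {exc}", file=sys.stderr)
        return []
```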