Beta. Taps are a new feature and the API and UI may evolve based on user feedback. Core functionality is stable and production-ready, but we may add fields, rename properties, or adjust workflow steps in upcoming releases.
A Tap is an AI-generated Python script that fetches data from external sources — APIs, web scraping, databases — and pushes it into a Datris pipeline. Describe what data you want in plain English, and Datris generates the script for you.
Taps can run on a schedule using CRON expressions, or be triggered manually. They support credentials management via Vault secrets and include AI-powered diagnosis when scripts fail, return zero records, or emit deprecation/warning output.
Building several related taps from the same source? Try Discovery instead — it enumerates every dataset a source exposes, generates all the tap scripts in parallel, builds the matching pipelines, and groups everything in a Data Catalog. The Tap wizard below is best when you already know exactly which dataset you want.
Creating a Tap
The Tap creation wizard walks you through 4 steps — or 5, if you link the tap to a pipeline (the optional 5th step lets you run the tap immediately and push data through the pipeline before leaving the wizard).
Step 1: Describe
Step 1 is where everything except testing happens — you name the tap, describe what you want, attach credentials, and generate the script.
Tap Name — A short identifier for the tap (e.g., weather-data).
Brainstorm with AI (Optional) — Not sure how to describe what you want? Use the chat panel above the instruction box. Type a rough idea and the AI will:
- Ask one focused clarifying question at a time
- Suggest specific data sources (e.g., yfinance for stocks, Open-Meteo for weather, Alpha Vantage for fundamentals)
- Recognize when you reference a Datris table (it knows the platform’s metadata and query endpoints)
- Auto-update the Instruction box below on every turn as the conversation progresses
- Suggest the environment variable names you’ll need (e.g., ALPHA_VANTAGE_API_KEY) when an external API requires authentication
When the AI suggests environment variables, a panel appears below the chat with the variable names as chips and a + Create tap secret with these keys button that pre-fills the credentials section with those keys — you just paste in the values.
You can stop chatting at any time and edit the Instruction box directly, or skip the brainstorm entirely and write the instruction yourself.
Instruction — The technical directive used to generate the Python script. Filled in by the brainstorm chat or written directly.
Credentials (Optional) — Select an existing tap secret, edit one, or create a new one. Secret keys are injected as environment variables at runtime — the AI generates code that reads them via os.environ.get('KEY_NAME'). Never hardcode credentials in scripts.
Generate Script — Click to have the AI produce a Python script with a fetch() function. The result appears below in a scrollable preview, along with any extra pip packages it needs.
The generated script can use these pre-installed packages:
requests, beautifulsoup4, pandas, lxml, feedparser
- Additional packages can be specified and are installed at runtime via pip. Datris caches installed packages in Docker volumes so subsequent runs reuse them instantly.
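For instance, for an instruction like “Fetch current weather data for New York from the Open-Meteo API”, the generated script might look roughly like this. This is a hedged sketch, not actual generated output — the coordinates and response fields are illustrative, though they match Open-Meteo’s public forecast endpoint:

```python
import requests

def fetch():
    """Fetch current weather for New York from the Open-Meteo API."""
    url = 'https://api.open-meteo.com/v1/forecast'
    params = {'latitude': 40.71, 'longitude': -74.01, 'current_weather': 'true'}
    try:
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        current = response.json().get('current_weather', {})
        # Return a list of flat dicts so the platform detects tabular (CSV) data
        return [current] if current else []
    except Exception:
        # Fail gracefully: return an empty list instead of raising
        return []
```

Note the pattern: a no-argument fetch(), a request timeout, and an empty list on failure — the same requirements listed under Script Requirements below.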
Example instructions:
- “Fetch current weather data for New York from the Open-Meteo API”
- “Retrieve daily stock prices for all S&P 500 companies from yfinance”
- “Get the latest news headlines from the BBC RSS feed”
- “Query the consumer_discretionary_earnings table on Datris to get tickers, then fetch historical earnings from Alpha Vantage”
Step 2: Edit & Test
Review and edit the generated script. You can:
- Modify the Python code directly
- Add or remove pip packages
- Edit the instruction and click Regenerate to produce a new script
- Click Test Script to execute and preview results
- If errors occur, an AI Diagnosis explains what went wrong
- Click Apply Diagnosis to Script to auto-fix the issue
The test must return records successfully before you can proceed to step 3.
Step 3: Schedule (Optional)
Set up automatic runs with a CRON expression:
- Presets: Every Hour, Daily (Midnight), Weekdays (Midnight), Weekly (Monday)
- Custom: Describe your schedule in plain English (e.g., “Every weekday at 4pm ET”) and click Generate to create the CRON expression
The Schedule is active checkbox controls whether the cron actually fires. Uncheck it to pause a tap without losing its schedule definition — useful when you want to temporarily stop a tap but keep its CRON expression intact for later.
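For reference, the presets likely map to Quartz-style expressions (seconds, minutes, hours, day-of-month, month, day-of-week — the format used in the examples on this page); treat these as illustrative rather than exact:

```
0 0 * * * ?          Every Hour
0 0 0 * * ?          Daily (Midnight)
0 0 0 ? * MON-FRI    Weekdays (Midnight)
0 0 0 ? * MON        Weekly (Monday)
```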
Step 4: Review & Save
Review all settings (including a human-readable description of the CRON schedule, e.g. 0 0 16 ? * MON-FRI — at 4:00 PM, on MON-FRI).
From step 4 you can also link the tap to a pipeline so its output flows through validation, transformation, and into your destination automatically. Two options:
- Attach to Pipeline — pick an existing pipeline whose source columns exactly match the tap’s test output. If they don’t, you’ll see which columns are missing or extra so you can fix the mismatch before attaching.
- Generate Pipeline — create a new pipeline pre-wired to the tap’s output. The new pipeline’s destination is Postgres (public.&lt;tap_name&gt;) for tabular data, or MongoDB for nested/document data. All columns are created as string for safe ingestion — tap output shape can vary across runs, and string columns ingest reliably regardless of what the script returns. You can promote individual columns to richer types in the pipeline editor once you’re confident about the data shape.
If a pipeline is linked when you click save, the wizard advances to step 5; otherwise it returns to the Taps list.
Step 5: Run the Tap (Optional)
Step 5 only appears when the tap is linked to a pipeline. It’s a one-time launch screen with two buttons:
- Run the Tap Now — runs the tap script immediately and pushes the records it returns into the linked pipeline. The pipeline applies any data quality and transformation rules and writes to its destination. After the run completes you’re returned to the Taps list.
- Done — skip the run and return to the Taps list. You can run the tap later from the Taps page or via the MCP run_tap tool.
Querying Datris Data
Tap scripts can query data already stored in the Datris platform. The following environment variables are always automatically injected at runtime — your script does not need to fall back to defaults:
| Variable | Description |
|---|---|
| DATRIS_POSTGRES_DATABASE | PostgreSQL database name (configured via postgres.database in application.yaml, or the tenant name in multi-tenant mode) |
| DATRIS_MONGODB_DATABASE | MongoDB database name (configured via mongodb.database in application.yaml, or the tenant name in multi-tenant mode) |
| DATRIS_PLATFORM_HOST | Hostname of the Datris platform (localhost inside the Docker container) |
| DATRIS_PLATFORM_PORT | Port of the Datris platform (8080 by default) |
In single-tenant deployments the postgres and mongo database names may differ (postgres defaults to datris, mongo defaults to oss); in multi-tenant mode both resolve to the same tenant name. Always use the variable that matches the backend you are querying.
Scripts can call the Datris API directly using these:
Discover tables (PostgreSQL):
```python
import os, requests

host = os.environ['DATRIS_PLATFORM_HOST']
port = os.environ['DATRIS_PLATFORM_PORT']
pg_db = os.environ['DATRIS_POSTGRES_DATABASE']
base_url = f'http://{host}:{port}/api/v1'

# List all tables
response = requests.get(f'{base_url}/metadata/postgres/tables',
                        params={'database': pg_db, 'schema': 'public'})
tables = response.json()
```
Column names returned by /api/v1/metadata/postgres/columns reflect the live table schema, which always satisfies the platform’s [A-Za-z0-9_]+ rule (see Column Naming Rules). You can use them directly in SELECT clauses without quoting.
Query PostgreSQL data:
```python
response = requests.post(f'{base_url}/query/postgres', json={
    'sql': 'SELECT * FROM public.my_table',
    'database': pg_db
})
results = response.json()['results']
```
Query MongoDB data:
```python
mongo_db = os.environ['DATRIS_MONGODB_DATABASE']

# List collections
response = requests.get(f'{base_url}/metadata/mongodb/collections',
                        params={'database': mongo_db})
collections = response.json()

# Query a collection
response = requests.post(f'{base_url}/query/mongodb', json={
    'database': mongo_db,
    'collection': 'my_collection',
    'query': '{}'
})
results = response.json()['results']
```
The brainstorm AI knows about all of these endpoints, so if you mention “the X table on Datris” or “the Y collection on Datris” it will write the instruction to use the appropriate backend — no need to describe schema or column names; the script can introspect those at runtime via /api/v1/metadata/postgres/columns or /api/v1/metadata/mongodb/collections.
Credentials / Secrets
Tap secrets are stored in HashiCorp Vault and injected as environment variables when the script runs. They are managed directly in the tap wizard (Step 1).
- Select existing: Choose from previously created tap secrets
- Create new: Define key-value pairs inline (e.g., API_KEY=your-key)
- Edit existing: Modify an existing secret’s fields
- Use suggested keys: When the brainstorm AI suggests environment variables, click + Create tap secret with these keys to jump straight into the create form with the keys pre-filled
Tap secrets are tagged with _type=tap and only appear in the tap dropdown — not mixed with system secrets like database credentials.
Never hardcode credentials in scripts. Always use os.environ.get('KEY_NAME').
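A minimal pattern for reading a secret inside fetch(), with a graceful fallback — the helper name and key name here are hypothetical:

```python
import os

def require_secret(name):
    """Return the secret injected from Vault, or None so fetch() can
    return an empty list instead of raising."""
    value = os.environ.get(name)
    if not value:
        print(f'Missing secret {name}; returning no records')
    return value
```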
Running Taps
Test Run (from UI)
Click the play button on the taps list to open the Run page:
- Shows the tap instruction
- Send to pipeline checkbox (only if a target pipeline is configured)
- Displays script output, results table, and errors
- Run Again button for re-execution
CRON Schedule
Taps with a cronExpression run automatically. The scheduler checks every 30 seconds for taps that are due. A tap won’t run if it’s already running or has never been run before (first run must be manual).
Sending Data to Pipelines
When “Send to pipeline” is checked:
- The tap executes the script
- Records are converted to the pipeline’s expected format (CSV or JSON)
- Data is fed through the pipeline’s processing chain (data quality, transformation, destinations)
Data Types
The system automatically detects the data type from what fetch() returns:
| Return Value | Detected Type |
|---|---|
| List of flat dicts (all scalar values) | CSV |
| List of dicts with nested objects | JSON |
| List of lists/tuples | CSV |
| XML string | XML |
| Other string | Text |
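The detection rules above can be sketched as follows — a hypothetical approximation of the decision table, not the platform’s actual code:

```python
def detect_type(result):
    # Sketch of the detection rules; the platform may differ in edge cases.
    if isinstance(result, str):
        # XML strings start with '<'; anything else is plain text
        return 'XML' if result.lstrip().startswith('<') else 'Text'
    if isinstance(result, list) and result:
        first = result[0]
        if isinstance(first, (list, tuple)):
            return 'CSV'
        if isinstance(first, dict):
            # Any nested dict/list value makes the result JSON, else CSV
            nested = any(isinstance(v, (dict, list))
                         for row in result if isinstance(row, dict)
                         for v in row.values())
            return 'JSON' if nested else 'CSV'
    return 'Text'
```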
Pipelines and Taps
The Pipelines page shows a Tap column for each pipeline, indicating which tap (if any) feeds it. Click the tap name to jump straight to that tap’s edit page.
On the Pipeline creation wizard, select From Tap to auto-populate the pipeline’s source type and schema from a tap’s test results. The tap’s data type and columns are used to configure the pipeline automatically.
Run History
Click the history icon on the taps list to view a tap’s run history. Each entry shows:
- Status (success/failure)
- Timestamp and duration (formatted using the configured dateFormat and dateTimezone — see Configuration Reference)
- Record count
- Whether data was sent to a pipeline
- Expandable script output logs and error messages
MCP Tools
AI agents can manage taps via MCP:
| Tool | Description |
|---|---|
| create_tap | Create a tap from a plain-English instruction (AI generates the script), a user-provided script, or config only |
| list_taps | List all taps with status, schedule, and last run info |
| get_tap | Get full details of a single tap including its Python script |
| run_tap | Execute a tap and push fetched data to the target pipeline |
| test_tap | Test-run a tap without pushing data to the pipeline |
| update_tap | Update a tap’s config (enabled, schedule, pipeline, description) without regenerating the script |
| get_tap_logs | Get run history for a tap (last 50 entries) |
| delete_tap | Delete a tap and its stored script |
CLI Commands
```shell
# List all taps
datris taps

# Create a tap with AI-generated script
datris tap create "Fetch weather data from Open-Meteo API" --pipeline weather --cron "0 0 * * * ?"

# Create a tap with your own script
datris tap create --script ./my_script.py --name my-tap --pipeline weather

# Create a tap config only (add script later)
datris tap create --name my-tap --pipeline weather

# Show tap details including script
datris tap show my-tap

# Test-run a tap (no data push)
datris tap test my-tap

# Run a tap manually
datris tap run my-tap

# View run history
datris tap logs my-tap

# Update a tap's schedule or config
datris tap update my-tap --cron "0 0 * * * ?" --enabled

# Disable a tap
datris tap update my-tap --disabled

# Delete a tap
datris tap delete my-tap
```
Configuration
| Setting | File | Default | Description |
|---|---|---|---|
| tapScriptTimeoutSeconds | application.yaml | 300 | Maximum script execution time in seconds |
| schedule.checkTapSchedules | application.yaml | 30000 | CRON scheduler poll interval in milliseconds |
| dateFormat | application.yaml | yyyy-MM-dd HH:mm:ss z | SimpleDateFormat pattern used for tap run timestamps |
| dateTimezone | application.yaml | UTC | IANA timezone (e.g., America/New_York) for displayed timestamps |
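As a sketch, these settings might appear in application.yaml like this — the nesting of schedule.checkTapSchedules follows the dotted key above, and the flat placement of the other keys is an assumption about the file’s layout:

```yaml
tapScriptTimeoutSeconds: 300          # kill tap scripts after 5 minutes
schedule:
  checkTapSchedules: 30000            # poll for due taps every 30 seconds
dateFormat: "yyyy-MM-dd HH:mm:ss z"   # SimpleDateFormat pattern
dateTimezone: "UTC"                   # IANA timezone for displayed timestamps
```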
Script Requirements
The fetch() function must:
- Take no arguments
- Return a list of dictionaries (records) or a string (JSON/XML/text)
- Handle errors gracefully with try/except
- Include timeouts on network requests (30 seconds recommended)
- Return an empty list on failure rather than raising exceptions
- Use os.environ.get() for any credentials
- Read DATRIS_POSTGRES_DATABASE, DATRIS_MONGODB_DATABASE, DATRIS_PLATFORM_HOST, and DATRIS_PLATFORM_PORT directly without fallback defaults — they are always injected by the platform
Column names are auto-normalized for tabular results (list of dicts). The platform converts each key to lowercase snake_case using only [a-z0-9_] so that downstream pipeline registration succeeds and SQL queries don’t need quoting. Examples: EPS Estimate → eps_estimate, Surprise(%) → surprise_percent. You can return raw source column names — the platform will clean them — but for clarity in the test preview, prefer to emit clean keys directly. JSON/XML results destined for MongoDB are not normalized (they go through as raw blobs in the _json field). See Schema Definition → Column Naming Rules for the underlying validator rule.
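The normalization can be sketched as follows — an approximation, not the platform’s actual implementation; the percent-sign handling is inferred from the Surprise(%) example:

```python
import re

def normalize_key(key):
    """Approximate the platform's lowercase snake_case normalization."""
    key = key.strip().replace('%', ' percent ')   # Surprise(%) -> surprise_percent
    key = re.sub(r'[^A-Za-z0-9]+', '_', key)      # non-alphanumerics -> underscore
    key = re.sub(r'_+', '_', key).strip('_')      # collapse and trim underscores
    return key.lower()
```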