Beta. The Airflow provider is new. Core functionality is stable, but field
names and operator arguments may evolve based on feedback.
Datris fits the modern data stack as the execution layer: Airflow orchestrates and
Datris runs the work. The airflow-provider-datris package ships an operator
that triggers a Datris tap, waits for the resulting pipeline to reach a terminal
state, streams Datris logs into the Airflow task log, and surfaces run tokens and
metrics as XComs.
No execution moves into Airflow; Airflow only triggers and observes. If you don’t run
Airflow, nothing changes: Datris’s built-in scheduler keeps working as before.
Install
pip install airflow-provider-datris
The provider targets Airflow 3.x.
Add a connection of type Datris in the Airflow UI (or via env var / CLI):
| Field | Maps to | Required |
|---|---|---|
| Host | Datris base URL | yes |
| Password | API key (x-api-key) | only when the Datris install requires keys |
| Extra | {"verify": true, "timeout": 30} | no |
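
For the env var route, Airflow reads connections from variables named AIRFLOW_CONN_<CONN_ID> and accepts a JSON-serialized value. The sketch below builds such a value from the fields in the table above. The connection type string "datris", the example host, and the helper name build_datris_connection are assumptions for illustration; check the provider for the actual connection type identifier.

```python
import json
import os

# Hypothetical values: replace with your own Datris base URL and API key.
DATRIS_HOST = "https://datris.example.com"
DATRIS_API_KEY = ""  # leave blank when the install does not require keys


def build_datris_connection(host: str, api_key: str = "") -> str:
    """Return a JSON connection string in Airflow's AIRFLOW_CONN_* format."""
    conn = {
        "conn_type": "datris",  # assumed identifier for the Datris connection type
        "host": host,
        "extra": {"verify": True, "timeout": 30},
    }
    if api_key:
        # The password field carries the API key (sent as x-api-key).
        conn["password"] = api_key
    return json.dumps(conn)


# Airflow resolves conn_id "datris_default" from this variable name.
os.environ["AIRFLOW_CONN_DATRIS_DEFAULT"] = build_datris_connection(
    DATRIS_HOST, DATRIS_API_KEY
)
```

Setting the variable in the scheduler and worker environment (rather than in DAG code) keeps the key out of the repository.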
When is the API key required?
- Local / single-tenant with useApiKeys=false (default): leave the password
  blank; the header is ignored.
- Single-tenant with useApiKeys=true, or a hosted/multi-tenant install: set the
  password to an API key that holds the tap:run capability for the taps you
  trigger.
API keys are capability-scoped. A key scoped tap:run:owner=self can only run
taps it created. For an Airflow connection, use a full-access key, or a key with
an unscoped tap:run capability that also created the taps it triggers. See
API Keys.
Trigger a tap
from datris_provider.operators import DatrisRunTapOperator

ingest_customers = DatrisRunTapOperator(
    task_id="ingest_customers",
    tap_name="customers_pg_to_snowflake",
    datris_conn_id="datris_default",
    wait_for_completion=True,
    poll_interval=15,
)
The operator:
- Calls POST /tap/run with mode=run.
- Polls GET /pipeline/status?publishertoken=...&withrollup=true until the
  pipeline reaches a terminal state.
- Streams Datris log lines into the Airflow task log on each poll.
- Pushes publisher_token, pipeline_tokens, and final metrics
  (row_count, duration_ms) as XComs.
- On Airflow task timeout or DAG cancellation, calls POST /job/kill so the
  Datris job transitions to CANCELLED.
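
The polling step can be sketched as a small loop. This illustrates the pattern and is not the provider's code: fetch_status stands in for the GET /pipeline/status call, and the "state" response field, the state names other than CANCELLED, and the helper name wait_for_terminal are all assumptions.

```python
import time
from typing import Callable, Dict, Optional

# Assumed terminal states; only CANCELLED is named in this document.
TERMINAL_STATES = frozenset({"SUCCESS", "FAILED", "CANCELLED"})


def wait_for_terminal(
    fetch_status: Callable[[], Dict[str, str]],
    poll_interval: float = 15.0,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, str]:
    """Poll fetch_status() until its "state" field is terminal.

    fetch_status is a hypothetical callable wrapping the
    GET /pipeline/status request. Raises TimeoutError if the deadline
    passes first; a caller could respond by calling POST /job/kill.
    """
    deadline = None if timeout is None else clock() + timeout
    while True:
        status = fetch_status()
        if status.get("state") in TERMINAL_STATES:
            return status
        if deadline is not None and clock() >= deadline:
            raise TimeoutError("pipeline did not reach a terminal state in time")
        sleep(poll_interval)
```

Passing sleep and clock as parameters keeps the loop easy to test without waiting in real time.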
Date-windowed backfills
Pass per-run params via tap_params (named to avoid Airflow’s reserved
params attribute). Each value is exposed to the tap script as an environment
variable, so the script can read a run-specific window. Values are
Jinja-templated by Airflow:
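DatrisRunTapOperator(
    task_id="ingest_daily",
    tap_name="orders_api_to_postgres",
    datris_conn_id="datris_default",
    wait_for_completion=True,
    # Window bounds come from the run's data interval, rendered as YYYY-MM-DD.
    tap_params={
        "since": "{{ data_interval_start | ds }}",
        "until": "{{ data_interval_end | ds }}",
    },
)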
Inside the tap script, read them as env vars (e.g. os.environ["since"]).
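
A tap script might read the window along these lines. This is a minimal sketch: the variable names since and until come from the example above, while the helper name read_window and the ISO-date parsing are assumptions (Airflow renders dates such as {{ ds }} as YYYY-MM-DD).

```python
import os
from datetime import date
from typing import Mapping, Tuple


def read_window(env: Mapping[str, str] = os.environ) -> Tuple[date, date]:
    """Parse the run window that the operator exposes as env vars."""
    try:
        since = date.fromisoformat(env["since"])
        until = date.fromisoformat(env["until"])
    except KeyError as missing:
        # Fail loudly when the tap runs without the expected tap_params.
        raise RuntimeError(
            f"tap param {missing} was not passed via tap_params"
        ) from None
    if since > until:
        raise ValueError(f"invalid window: since={since} is after until={until}")
    return since, until
```

Accepting the environment as an argument lets you exercise the parsing in a unit test without touching the real process environment.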
One scheduler at a time
A tap should be scheduled by either Datris or Airflow — never both, or it
fires twice. The rule is simple and based on the tap’s cron:
- Tap has a cron schedule → Datris’s built-in scheduler owns it. The operator
refuses to trigger it and fails the task, so you can’t accidentally
double-fire.
- Tap has no cron (manual-only) → Airflow owns scheduling. The operator
triggers it normally.
So to drive a tap from Airflow, leave its cron empty in Datris (don’t enable
a schedule on the tap). If you see the operator fail with a “tap is
Datris-scheduled” error, remove the tap’s cron — or remove the Airflow DAG and
let Datris run it on its cron.
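
The ownership rule reduces to a single check on the tap's cron. The function below only illustrates that rule, for example as a pre-flight check in DAG tests; the name scheduling_owner and the idea of passing the cron as a string are assumptions, not part of the provider.

```python
from typing import Optional


def scheduling_owner(cron: Optional[str]) -> str:
    """Return which scheduler owns a tap, given its cron expression."""
    # An empty or missing cron means the tap is manual-only in Datris,
    # so Airflow is free to drive it.
    if cron is None or not cron.strip():
        return "airflow"
    # Any cron means Datris's built-in scheduler already fires the tap.
    return "datris"
```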
Retries and idempotency
Datris debounces rapid duplicate mode=run calls for the same tap. If a retry
lands inside the debounce window, /tap/run returns
status="skipped", persistedReason="debounced". The operator treats this as
success with a warning — the in-flight run is the real one, and failing the
task would mask it.
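
The debounce handling can be pictured as a small classification step on the /tap/run response. The field names status and persistedReason come from the text above; the function names, the return labels, and the assumption that the response body is a JSON object are illustrative only.

```python
import logging
from typing import Any, Mapping

log = logging.getLogger(__name__)


def is_debounced(response: Mapping[str, Any]) -> bool:
    """True when /tap/run reports the call was skipped by the debounce window."""
    return (
        response.get("status") == "skipped"
        and response.get("persistedReason") == "debounced"
    )


def handle_run_response(response: Mapping[str, Any]) -> str:
    """Classify a /tap/run response body (assumed to be a JSON object)."""
    if is_debounced(response):
        # A retry landed inside the debounce window: the earlier run is still
        # in flight, so report success with a warning instead of failing.
        log.warning("tap run debounced; an earlier run is already in flight")
        return "success_with_warning"
    return "triggered"
```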
Troubleshooting
| Symptom | Cause / fix |
|---|---|
| 401/403 from /tap/run | Missing or wrong API key, or the key lacks tap:run for that tap. Check the connection password and key capabilities. |
| Task succeeds but no data loaded | The tap returned zero records, or has no target pipeline. Check the tap’s Run History in Datris. |
| Operator fails: “tap is Datris-scheduled” | The tap has a cron, so Datris already schedules it. Remove the tap’s cron to let Airflow drive it (or remove the DAG). |
| params not visible in the script | Read them as environment variables, not function arguments. Nested values arrive JSON-encoded. |
What’s not covered yet
Deferrable/async operators, a standalone sensor, file-upload pipeline operators,
and Airflow Datasets / OpenLineage lineage are planned for a future release.