Beta. The Airflow provider is new. Core functionality is stable, but field names and operator arguments may evolve based on feedback.
Datris fits into the modern data stack as the execution layer: Airflow orchestrates, Datris runs the work. The airflow-provider-datris package ships an operator that triggers a Datris tap, waits for the resulting pipeline to reach a terminal state, streams Datris logs into the Airflow task log, and surfaces run tokens and metrics as XComs. No execution moves into Airflow; it only triggers and observes. If you don’t run Airflow, nothing changes: Datris’s built-in scheduler keeps working as before.

Install

pip install airflow-provider-datris
The provider targets Airflow 3.x.

Configure a connection

Add a connection of type Datris in the Airflow UI (or via env var / CLI):
Field | Maps to | Required
Host | Datris base URL | yes
Password | API key (x-api-key) | only when the Datris install requires keys
Extra | {"verify": true, "timeout": 30} | no
When is the API key required?
  • Local / single-tenant with useApiKeys=false (default): leave the password blank — the header is ignored.
  • Single-tenant with useApiKeys=true, or a hosted/multi-tenant install: set the password to an API key that holds the tap:run capability for the taps you trigger.
API keys are capability-scoped. A key scoped tap:run:owner=self can only run taps it created. For an Airflow connection, use a full-access key, or a key with an unscoped tap:run capability that also created the taps it triggers. See API Keys.
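For the env-var route, Airflow reads connections from variables named AIRFLOW_CONN_<CONN_ID> and accepts a JSON document as the value. The sketch below builds such a value from the fields in the table above. The conn_type string "datris", the host URL, and the connection ID datris_default are assumptions for illustration; check the provider for the exact connection type name.

```python
import json
import os

# Assumed values: the conn_type "datris" and the host URL are placeholders.
datris_conn = {
    "conn_type": "datris",
    "host": "https://datris.example.com",
    # Blank when useApiKeys=false; otherwise an API key that holds tap:run.
    "password": "",
    "extra": {"verify": True, "timeout": 30},
}

# Airflow resolves the connection ID "datris_default" from this variable name.
os.environ["AIRFLOW_CONN_DATRIS_DEFAULT"] = json.dumps(datris_conn)
```

Setting the variable from Python only affects the current process. In a real deployment the same JSON string would normally be set in the environment of the Airflow scheduler and workers.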

Trigger a tap

from datris_provider.operators import DatrisRunTapOperator

ingest_customers = DatrisRunTapOperator(
    task_id="ingest_customers",
    tap_name="customers_pg_to_snowflake",
    datris_conn_id="datris_default",
    wait_for_completion=True,
    poll_interval=15,
)
The operator:
  • Calls POST /tap/run with mode=run.
  • Polls GET /pipeline/status?publishertoken=...&withrollup=true until the pipeline reaches a terminal state.
  • Streams Datris log lines into the Airflow task log on each poll.
  • Pushes publisher_token, pipeline_tokens, and final metrics (row_count, duration_ms) as XComs.
  • On Airflow task timeout or DAG cancellation, calls POST /job/kill so the Datris job transitions to CANCELLED.
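The trigger-then-poll flow described above can be sketched as a small loop. This is not the provider's code: the response field names (publisherToken, state, logs) and the terminal state names other than CANCELLED are assumptions, and the HTTP calls are replaced by injected callables so the sketch runs on its own.

```python
import time
from typing import Callable

# Assumed terminal state names; only CANCELLED is named in the text above.
TERMINAL_STATES = {"SUCCESS", "FAILED", "CANCELLED"}


def run_and_wait(
    trigger: Callable[[], dict],
    get_status: Callable[[str], dict],
    poll_interval: float = 15,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Trigger a run, then poll until the pipeline reaches a terminal state."""
    # trigger() stands in for POST /tap/run with mode=run.
    token = trigger()["publisherToken"]  # hypothetical response field
    while True:
        # get_status() stands in for GET /pipeline/status?publishertoken=...
        status = get_status(token)
        for line in status.get("logs", []):  # hypothetical response field
            print(line)  # the operator writes these to the Airflow task log
        if status["state"] in TERMINAL_STATES:  # hypothetical response field
            return {"publisher_token": token, **status}
        sleep(poll_interval)


# Usage with fake responses in place of a Datris server.
responses = iter([
    {"state": "RUNNING", "logs": ["started"]},
    {"state": "SUCCESS", "logs": ["done"]},
])
result = run_and_wait(
    trigger=lambda: {"publisherToken": "abc"},
    get_status=lambda token: next(responses),
    sleep=lambda seconds: None,  # skip real waiting in the example
)
```

Injecting the sleep function keeps the loop easy to exercise without waiting for real poll intervals.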

Date-windowed backfills

Pass per-run params via tap_params (named to avoid Airflow’s reserved params attribute). Each value is exposed to the tap script as an environment variable, so the script can read a run-specific window. Values are Jinja-templated by Airflow:
DatrisRunTapOperator(
    task_id="ingest_daily",
    tap_name="orders_api_to_postgres",
    datris_conn_id="datris_default",
    wait_for_completion=True,
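    # next_ds is not available in Airflow 3; for a daily DAG, ds plus one day gives the window end.
    tap_params={"since": "{{ ds }}", "until": "{{ macros.ds_add(ds, 1) }}"},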
)
Inside the tap script, read them as env vars (e.g. os.environ["since"]).
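A minimal sketch of the script side, assuming the since and until keys from the example above arrive as YYYY-MM-DD strings (the format Airflow's ds value renders to). The helper names are hypothetical, and the filters parameter in the usage is an invented example of a nested value, which arrives JSON-encoded as noted under Troubleshooting.

```python
import json
import os
from datetime import date


def read_window(env=os.environ):
    """Return the (since, until) dates passed through tap_params."""
    since = date.fromisoformat(env["since"])
    until = date.fromisoformat(env["until"])
    if until < since:
        raise ValueError(f"until ({until}) is before since ({since})")
    return since, until


def read_json_param(name, env=os.environ, default=None):
    """Decode a nested tap_params value, which arrives JSON-encoded."""
    raw = env.get(name)
    return default if raw is None else json.loads(raw)


# Usage with a plain dict in place of the real environment.
fake_env = {
    "since": "2024-01-01",
    "until": "2024-01-02",
    "filters": '{"region": "eu"}',  # hypothetical nested parameter
}
since, until = read_window(fake_env)
filters = read_json_param("filters", fake_env, default={})
```

Parsing the dates up front makes a malformed or missing window fail at the start of the script rather than partway through a load.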

One scheduler at a time

A tap should be scheduled by either Datris or Airflow — never both, or it fires twice. The rule is simple and based on the tap’s cron:
  • Tap has a cron schedule → Datris’s built-in scheduler owns it. The operator refuses to trigger it and fails the task, so you can’t accidentally double-fire.
  • Tap has no cron (manual-only) → Airflow owns scheduling. The operator triggers it normally.
So to drive a tap from Airflow, leave its cron empty in Datris (don’t enable a schedule on the tap). If you see the operator fail with a “tap is Datris-scheduled” error, remove the tap’s cron — or remove the Airflow DAG and let Datris run it on its cron.

Retries and idempotency

Datris debounces rapid duplicate mode=run calls for the same tap. If a retry lands inside the debounce window, /tap/run returns status="skipped", persistedReason="debounced". The operator treats this as success with a warning — the in-flight run is the real one, and failing the task would mask it.
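The debounce handling can be sketched as a check on the /tap/run response. The status and persistedReason keys come from the description above; treating the response as a plain dict and the function names are assumptions, not the provider's actual code.

```python
import logging

log = logging.getLogger(__name__)


def is_debounced(response: dict) -> bool:
    """True when Datris skipped the call because a duplicate run is in flight."""
    return (
        response.get("status") == "skipped"
        and response.get("persistedReason") == "debounced"
    )


def should_finish_early(response: dict) -> bool:
    """Return True if the task can succeed now without polling a new run."""
    if is_debounced(response):
        # The in-flight run is the real one, so warn instead of failing.
        log.warning("Datris debounced this trigger; an earlier run is in flight.")
        return True
    return False


# Usage with two hand-written responses.
retry_outcome = should_finish_early(
    {"status": "skipped", "persistedReason": "debounced"}
)
first_outcome = should_finish_early({"status": "started"})  # hypothetical status value
```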

Troubleshooting

Symptom | Cause / fix
401/403 from /tap/run | Missing or wrong API key, or the key lacks tap:run for that tap. Check the connection password and key capabilities.
Task succeeds but no data loaded | The tap returned zero records, or has no target pipeline. Check the tap’s Run History in Datris.
Operator fails: “tap is Datris-scheduled” | The tap has a cron, so Datris already schedules it. Remove the tap’s cron to let Airflow drive it (or remove the DAG).
tap_params not visible in the script | Read them as environment variables, not function arguments. Nested values arrive JSON-encoded.

What’s not covered yet

Deferrable/async operators, a standalone sensor, file-upload pipeline operators, and Airflow Datasets / OpenLineage lineage are planned for a future release.