Pipeline Tokens

Every pipeline processing job is assigned a unique pipeline token — a UUID that identifies the job throughout its lifecycle. You can monitor jobs via the REST API, the MCP server, the CLI, or the Datris UI. Pipeline tokens are returned:
  • In the response body of POST /api/v1/pipeline/upload (for uncompressed files)
  • In the job status when querying by pipeline name

Job Status

Query by Pipeline Token

curl "http://localhost:8080/api/v1/pipeline/status?pipelinetoken=pt-abc12345-..."
Returns an array of status entries for the job, one per processing stage:
[
  {
    "id": 1,
    "dateTime": "2026-03-15T10:00:00Z",
    "pipeline": "sales_data",
    "processName": "StreamNotifier",
    "publisherToken": null,
    "pipelineToken": "pt-abc12345-...",
    "filename": "sales_data",
    "state": "begin",
    "code": "begin",
    "description": "Process started",
    "epoch": 1773568800000
  },
  {
    "id": 2,
    "dateTime": "2026-03-15T10:00:01Z",
    "pipeline": "sales_data",
    "processName": "DataQuality",
    "publisherToken": null,
    "pipelineToken": "pt-abc12345-...",
    "filename": "sales_data",
    "state": "processing",
    "code": "processing",
    "description": "Running CodeGen data quality rule",
    "epoch": 1773568801000
  },
  {
    "id": 3,
    "dateTime": "2026-03-15T10:00:03Z",
    "pipeline": "sales_data",
    "processName": "PostgresLoader",
    "publisherToken": null,
    "pipelineToken": "pt-abc12345-...",
    "filename": "sales_data",
    "state": "end",
    "code": "end",
    "description": "Process completed",
    "epoch": 1773568803000
  }
]
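
The response above can be consumed from a script. The following is a minimal sketch, assuming the endpoint is reachable without authentication at the host and port used in the curl example; the helper names (fetch_status, latest_entry, describe) are illustrative and not part of Datris.

```python
import json
from typing import Optional
from urllib.parse import urlencode
from urllib.request import urlopen

# Host and port copied from the curl example; adjust for your deployment.
BASE_URL = "http://localhost:8080"


def fetch_status(pipeline_token: str) -> list:
    """GET the status entries for one job (assumes no auth header is needed)."""
    query = urlencode({"pipelinetoken": pipeline_token})
    with urlopen(f"{BASE_URL}/api/v1/pipeline/status?{query}") as resp:
        return json.load(resp)


def latest_entry(entries: list) -> Optional[dict]:
    """Return the entry with the highest epoch value, or None if there are none."""
    if not entries:
        return None
    return max(entries, key=lambda e: e["epoch"])


def describe(entry: dict) -> str:
    """Format one status entry as a single log-style line."""
    return (
        f'{entry["dateTime"]} {entry["processName"]}: '
        f'{entry["state"]} ({entry["description"]})'
    )
```

Calling describe(latest_entry(fetch_status(token))) would then print the most recent stage message for a job, provided the job has logged at least one entry.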

Query by Publisher Token

curl "http://localhost:8080/api/v1/pipeline/status?publishertoken=pub-abc12345-..."
Returns every status row whose publisherToken matches, which covers all ingestion jobs that a single caller submitted. Tap runs set a publisherToken on every job they spawn, so one query covers a structured tap (1 job) or a document tap (N jobs, one per file).
Use publisherToken when you need to watch “this entire run,” and pipelineToken when you need the detail of one specific job.
Add &withrollup=true to wrap the response in a {rollup, events} object that classifies each job (success, warning, error, processing, timed_out) and exposes rollup.allDone as a single boolean to poll on. See the status API reference for the full shape.
Agents call the same query via the MCP get_pipeline_status tool: pass publisher_token from a run_tap response and poll until rollup.allDone is true. The MCP tool sets withrollup=true automatically. For an upload_data flow, get_job_status does the same with the pipelineToken returned from the upload.
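
Polling on rollup.allDone can be sketched as follows. This is an illustration only: it relies on the {rollup, events} wrapper and the rollup.allDone field described above, while the function names, the polling interval, and the timeout are assumptions. The fetch argument is any callable that returns the parsed JSON body, so the loop can be exercised without a running server.

```python
import time
from typing import Callable
from urllib.parse import urlencode


def rollup_url(base_url: str, publisher_token: str) -> str:
    """Build the status URL for a whole run, with the rollup wrapper enabled."""
    query = urlencode({"publishertoken": publisher_token, "withrollup": "true"})
    return f"{base_url}/api/v1/pipeline/status?{query}"


def wait_for_run(
    fetch: Callable[[], dict],
    interval: float = 5.0,      # seconds between polls (illustrative default)
    timeout: float = 600.0,     # give up after this many seconds (illustrative)
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch() until rollup.allDone is true, then return that response."""
    deadline = time.monotonic() + timeout
    while True:
        body = fetch()
        if body["rollup"]["allDone"]:
            return body
        if time.monotonic() >= deadline:
            raise TimeoutError("run did not finish before the timeout")
        sleep(interval)
```

Passing the sleep function as a parameter keeps the loop testable; in normal use the default time.sleep is sufficient.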

Query by Pipeline Name

curl "http://localhost:8080/api/v1/pipeline/status?pipelinename=sales_data&page=1"
Returns an array of job summaries for the pipeline (20 per page):
[
  {
    "createdAtTimestamp": "2026-03-15T10:00:00Z",
    "createdAt": 1773568800000,
    "updatedAt": 1773568803000,
    "pipeline": "sales_data",
    "pipelineToken": "pt-abc12345-...",
    "process": "PostgresLoader",
    "startTime": "2026-03-15T10:00:00Z",
    "endTime": "2026-03-15T10:00:03Z",
    "totalTime": "3s",
    "status": "end"
  }
]
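
Walking through every page of job summaries might look like the sketch below. It assumes pages are numbered from 1 (as in the curl example) and that a page with fewer than 20 rows, or an empty page, marks the end of the results; the source does not state how the last page is signalled, so treat that stop condition as a guess. The fetch_page callable and the max_pages guard are hypothetical.

```python
from typing import Callable, Iterator

PAGE_SIZE = 20  # page size stated in the text


def iter_job_summaries(
    fetch_page: Callable[[int], list],
    max_pages: int = 1000,  # safety guard against an endless loop (illustrative)
) -> Iterator[dict]:
    """Yield job summaries page by page until a short or empty page is seen."""
    for page in range(1, max_pages + 1):
        rows = fetch_page(page)
        yield from rows
        if len(rows) < PAGE_SIZE:
            return
```

Here fetch_page(page) would issue the pipelinename query with the given page number and return the parsed JSON array.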

Job Lifecycle

Jobs progress through these states:
State        Description
INITIALIZED  Job created, queued for processing
PROCESSING   Running in a dedicated thread
COMPLETED    Finished (check status messages for success or error)
CANCELLED    Job was killed via the kill_job API or MCP tool

Processing Stages

Each job logs status messages as it progresses through stages:
  1. FileNotifier / StreamNotifier - Initial file or stream intake
  2. DataQuality - Validation (if configured)
  3. Transformation - Data transformation (if configured)
  4. JobRunner - Orchestration of destination loaders
  5. [LoaderName] - Each destination loader (e.g., PostgresLoader, SparkObjectStoreLoader)
Each stage logs begin, processing (with details), and end messages.
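
Because each stage logs its own begin and end messages, the status entries for a job can be grouped by stage to see which ones are still open. The sketch below assumes the entries have the processName, state, and epoch fields shown in the status response, and that state carries the lowercase values begin, processing, and end; the function names are illustrative.

```python
def stage_states(entries: list) -> dict:
    """Map each processName to the states it logged, in epoch order."""
    stages: dict = {}
    for entry in sorted(entries, key=lambda e: e["epoch"]):
        stages.setdefault(entry["processName"], []).append(entry["state"])
    return stages


def open_stages(entries: list) -> list:
    """Return the stages that logged a begin message but no end message yet."""
    return [
        name
        for name, states in stage_states(entries).items()
        if "begin" in states and "end" not in states
    ]
```

An empty result from open_stages means every stage that started has also logged its end message; it does not by itself say whether the job succeeded.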

Status Storage

Job statuses are stored in MongoDB in the {environment}-pipeline-status collection. Each entry contains the pipeline token, process name, status, message, and timestamp.

Concurrent Job Handling

  • All destination loaders for a single job execute in parallel on a 20-thread pool
  • Jobs targeting the same database table are serialized (only one runs at a time)
  • Multiple jobs for different pipelines run concurrently
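
The pattern described in this list (parallel loaders on a bounded pool, with jobs serialized per target table) can be illustrated in a few lines. This is a sketch of the general technique, not Datris's implementation: it assumes one target table per job and a single shared pool, and all names are hypothetical.

```python
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor

# 20 worker threads, matching the pool size stated in the text.
LOADER_POOL = ThreadPoolExecutor(max_workers=20)

# One lock per target table; the guard protects the lock registry itself.
_table_locks: dict = defaultdict(threading.Lock)
_registry_guard = threading.Lock()


def _lock_for(table: str) -> threading.Lock:
    with _registry_guard:
        return _table_locks[table]


def run_job(table: str, loaders: list) -> list:
    """Run all loaders of one job in parallel; jobs on the same table queue up."""
    with _lock_for(table):
        futures = [LOADER_POOL.submit(loader) for loader in loaders]
        # result() re-raises any loader exception in the calling thread.
        return [future.result() for future in futures]
```

Jobs for different tables take different locks, so they proceed concurrently, while a second job for the same table blocks until the first one releases its lock.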

Datris UI

The Datris UI provides a visual interface for managing your entire Datris platform. Six top-level tabs cover everything you can do through the API:
  • Assistant - In-product agent
  • MCP - Server status, tools, and a pop-out Agent Monitor for live AI agent activity
  • Catalog - Taps and pipelines grouped by catalog
  • Search - Semantic search across vector destinations
  • Ops - Ingestion activity dashboard, job history, file-upload ingestion
  • Configuration - AI providers, secrets, API keys, users

Ops Activity Dashboard

The Ops → Activity tab is the at-a-glance health view for your platform. It pulls every tap run and pipeline job in a rolling window (24h / 7d / 30d, switchable) and surfaces what landed, what’s broken, and what hasn’t refreshed when it should.
KPI tiles across the top show total runs, successes, failures, and items ingested for the window. A failing tile turns red and links to the Failures pane below.
Runs over time and Items ingested over time are stacked time-series charts. Runs are colored by outcome (success green, warning amber, error red) so a spike of failures is immediately visible against the success baseline.
The Failures pane lists every tap or pipeline that had a failure in the window, deduped down to one row per unique item with an (N attempts) count next to the reason. Items whose most-recent run is now healthy are marked ✓ recovered and dimmed at the bottom of the list. Each unrecovered row has a Re-run button:
  • Tap failure → Re-run triggers the tap directly.
  • Pipeline failure with an upstream tap → Re-run triggers that tap to retry the load.
  • Direct-upload pipeline with no associated tap → Cannot be re-run from here; re-upload the source file instead.
While a re-run is in flight, the button swaps to an amber spinner with “Running…” and disables itself; on completion the dashboard reloads and the row either disappears (full recovery) or shows the updated count.
Per-pipeline volume is a 7-day rolling table at the bottom. Each row is a pipeline, with today’s count, 7-day average, a sparkline, and a vs-avg percentage. It is useful for spotting a pipeline that suddenly stopped ingesting or a backfill spike.
Tap status semantics: no_records (the script ran cleanly but returned zero records, which is legitimate for polling/incremental taps on a quiet day) is treated as healthy, not as a failure. It does not count in the Failures tile and does not appear in the Failures pane. See Taps → Run Status Outcomes.
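
The dedup-and-recovery behaviour of the Failures pane can be approximated with a short function. This is a sketch under several assumptions: the input record shape (item, status, reason, epoch) is hypothetical, only the error status is counted as a failure, and success and no_records are the statuses treated as healthy. How the dashboard treats warning or timed_out runs is not stated in the text, so they are ignored here.

```python
HEALTHY = {"success", "no_records"}  # no_records is healthy per the text


def failures_pane(runs: list) -> list:
    """Collapse failed runs to one row per item, with attempts and recovery."""
    rows: dict = {}
    for run in sorted(runs, key=lambda r: r["epoch"]):
        item = run["item"]
        if run["status"] == "error":  # assumption: only "error" is a failure
            row = rows.setdefault(
                item,
                {"item": item, "attempts": 0, "reason": None, "recovered": False},
            )
            row["attempts"] += 1
            row["reason"] = run["reason"]
            row["recovered"] = False
        elif item in rows and run["status"] in HEALTHY:
            # A later healthy run marks the item as recovered.
            rows[item]["recovered"] = True
    # Unrecovered rows first, recovered rows at the bottom.
    return sorted(rows.values(), key=lambda r: r["recovered"])
```

An item that only ever returned no_records never gets a row, which mirrors the statement that such runs do not appear in the Failures pane.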

Agent Monitor

The Agent Monitor lives on the MCP tab and shows a live view of every AI agent currently connected to the platform’s MCP server, along with a streaming log of the tool calls each agent is making.
Both panes (Connections and Activity Log) are sized to fit the viewport on initial load and reflow when you resize the window: the visualization stays at roughly the upper third and the log fills the rest, with its own internal scroll. A pop-out icon next to the title opens the same view in a separate browser window so you can park it on a second monitor while you work elsewhere in the UI.
The visualization pane draws one icon per active MCP session to the right of the MCP server icon, connected by a line that pulses whenever a call is in flight. Sessions fade out automatically once they disconnect. Each agent label uses the most descriptive identifier available, in this order:
  1. The MCP clientInfo.name supplied during the client’s handshake (e.g. claude-ai, claude-code, cursor)
  2. The tenant name (multi-tenant deployments)
  3. The API-key name from the api-keys secret (single-tenant deployments with named keys)
  4. The API-key prefix
  5. The session short-id (last-resort fallback)
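
The fallback order above amounts to picking the first identifier that is present. A minimal sketch, assuming a session is represented as a dictionary; the key names are hypothetical and only the ordering comes from the list above.

```python
def agent_label(session: dict) -> str:
    """Return the most descriptive identifier available for a session."""
    candidates = [
        session.get("client_name"),       # MCP clientInfo.name from the handshake
        session.get("tenant_name"),       # multi-tenant deployments
        session.get("api_key_name"),      # named key from the api-keys secret
        session.get("api_key_prefix"),
        session.get("session_short_id"),  # last-resort fallback
    ]
    for value in candidates:
        if value:  # skips both missing keys and empty strings
            return value
    return "unknown"
```

For example, a session with no client name and no tenant but a named API key would be labelled with that key name.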
The activity log below the visualization lists every tool call as it happens, timestamped per the platform’s configured date format and timezone, with the calling agent, tool name, argument preview, record count, response size, status, and latency. Clicking a row expands it to reveal the full arguments and response bodies as pretty-printed JSON (capped at 2 KB per blob). A header toolbar lets you copy the full log (with per-row detail) to the clipboard or clear the on-screen history.
Activity is held in an in-memory ring buffer on the MCP server (the most recent 200 calls) and served via the UI’s internal /api/v1/mcp/activity proxy. It is not persisted, so a restart clears the history.
Session liveness is tracked from the MCP connection itself: agents appear when their SSE/HTTP session opens and disappear when it closes. A long-idle agent stays visible as long as the connection is open, which is useful when an agent is between tool calls and you don’t want it disappearing from the Connections pane every few seconds.
The Activity Log re-syncs when you return to the tab. Whether you navigated to another route, switched to a different browser tab, or stepped away long enough that the browser throttled the polling, the log refreshes to the current server state on return, so you never come back to a stale or empty view.
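
An in-memory ring buffer of the most recent 200 calls behaves like a bounded deque: once it is full, each new entry evicts the oldest one, and nothing survives a process restart. The sketch below illustrates that behaviour only; the class and method names are hypothetical and it is not the MCP server's actual code.

```python
from collections import deque


class ActivityLog:
    """Keep only the most recent tool calls in memory."""

    def __init__(self, capacity: int = 200):  # 200 matches the text
        self._calls: deque = deque(maxlen=capacity)

    def record(self, call: dict) -> None:
        """Append a call; the oldest entry is dropped once the buffer is full."""
        self._calls.append(call)

    def snapshot(self) -> list:
        """Return the buffered calls, oldest first."""
        return list(self._calls)
```

After recording 250 calls, snapshot() would return calls 51 through 250, which is why older activity is no longer visible in the log.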

CLI

Check job status from the terminal:
datris status my_pipeline
datris health