Documentation Index
Fetch the complete documentation index at: https://docs.datris.ai/llms.txt
Use this file to discover all available pages before exploring further.
Query job processing status by publisher token, pipeline token, or pipeline name.
Get Status by Publisher Token
GET /api/v1/pipeline/status?publishertoken={token}
Use this to watch every ingestion job a single caller submitted. Tap runs set a publisherToken on every job they spawn, so one query covers:
- Structured taps → the single job that ingested the fetched records.
- Document taps → every per-document job (one per file in a run).
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| publishertoken | query | Yes | Publisher token returned from /tap/run (publisherToken field) or upload_data |
| withrollup | query | No | When true, wraps the response in a rollup object that classifies each job — see Rollup Response below. Recommended for agents that need a single boolean to poll on. |
Example:
curl "http://localhost:8080/api/v1/pipeline/status?publishertoken=pub-abc12345-..."
Response: 200 OK — status entries from every job whose publisherToken matches, sorted oldest-first. Each job has its own pipelineToken; group rows by pipelineToken to see one job at a time.
If both publishertoken and pipelinetoken are supplied, publishertoken wins.
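Since every job carries its own pipelineToken, a client that wants to read one job at a time can bucket the returned entries by that field. A minimal sketch (field names taken from the documented response; the sample tokens are placeholders):

```python
from collections import defaultdict

def group_by_job(entries):
    """Group raw status entries by pipelineToken so each job can be read in isolation."""
    jobs = defaultdict(list)
    for entry in entries:
        jobs[entry["pipelineToken"]].append(entry)
    # Entries arrive oldest-first, so each job's list is already chronological.
    return dict(jobs)

entries = [
    {"pipelineToken": "pt-abc", "state": "begin"},
    {"pipelineToken": "pt-def", "state": "begin"},
    {"pipelineToken": "pt-abc", "state": "end"},
]
jobs = group_by_job(entries)
```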
Get Status by Pipeline Token
GET /api/v1/pipeline/status?pipelinetoken={token}
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| pipelinetoken | query | Yes* | Pipeline token from upload response |
| withrollup | query | No | When true, wraps the response in a rollup object — see Rollup Response. |
Example:
curl "http://localhost:8080/api/v1/pipeline/status?pipelinetoken=pt-abc12345-..."
Response: 200 OK - an array of status entries, one per processing stage:
[
  {
    "id": 1,
    "dateTime": "2026-03-15T10:00:00Z",
    "pipeline": "sales_data",
    "processName": "StreamNotifier",
    "publisherToken": null,
    "pipelineToken": "pt-abc12345-...",
    "filename": "sales_data",
    "state": "begin",
    "code": "begin",
    "description": "Process started",
    "epoch": 1710500400000
  },
  {
    "id": 2,
    "dateTime": "2026-03-15T10:00:03Z",
    "pipeline": "sales_data",
    "processName": "PostgresLoader",
    "publisherToken": null,
    "pipelineToken": "pt-abc12345-...",
    "filename": "sales_data",
    "state": "end",
    "code": "end",
    "description": "Process completed",
    "epoch": 1710500403000
  }
]
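As an illustration (not an API field), a job's elapsed time can be derived client-side from the epoch values of its first and last entries; for the example above this matches the "3s" totalTime shown in the summary view:

```python
def job_duration_ms(entries):
    """Elapsed milliseconds between the first and last status entries of one job."""
    epochs = [e["epoch"] for e in entries]
    return max(epochs) - min(epochs)

entries = [
    {"epoch": 1710500400000, "state": "begin"},
    {"epoch": 1710500403000, "state": "end"},
]
# 1710500403000 - 1710500400000 = 3000 ms
```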
Get Status by Pipeline Name
GET /api/v1/pipeline/status?pipelinename={name}&page={page}
Parameters:
| Parameter | Type | Required | Description |
|---|---|---|---|
| pipelinename | query | Yes* | Pipeline name |
| page | query | No | Page number (default: 1) |
Example:
curl "http://localhost:8080/api/v1/pipeline/status?pipelinename=sales_data&page=1"
Response: 200 OK - an array of job summaries:
[
  {
    "createdAtTimestamp": "2026-03-15T10:00:00Z",
    "createdAt": 1710500400000,
    "updatedAt": 1710500403000,
    "pipeline": "sales_data",
    "pipelineToken": "pt-abc12345-...",
    "process": "PostgresLoader",
    "startTime": "2026-03-15T10:00:00Z",
    "endTime": "2026-03-15T10:00:03Z",
    "totalTime": "3s",
    "status": "end"
  }
]
Rollup Response
Add &withrollup=true to a publishertoken or pipelinetoken query and the response is wrapped:
{
  "rollup": {
    "allDone": false,
    "status": "processing",
    "jobs": [
      {
        "pipelineToken": "pt-abc12345-...",
        "pipeline": "sales_data",
        "filename": "sales_data.csv",
        "status": "success",
        "startedAt": "2026-04-29 14:32:11.123",
        "lastEventAt": "2026-04-29 14:32:47.901",
        "elapsed": "36.78 sec",
        "lastError": null
      },
      {
        "pipelineToken": "pt-def67890-...",
        "pipeline": "sales_data",
        "filename": "orders.csv",
        "status": "error",
        "startedAt": "2026-04-29 14:32:11.140",
        "lastEventAt": "2026-04-29 14:33:02.512",
        "elapsed": "51.37 sec",
        "lastError": {
          "processName": "PostgresLoader",
          "description": "duplicate key value violates unique constraint \"customers_pkey\""
        }
      }
    ]
  },
  "events": [ /* the same raw status entries as without withrollup */ ]
}
rollup.allDone flips to true when every job has reached a terminal state. rollup.status is the aggregate outcome — success, warning, error, or processing. Per-job status values are success, warning, error, processing, or timed_out (no terminal event within 8 hours of the first event). When a job’s status is error, lastError carries the failing process and its message.
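The server computes rollup.status for you, but the documented semantics can be mirrored client-side when only the raw arrays are available. The precedence below (live jobs dominate, then failures, then warnings) is one plausible reading; the server's exact rules may differ:

```python
# Terminal per-job statuses as documented; timed_out means no terminal
# event arrived within 8 hours of the job's first event.
TERMINAL = {"success", "warning", "error", "timed_out"}

def aggregate_status(job_statuses):
    """Client-side sketch of the rollup.status aggregation (assumed precedence)."""
    statuses = list(job_statuses)
    if any(s not in TERMINAL for s in statuses):
        return "processing"
    if any(s in ("error", "timed_out") for s in statuses):
        return "error"
    if "warning" in statuses:
        return "warning"
    return "success"
```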
Polling pattern (agents and clients):
- Call with withrollup=true.
- Re-call every few seconds until rollup.allDone is true.
- Read rollup.status for the outcome and rollup.jobs[].lastError for any failures.
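The polling steps above can be sketched as a small loop. The fetch callable is a placeholder for an HTTP GET of /api/v1/pipeline/status?publishertoken=...&withrollup=true that returns the parsed JSON body; here it is simulated with canned responses:

```python
import time

def poll_until_done(fetch, interval_s=3.0, timeout_s=600.0, sleep=time.sleep):
    """Poll until rollup.allDone is true, then return the rollup object.
    fetch() must return the parsed JSON of a withrollup=true status query."""
    deadline = time.monotonic() + timeout_s
    while True:
        rollup = fetch()["rollup"]
        if rollup["allDone"]:
            return rollup
        if time.monotonic() >= deadline:
            raise TimeoutError("jobs did not reach a terminal state in time")
        sleep(interval_s)

# Simulated server: one in-flight response, then a terminal one.
responses = iter([
    {"rollup": {"allDone": False, "status": "processing", "jobs": []}},
    {"rollup": {"allDone": True, "status": "success", "jobs": []}},
])
result = poll_until_done(lambda: next(responses), sleep=lambda s: None)
```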
Without withrollup=true, all three queries return their original array shapes (no wrapper) — backward compatible.
*Use one of publishertoken, pipelinetoken, or pipelinename. When multiple are supplied, publishertoken wins over pipelinetoken wins over pipelinename.
Status Fields
Each status entry (PipelineStatus) contains:
| Field | Description |
|---|---|
| id | Entry index (internal) |
| dateTime | Human-readable timestamp |
| pipeline | Pipeline name |
| processName | Processing stage name (see below) |
| publisherToken | Publisher identifier (if provided on upload) |
| pipelineToken | Pipeline job token |
| filename | Source filename |
| state | Stage state: begin, processing, end, or error |
| code | Same as state |
| description | Detail message |
| epoch | Unix epoch milliseconds |
State Values
| State | Description |
|---|---|
| begin | Processing stage started |
| processing | In progress with detail message |
| end | Processing stage completed |
| error | Processing stage failed |
Process Names
| Process | Description |
|---|---|
| FileNotifier | File intake from MinIO bucket |
| StreamNotifier | Direct upload intake |
| DataQuality | Data quality validation |
| Transformation | Data transformation |
| JobRunner | Destination orchestration |
| PostgresLoader | PostgreSQL loading |
| MongoDBLoader | MongoDB loading |
| SparkObjectStoreLoader | Object store writing |
| KafkaLoader | Kafka producing |
| ActiveMQLoader | ActiveMQ queue writing |
| RestEndpointRunner | REST endpoint posting |