

Query job processing status by publisher token, pipeline token, or pipeline name.

Get Status by Publisher Token

GET /api/v1/pipeline/status?publishertoken={token}
Use this to watch every ingestion job a single caller submitted. Tap runs set a publisherToken on every job they spawn, so one query covers:
  • Structured taps → the single job that ingested the fetched records.
  • Document taps → every per-document job (one per file in a run).
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| publishertoken | query | Yes | Publisher token returned from /tap/run (publisherToken field) or upload_data |
| withrollup | query | No | When true, wraps the response in a rollup object that classifies each job (see Rollup Response below). Recommended for agents that need a single boolean to poll on. |
Example:
curl "http://localhost:8080/api/v1/pipeline/status?publishertoken=pub-abc12345-..."
Response: 200 OK — status entries from every job whose publisherToken matches, sorted oldest-first. Each job has its own pipelineToken; group rows by pipelineToken to see one job at a time. If both publishertoken and pipelinetoken are supplied, publishertoken wins.
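For example, the flat array for a multi-document run can be regrouped client-side. A minimal Python sketch; the entry dicts mirror the response shape documented on this page, trimmed to the fields the grouping needs, and the tokens are placeholders:

```python
from collections import defaultdict

def group_by_job(entries):
    """Group flat status entries into one ordered list per job,
    keyed by pipelineToken."""
    jobs = defaultdict(list)
    for entry in entries:  # the response is sorted oldest-first,
        jobs[entry["pipelineToken"]].append(entry)  # so each group stays ordered
    return dict(jobs)

# Two jobs from one publisher token, interleaved as the API returns them:
entries = [
    {"pipelineToken": "pt-abc", "processName": "StreamNotifier", "state": "begin"},
    {"pipelineToken": "pt-def", "processName": "StreamNotifier", "state": "begin"},
    {"pipelineToken": "pt-abc", "processName": "PostgresLoader", "state": "end"},
]
jobs = group_by_job(entries)
# jobs["pt-abc"] holds 2 entries, jobs["pt-def"] holds 1
```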

Get Status by Pipeline Token

GET /api/v1/pipeline/status?pipelinetoken={token}
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| pipelinetoken | query | Yes* | Pipeline token from upload response |
| withrollup | query | No | When true, wraps the response in a rollup object (see Rollup Response). |
Example:
curl "http://localhost:8080/api/v1/pipeline/status?pipelinetoken=pt-abc12345-..."
Response: 200 OK - an array of status entries, one per processing stage:
[
  {
    "id": 1,
    "dateTime": "2026-03-15T10:00:00Z",
    "pipeline": "sales_data",
    "processName": "StreamNotifier",
    "publisherToken": null,
    "pipelineToken": "pt-abc12345-...",
    "filename": "sales_data",
    "state": "begin",
    "code": "begin",
    "description": "Process started",
    "epoch": 1773568800000
  },
  {
    "id": 2,
    "dateTime": "2026-03-15T10:00:03Z",
    "pipeline": "sales_data",
    "processName": "PostgresLoader",
    "publisherToken": null,
    "pipelineToken": "pt-abc12345-...",
    "filename": "sales_data",
    "state": "end",
    "code": "end",
    "description": "Process completed",
    "epoch": 1773568803000
  }
]
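A client that cannot use withrollup can derive a coarse job state from these entries itself. A hedged sketch: the terminal-state rules here are an assumption inferred from the State Values table (any error entry is terminal; a job whose most recent entry is end has finished its final stage), and the server-side rollup remains the authoritative classification:

```python
def job_state(entries):
    """Coarse client-side classification of one job's status entries.
    Assumption: an 'error' entry is terminal, and a most-recent 'end'
    entry means the job's last stage completed."""
    if any(e["state"] == "error" for e in entries):
        return "error"
    last = max(entries, key=lambda e: e["epoch"])  # newest entry wins
    return "done" if last["state"] == "end" else "processing"

stages = [
    {"processName": "StreamNotifier", "state": "begin", "epoch": 1},
    {"processName": "PostgresLoader", "state": "end", "epoch": 2},
]
# job_state(stages) -> "done"
```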

Get Status by Pipeline Name

GET /api/v1/pipeline/status?pipelinename={name}&page={page}
Parameters:
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| pipelinename | query | Yes* | Pipeline name |
| page | query | No | Page number (default: 1) |
Example:
curl "http://localhost:8080/api/v1/pipeline/status?pipelinename=sales_data&page=1"
Response: 200 OK - an array of job summaries:
[
  {
    "createdAtTimestamp": "2026-03-15T10:00:00Z",
    "createdAt": 1773568800000,
    "updatedAt": 1773568803000,
    "pipeline": "sales_data",
    "pipelineToken": "pt-abc12345-...",
    "process": "PostgresLoader",
    "startTime": "2026-03-15T10:00:00Z",
    "endTime": "2026-03-15T10:00:03Z",
    "totalTime": "3s",
    "status": "end"
  }
]
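Because results are paged, a client typically walks pages until exhaustion. A sketch with the HTTP call abstracted behind a fetch_page callable (so the transport stays the caller's choice); note the stop condition, an empty page, is an assumption, since the docs do not state how the last page is signalled:

```python
def all_job_summaries(fetch_page, pipeline_name):
    """Collect job summaries across all pages.

    fetch_page(name, page) performs one GET against
    /api/v1/pipeline/status?pipelinename={name}&page={page}
    and returns the parsed JSON array.
    Assumption: an empty array marks the final page.
    """
    page, summaries = 1, []
    while True:
        batch = fetch_page(pipeline_name, page)
        if not batch:
            break
        summaries.extend(batch)
        page += 1
    return summaries

# Usage with a stubbed transport (two pages, then an empty page):
pages = {1: [{"pipeline": "sales_data", "status": "end"}],
         2: [{"pipeline": "sales_data", "status": "error"}]}
result = all_job_summaries(lambda name, p: pages.get(p, []), "sales_data")
# result holds both summaries, in page order
```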

Rollup Response

Add &withrollup=true to a publishertoken or pipelinetoken query and the response is wrapped:
{
  "rollup": {
    "allDone": false,
    "status": "processing",
    "jobs": [
      {
        "pipelineToken": "pt-abc12345-...",
        "pipeline": "sales_data",
        "filename": "sales_data.csv",
        "status": "success",
        "startedAt": "2026-04-29 14:32:11.123",
        "lastEventAt": "2026-04-29 14:32:47.901",
        "elapsed": "36.78 sec",
        "lastError": null
      },
      {
        "pipelineToken": "pt-def67890-...",
        "pipeline": "sales_data",
        "filename": "orders.csv",
        "status": "error",
        "startedAt": "2026-04-29 14:32:11.140",
        "lastEventAt": "2026-04-29 14:33:02.512",
        "elapsed": "51.37 sec",
        "lastError": {
          "processName": "PostgresLoader",
          "description": "duplicate key value violates unique constraint \"customers_pkey\""
        }
      }
    ]
  },
  "events": [ /* the same raw status entries as without withrollup */ ]
}
rollup.allDone flips to true once every job has reached a terminal state. rollup.status is the aggregate outcome: success, warning, error, or processing. Per-job status values are success, warning, error, processing, or timed_out (no terminal event within 8 hours of the job's first event). When a job's status is error, lastError carries the failing process and its message.

Polling pattern (agents and clients):
  1. Call with withrollup=true.
  2. Re-call every few seconds until rollup.allDone is true.
  3. Read rollup.status for the outcome and rollup.jobs[].lastError for any failures.
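The three steps above can be sketched as a loop. fetch_status stands in for one GET with withrollup=true and is injected so the HTTP library stays the caller's choice; the poll interval and overall timeout are illustrative defaults, not values the API prescribes:

```python
import time

def wait_for_jobs(fetch_status, poll_seconds=3.0, timeout_seconds=600.0):
    """Poll until rollup.allDone is true, then report the outcome.

    fetch_status() performs one GET with withrollup=true and returns
    the parsed JSON body ({"rollup": ..., "events": ...}).
    """
    deadline = time.monotonic() + timeout_seconds
    while True:
        rollup = fetch_status()["rollup"]
        if rollup["allDone"]:
            failures = [j["lastError"] for j in rollup["jobs"]
                        if j["status"] == "error"]
            return rollup["status"], failures
        if time.monotonic() >= deadline:
            raise TimeoutError("rollup.allDone never became true")
        time.sleep(poll_seconds)
```

Returning the aggregate status together with the per-job lastError objects gives the caller everything step 3 needs in one call.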
Without withrollup=true, all three queries return their original array shapes (no wrapper) — backward compatible.
*Use one of publishertoken, pipelinetoken, or pipelinename. When multiple are supplied, publishertoken takes precedence over pipelinetoken, which takes precedence over pipelinename.

Status Fields

Each status entry (PipelineStatus) contains:
| Field | Description |
| --- | --- |
| id | Entry index (internal) |
| dateTime | Human-readable timestamp |
| pipeline | Pipeline name |
| processName | Processing stage name (see below) |
| publisherToken | Publisher identifier (if provided on upload) |
| pipelineToken | Pipeline job token |
| filename | Source filename |
| state | Stage state: begin, processing, end, or error |
| code | Same as state |
| description | Detail message |
| epoch | Unix epoch milliseconds |

State Values

| State | Description |
| --- | --- |
| begin | Processing stage started |
| processing | In progress with detail message |
| end | Processing stage completed |
| error | Processing stage failed |

Process Names

| Process | Description |
| --- | --- |
| FileNotifier | File intake from MinIO bucket |
| StreamNotifier | Direct upload intake |
| DataQuality | Data quality validation |
| Transformation | Data transformation |
| JobRunner | Destination orchestration |
| PostgresLoader | PostgreSQL loading |
| MongoDBLoader | MongoDB loading |
| SparkObjectStoreLoader | Object store writing |
| KafkaLoader | Kafka producing |
| ActiveMQLoader | ActiveMQ queue writing |
| RestEndpointRunner | REST endpoint posting |