Processing Flow
Configuration
Add a preprocessor section to the pipeline configuration.
Fields
| Field | Type | Default | Description |
|---|---|---|---|
| endpoint | string | Required | URL of the preprocessing service |
| async | boolean | false | Synchronous or asynchronous mode |
| bearerToken | string | null | Optional Authorization: Bearer token |
| timeoutMs | int | 300000 | Request timeout in milliseconds |
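To make the defaults concrete, the sketch below models the four fields as a Python dataclass. The class name, the parse function, and the async_mode attribute (renamed because async is a reserved word in Python) are illustrative assumptions, not part of the pipeline itself.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class PreprocessorConfig:
    """Illustrative mirror of the documented fields (hypothetical class)."""
    endpoint: str                      # required: URL of the preprocessing service
    async_mode: bool = False           # the "async" field; renamed for Python
    bearerToken: Optional[str] = None  # optional Authorization: Bearer token
    timeoutMs: int = 300000            # request timeout in milliseconds


def parse_preprocessor_config(section: dict) -> PreprocessorConfig:
    """Build a config from a parsed preprocessor section, applying defaults."""
    if not section.get("endpoint"):
        raise ValueError("preprocessor.endpoint is required")
    return PreprocessorConfig(
        endpoint=section["endpoint"],
        async_mode=bool(section.get("async", False)),
        bearerToken=section.get("bearerToken"),
        timeoutMs=int(section.get("timeoutMs", 300000)),
    )
```

Only endpoint has no default, so a section that contains nothing else still yields a synchronous configuration with a five-minute timeout.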
Synchronous Mode
In synchronous mode (async: false), the pipeline POSTs the data to the endpoint and waits for the response. The response replaces the pipeline data.
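As a rough sketch of the synchronous call from the pipeline's side, the following uses only the Python standard library. The function names are hypothetical, and the JSON request body and Content-Type header are assumptions; the actual wire format is whatever the pipeline defines.

```python
import json
import urllib.request
from typing import Optional


def build_preprocessor_request(endpoint: str, payload: dict,
                               bearer_token: Optional[str] = None) -> urllib.request.Request:
    """Prepare the POST request; a JSON body is assumed here."""
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(endpoint, data=body, method="POST")
    request.add_header("Content-Type", "application/json")
    if bearer_token:
        # bearerToken is optional; the header is added only when it is set.
        request.add_header("Authorization", f"Bearer {bearer_token}")
    return request


def call_preprocessor_sync(endpoint: str, payload: dict,
                           bearer_token: Optional[str] = None,
                           timeout_ms: int = 300000) -> dict:
    """POST the data, block until the response arrives, and return it."""
    request = build_preprocessor_request(endpoint, payload, bearer_token)
    # urlopen takes seconds, while timeoutMs is expressed in milliseconds.
    with urllib.request.urlopen(request, timeout=timeout_ms / 1000) as response:
        return json.loads(response.read().decode("utf-8"))
```

Separating request construction from the network call keeps the header and timeout handling easy to check without a running service.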
Request Payload
rows is null and rawData contains the raw content.
Expected Response
Return the (optionally modified) data in the same format. If the response contains an error field, processing is aborted.
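The response handling can be sketched as follows. The exception class and function name are hypothetical, and treating an empty or null error value as "no error" is an assumption about the pipeline's behavior.

```python
class PreprocessorError(RuntimeError):
    """Raised when the preprocessor response carries an error field."""


def apply_preprocessor_response(response: dict) -> dict:
    """Return the data that replaces the pipeline data, or abort on error."""
    error = response.get("error")
    if error:
        # Assumption: a missing, null, or empty error value means success.
        raise PreprocessorError(f"Preprocessor reported an error: {error}")
    # The response replaces the pipeline data as-is.
    return response
```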
Asynchronous Mode
In asynchronous mode (async: true), the pipeline POSTs the data and then waits for a callback rather than using the response directly. This is useful for long-running preprocessing tasks.
Flow
- Pipeline POSTs data to the preprocessor endpoint
- Preprocessor returns immediately (e.g., 200 OK)
- Preprocessor processes data in the background
- Preprocessor POSTs the result back to the pipeline’s callback endpoint
- Pipeline resumes with the returned data
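On the preprocessor side, the flow above might look like the sketch below: acknowledge at once, do the work in a background thread, then send the result back. The transform function, the send_callback parameter, and the assumption that pipelineToken travels in the request body are all illustrative; a real service would POST to the pipeline's callback endpoint inside send_callback.

```python
import threading
from typing import Callable, Tuple


def transform(payload: dict) -> dict:
    """Placeholder for the real long-running work; it uppercases rawData."""
    result = dict(payload)
    if isinstance(result.get("rawData"), str):
        result["rawData"] = result["rawData"].upper()
    return result


def handle_async_request(payload: dict,
                         send_callback: Callable[[dict], None]
                         ) -> Tuple[threading.Thread, int]:
    """Start background processing and return immediately with a 200 status."""
    def work() -> None:
        result = transform(payload)
        # Echo the token so the pipeline can match the callback to its job.
        result["pipelineToken"] = payload.get("pipelineToken")
        send_callback(result)

    worker = threading.Thread(target=work, daemon=True)
    worker.start()
    # The HTTP handler would answer 200 OK here without waiting for the worker.
    return worker, 200
```

Returning the thread handle is only a convenience for testing; a web handler would typically return just the status.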
Callback Endpoint
The preprocessor sends the result to the pipeline’s callback endpoint. The pipelineToken in the callback must match the token from the original request so the pipeline can correlate the callback with the waiting job.
If the callback is not received within timeoutMs milliseconds, the pipeline aborts with a timeout error.
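One way to picture the token correlation and the timeout is a small registry of waiting jobs keyed by pipelineToken. This is a minimal sketch under that assumption; the PendingJobs class and its methods are hypothetical and do not describe the pipeline's actual internals.

```python
import threading
from typing import Dict


class PendingJobs:
    """Tracks jobs that are waiting for a preprocessor callback."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._events: Dict[str, threading.Event] = {}
        self._results: Dict[str, dict] = {}

    def register(self, token: str) -> None:
        """Record that a job with this token is about to wait."""
        with self._lock:
            self._events[token] = threading.Event()

    def deliver(self, token: str, result: dict) -> bool:
        """Called by the callback handler; False if no job matches the token."""
        with self._lock:
            event = self._events.get(token)
            if event is None:
                return False
            self._results[token] = result
        event.set()
        return True

    def wait(self, token: str, timeout_ms: int) -> dict:
        """Block until the callback arrives or timeout_ms elapses."""
        with self._lock:
            event = self._events[token]
        event.wait(timeout_ms / 1000)  # Event.wait takes seconds
        with self._lock:
            self._events.pop(token, None)
            result = self._results.pop(token, None)
        if result is None:
            raise TimeoutError(f"No callback for token {token} within {timeout_ms} ms")
        return result
```

A callback with an unknown token is rejected rather than queued, so a late or misdirected callback cannot resume an unrelated job.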
Example: Preprocessor Service
A complete working example is provided in examples/preprocessor/app.py. This Python Flask application implements both synchronous and asynchronous preprocessing: