Skip to main content
Location: examples/market-macro-agent/ A real-time financial data pipeline agent powered by Datris. MacroAgent connects to the Datris MCP server, discovers available tools, fetches live market data from public APIs, creates pipelines, ingests data, and answers market questions grounded in actual numbers. Everything runs in the browser with a live activity feed. 100% Python — FastAPI backend, vanilla JS frontend, no Node.js required.

Architecture

Browser (vanilla JS)

  ├── GET  /stream/state  (SSE — pipeline tiles, activity feed, row counts)

  └── POST /chat          (SSE — tool calls, partial text, answers)

           └── agent/loop.py

                 ├── Anthropic API (Claude)
                 │     ↕ tool_use / end_turn

                 ├── agent/executor.py
                 │     ├── MCP tools → Datris MCP Server (port 3000)
                 │     └── ingest_data → data_fetcher.py (cached server-side)

                 └── agent/scheduler.py (background refresh)

How It Works

  1. On startup — Connects to Datris MCP server via SSE, discovers tools via tools/list, reads the Pipeline Configuration Reference resource
  2. User asks a question — MacroAgent determines which data sources are needed
  3. Data acquisition — Agent fetches live data from public APIs, caches it server-side, and uses MCP tools (create_pipelineupload_data) to ingest
  4. Pipeline management — MacroAgent monitors jobs via get_job_status, queries results via query_postgres — all through MCP tools discovered dynamically
  5. Intelligent acquisition — If the user asks about data the agent doesn’t have, it asks for confirmation before fetching and ingesting new data
  6. Background refresh — Active pipelines are automatically refreshed on a configurable timer

Data Sources

SourceDataAuth
FREDMacro series — yields, VIX, CPI, unemployment, credit spreadsFree API key
yfinanceEquity / ETF OHLCV (SPY, QQQ, GLD, TLT, XLE, IWM)None
CoinGeckoCrypto prices, market cap, volume (BTC, ETH, SOL)None (30 req/min)
SEC EDGAR10-K / 10-Q filings for RAGNone (10 req/sec)

Quick Start

cd examples/market-macro-agent
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env
# Edit .env → add ANTHROPIC_API_KEY (and optionally FRED_API_KEY)

# Make sure Datris is running:
cd ../../ && docker compose up -d && cd examples/market-macro-agent

uvicorn main:app --reload --port 8001
Open http://localhost:8001

Demo Queries

  • "What's the current macro picture?" — creates pipelines, fetches live FRED + equity data
  • "Refresh all pipelines" — re-fetches all active data sources
  • "Is crypto confirming the risk-on trade in equities?" — cross-pipeline analysis
  • "Which pipeline is most stale?" — exercises list_pipelines and timestamp comparison

Environment Variables

VariableDefaultDescription
ANTHROPIC_API_KEYRequired
MODELclaude-haiku-4-5-20251001Claude model
PORT8001uvicorn port
MCP_SERVER_URLhttp://localhost:3000/sseDatris MCP server SSE endpoint
FRED_API_KEYGet one free
REFRESH_INTERVAL_MINUTES15Background refresh cycle

Requirements