Deye Battery Pipeline
An automated pipeline that pulls inverter telemetry from the Deye partner API on a cron schedule, stores it on-prem, and serves it to downstream applications through a clean REST surface. Built for fleet-scale battery monitoring at Consulta Technologies.
The problem
The downstream application at Consulta needs reliable, fresh telemetry from every Deye battery inverter in the fleet — state-of-charge, voltage, current, grid voltage, temperature, fault flags. Deye exposes this data through a partner cloud API; the application can’t depend on that API being available at every read, so the data needs to be ingested into an on-prem store the app fully owns.
A scheduled pipeline is the natural fit: pull on a cron, write idempotently, expose to the app over a stable in-network REST contract.
What it does
- Cron-scheduled ingestion: APScheduler with a standard cron trigger. The job is single-instance and coalesced. A tick that fires while a run is still in progress is skipped, and a backlog of missed ticks collapses into one run instead of stampeding.
- Idempotent writes: a per-device watermark plus ON CONFLICT DO NOTHING on (device_id, ts). Retries and full backfills are both safe.
- On-prem isolation: only the fetcher reaches the public internet. The DB and the app never leave the customer network.
- Observable runs: every pipeline run is logged with status, duration, row count, and a stack trace on failure. The dashboard reads it back live.
- Clean REST contract: auto-generated OpenAPI / Swagger UI. The application knows nothing about cron, MQTT, or the upstream provider.
- Swappable storage: SQLite for the demo. One connection-string change moves to Postgres, MySQL, or TimescaleDB.
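The idempotent-writes item can be sketched with nothing but the standard library's sqlite3 module. This is a minimal illustration under stated assumptions, not the project's code: the single soc column, epoch-second timestamps, and the helper names get_watermark and write_samples are hypothetical. It relies on SQLite's upsert syntax (SQLite 3.24 or newer).

```python
import sqlite3
from typing import Iterable, Optional, Tuple

# Minimal schema for the sketch (columns beyond the keys are assumptions).
# The composite primary key is what turns a repeated insert of the same
# (device_id, ts) into a no-op.
SCHEMA = """
CREATE TABLE IF NOT EXISTS samples (
    device_id TEXT    NOT NULL,
    ts        INTEGER NOT NULL,  -- epoch seconds (assumed)
    soc       REAL,              -- state of charge, percent
    PRIMARY KEY (device_id, ts)
);
CREATE TABLE IF NOT EXISTS sync_state (
    device_id TEXT PRIMARY KEY,
    last_ts   INTEGER NOT NULL   -- watermark: newest ingested ts
);
"""


def get_watermark(conn: sqlite3.Connection, device_id: str) -> Optional[int]:
    """Newest ingested timestamp for a device, or None if never synced.
    A fetcher would request only samples newer than this."""
    row = conn.execute(
        "SELECT last_ts FROM sync_state WHERE device_id = ?", (device_id,)
    ).fetchone()
    return row[0] if row else None


def write_samples(
    conn: sqlite3.Connection, device_id: str, rows: Iterable[Tuple[int, float]]
) -> int:
    """Insert (ts, soc) rows idempotently and advance the watermark.
    Returns how many rows were actually new."""
    rows = list(rows)
    if not rows:
        return 0
    with conn:  # samples and watermark commit (or roll back) together
        before = conn.total_changes
        conn.executemany(
            "INSERT INTO samples (device_id, ts, soc) VALUES (?, ?, ?) "
            "ON CONFLICT (device_id, ts) DO NOTHING",
            [(device_id, ts, soc) for ts, soc in rows],
        )
        inserted = conn.total_changes - before  # skipped duplicates don't count
        # Upsert the watermark; the two-argument max() keeps a backfill of
        # older data from moving it backwards.
        conn.execute(
            "INSERT INTO sync_state (device_id, last_ts) VALUES (?, ?) "
            "ON CONFLICT (device_id) DO UPDATE "
            "SET last_ts = max(last_ts, excluded.last_ts)",
            (device_id, max(ts for ts, _ in rows)),
        )
    return inserted
```

Replaying the same batch inserts nothing, and a backfill of older rows fills gaps without moving the watermark backwards. On Postgres the statements carry over, except that the scalar max() would become GREATEST(sync_state.last_ts, EXCLUDED.last_ts).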
How the schedule is wired
The scheduler module is intentionally tiny — one cron trigger, one job, and the pipeline run logic lives in a separate idempotent function so it can be invoked from the scheduler, from a CLI for backfills, or from a test.
```python
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger

from pipeline import run_once  # idempotent: safe from cron, a CLI, or tests

SCHEDULE = "*/15 * * * *"  # every 15 minutes

scheduler = BlockingScheduler(timezone="UTC")
trigger = CronTrigger.from_crontab(SCHEDULE, timezone="UTC")

scheduler.add_job(
    run_once,
    trigger,
    id="deye_pipeline",
    max_instances=1,  # skip a tick while the previous run is still going
    coalesce=True,    # collapse a backlog of missed ticks into one run
)
scheduler.start()  # blocks the process and runs the job on schedule
```
Storage schema
Three tables, all on-prem. Watermarks make the pipeline re-runnable; the runs table makes it observable.
| Table | Purpose | Primary key |
|---|---|---|
| samples | One row per telemetry sample. | (device_id, ts) |
| sync_state | Per-device watermark of the latest ingested sample. | device_id |
| pipeline_runs | Run history: status, duration, rows, error. | id (autoincrement) |
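The three tables could be expressed as SQLite DDL along these lines, together with a wrapper that records a pipeline_runs row for every pass, whether it succeeds or fails. Only the table names and primary keys come from the table above; the measurement columns, their units, the row_count/error column names, and the logged_run helper are assumptions for illustration.

```python
import sqlite3
import time
import traceback
from typing import Callable

# Table names and primary keys follow the schema table; the measurement
# columns and their units are illustrative assumptions.
DDL = """
CREATE TABLE IF NOT EXISTS samples (
    device_id      TEXT    NOT NULL,
    ts             INTEGER NOT NULL,   -- epoch seconds
    soc_pct        REAL,
    voltage_v      REAL,
    current_a      REAL,
    grid_voltage_v REAL,
    temperature_c  REAL,
    fault_flags    INTEGER,            -- assumed: fault bitmask
    PRIMARY KEY (device_id, ts)
);
CREATE TABLE IF NOT EXISTS sync_state (
    device_id TEXT PRIMARY KEY,
    last_ts   INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS pipeline_runs (
    id          INTEGER PRIMARY KEY AUTOINCREMENT,
    started_at  REAL    NOT NULL,      -- epoch seconds
    status      TEXT    NOT NULL,      -- 'ok' or 'error'
    duration_s  REAL    NOT NULL,
    row_count   INTEGER NOT NULL,
    error       TEXT                   -- stack trace on failure, else NULL
);
"""


def logged_run(conn: sqlite3.Connection,
               job: Callable[[sqlite3.Connection], int]) -> int:
    """Run one pipeline pass and record it in pipeline_runs.

    job is a hypothetical stand-in for run_once: it fetches, writes, and
    returns the number of rows written. Failures are recorded, not re-raised.
    """
    started = time.time()
    t0 = time.monotonic()
    status, row_count, error = "ok", 0, None
    try:
        row_count = job(conn)
    except Exception:
        conn.rollback()  # drop any uncommitted partial writes from this pass
        status, error = "error", traceback.format_exc()
    with conn:  # commit the run record on its own
        conn.execute(
            "INSERT INTO pipeline_runs "
            "(started_at, status, duration_s, row_count, error) "
            "VALUES (?, ?, ?, ?, ?)",
            (started, status, time.monotonic() - t0, row_count, error),
        )
    return row_count
```

Swallowing the exception after logging it is one choice; re-raising after the insert would work equally well if the scheduler should also see the failure.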
Stack, by layer
Each layer has one job. Storage is swappable with one env var, the fetcher is the only piece that touches the public internet, and the API never knows the upstream provider exists.
- 04 · Downstream API: the FastAPI surface the application reads. OpenAPI is the only contract. Stack: FastAPI, Pydantic, OpenAPI 3.1.
- 03 · On-prem storage: samples, watermarks, and run history; the engine is swappable via one env var. Stack: SQLite, SQLAlchemy, Postgres (prod).
- 02 · Scheduled fetcher: APScheduler cron, idempotent upserts, single-instance coalescing. Stack: APScheduler, httpx, tenacity.
- 01 · Upstream Deye: the vendor partner API. Bearer auth over HTTPS; never reached from the app. Stack: Deye Cloud, HTTPS, token rotation.
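The fetcher layer lists tenacity for retries. The policy it would typically encode, capped exponential backoff with jitter applied to transient failures only, is sketched here with the standard library so the example stays dependency-free. TransientError and with_retries are hypothetical names, and deciding which httpx errors or status codes (timeouts, 429, 5xx) count as transient is left to the caller.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class TransientError(Exception):
    """Hypothetical marker for retryable failures (timeouts, HTTP 429/5xx)."""


def with_retries(
    call: Callable[[], T],
    attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Invoke call(), retrying TransientError with capped exponential backoff.

    Wait n is drawn uniformly from [0, min(max_delay, base_delay * 2**n)]
    ("full jitter"). Any other exception propagates immediately.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for n in range(attempts):
        try:
            return call()
        except TransientError:
            if n == attempts - 1:
                raise  # out of attempts: let the run be logged as failed
            sleep(random.uniform(0, min(max_delay, base_delay * 2 ** n)))
    raise AssertionError("unreachable")
```

With tenacity installed, a roughly equivalent policy would be @retry(stop=stop_after_attempt(5), wait=wait_random_exponential(multiplier=0.5, max=30), retry=retry_if_exception_type(TransientError), reraise=True). Limiting retries to transient errors matters: an authentication failure, for example, should fail the run at once and show up in the run history rather than burn through the backoff schedule.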
What ships in the demo
- 01 · upstream: Mock Deye cloud API. A small FastAPI service that mimics the real Deye partner endpoints with deterministic telemetry; it needs no token and has no rate limit.
- 02 · pipeline: Fetcher container. APScheduler runs the pull on the cron schedule; the pipeline module fetches, transforms, and persists rows with idempotent upserts.
- 03 · storage: Shared on-prem volume. A Docker volume holding the SQLite database file. Both the fetcher and the downstream API mount it; nothing else touches it.
- 04 · serving: Downstream REST API. FastAPI exposes /api/inverters, /api/inverters/{id}, and /api/runs, with auto-generated OpenAPI docs at /docs.
- 05 · operator: Live dashboard. Vanilla HTML + Chart.js. It polls the API every 5 s and shows fleet status, run history, and per-device charts.
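The read side of the downstream API reduces to a couple of queries. This sketch shows SQL that could back /api/inverters (latest sample per device) and /api/inverters/{id} (recent history, newest first). The function names, the soc_pct column, and the default limit are assumptions; a FastAPI route would simply return these lists for FastAPI to serialize.

```python
import sqlite3
from typing import Any, Dict, List

# Newest sample per device (could back a fleet overview like /api/inverters).
LATEST_PER_DEVICE = """
SELECT s.device_id, s.ts, s.soc_pct
FROM samples AS s
JOIN (
    SELECT device_id, MAX(ts) AS ts
    FROM samples
    GROUP BY device_id
) AS latest
  ON latest.device_id = s.device_id AND latest.ts = s.ts
ORDER BY s.device_id
"""

# Recent history for one device, newest first (could back /api/inverters/{id}).
DEVICE_HISTORY = """
SELECT ts, soc_pct
FROM samples
WHERE device_id = ?
ORDER BY ts DESC
LIMIT ?
"""


def list_inverters(conn: sqlite3.Connection) -> List[Dict[str, Any]]:
    """One dict per device holding its most recent sample."""
    return [
        {"device_id": device_id, "ts": ts, "soc_pct": soc}
        for device_id, ts, soc in conn.execute(LATEST_PER_DEVICE)
    ]


def device_history(conn: sqlite3.Connection, device_id: str,
                   limit: int = 100) -> List[Dict[str, Any]]:
    """Up to `limit` samples for one device, newest first."""
    return [
        {"ts": ts, "soc_pct": soc}
        for ts, soc in conn.execute(DEVICE_HISTORY, (device_id, limit))
    ]
```

Both queries are plain SQL, so they carry over unchanged if the storage engine moves to Postgres.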
Run it
The demo is a runnable, end-to-end version of this pipeline. Clone and bring it up with one command:
```bash
git clone https://github.com/BUDDHABHUSHAN23/deye-battery-platform-demo
cd deye-battery-platform-demo
docker compose up --build
# dashboard      → http://localhost:8080
# downstream API → http://localhost:8000/docs
# mock Deye API  → http://localhost:9000/docs
```