Deye Battery Pipeline

An automated pipeline that pulls inverter telemetry from the Deye partner API on a cron schedule, stores it on-prem, and serves it to downstream applications through a clean REST surface. Built for fleet-scale battery monitoring at Consulta Technologies.

  • Pipeline Architecture
  • Scheduled Ingestion
  • On-prem Storage
  • Downstream REST API
deye-pipeline · on-prem
cron */15 * * * * · last run 2m ago · next in 13m

SCHEDULE

trigger: CronTrigger · cron "*/15 * * * *" · tz UTC · coalesce true · max_instances 1
next run: 14:15:00 UTC

PIPELINE

Deye Cloud API · HTTPS, bearer auth
→ Fetcher · idempotent upsert
→ On-prem DB · samples, sync_state
→ App API · REST, OpenAPI
5 devices · 60s interval · ON CONFLICT DO NOTHING

RECENT RUNS

  • 14:00:02 · 320 rows · 1.4s
  • 13:45:01 · 315 rows · 1.3s
  • 13:30:02 · 320 rows · 1.5s
  • 13:15:02 · 290 rows · 1.4s
  • 13:00:01 · 320 rows · 1.2s

  • 5 · Inverters in fleet
  • 15 min · Pull cadence
  • 200 · Rows / run
  • 0 · Duplicates (idempotent)

The problem

The downstream application at Consulta needs reliable, fresh telemetry from every Deye battery inverter in the fleet — state-of-charge, voltage, current, grid voltage, temperature, fault flags. Deye exposes this data through a partner cloud API; the application can’t depend on that API being available at every read, so the data needs to be ingested into an on-prem store the app fully owns.

A scheduled pipeline is the natural fit: pull on a cron, write idempotently, expose to the app over a stable in-network REST contract.
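For concreteness, here is one way to model a single telemetry sample carrying the fields listed above, and to normalize a raw upstream record into it. This is a sketch under stated assumptions: the raw key names (deviceSn, collectionTime, soc, and so on) and the epoch-seconds timestamp are hypothetical placeholders, not the documented Deye partner API schema.

```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class Sample:
    """One telemetry sample for one inverter; (device_id, ts) is its natural key."""
    device_id: str
    ts: datetime       # sample timestamp, timezone-aware UTC
    soc_pct: float     # battery state of charge, percent
    battery_v: float   # battery voltage, volts
    battery_a: float   # battery current, amps
    grid_v: float      # grid voltage, volts
    temp_c: float      # battery temperature, degrees Celsius
    fault_flags: int   # raw fault bitmask as reported upstream


def from_raw(raw: dict) -> Sample:
    """Normalize one raw upstream record into a Sample.

    HYPOTHETICAL key names: the real Deye payload may use different ones.
    """
    return Sample(
        device_id=str(raw["deviceSn"]),
        # Assumed: upstream timestamps are Unix epoch seconds.
        ts=datetime.fromtimestamp(int(raw["collectionTime"]), tz=timezone.utc),
        soc_pct=float(raw["soc"]),
        battery_v=float(raw["batteryVoltage"]),
        battery_a=float(raw["batteryCurrent"]),
        grid_v=float(raw["gridVoltage"]),
        temp_c=float(raw["batteryTemperature"]),
        fault_flags=int(raw.get("faultCode", 0)),  # absent means no faults
    )
```

Keeping the record frozen makes samples hashable and safe to pass between the fetch, transform, and persist steps without accidental mutation.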

What it does

  • Cron-scheduled ingestion
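    APScheduler with a standard cron trigger. Single-instance and coalesced: a tick that fires while the previous run is still executing is skipped, and a backlog of missed ticks collapses into a single run, so runs never stampede.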
  • Idempotent writes
    Per-device watermark plus ON CONFLICT DO NOTHING on (device_id, ts). Retries and full backfills are both safe.
  • On-prem isolation
    Only the fetcher reaches the public internet. The DB and the app never leave the customer network.
  • Observable runs
    Every pipeline run is logged with its status, duration, row count, and, on failure, the stack trace. The dashboard reads this history back live.
  • Clean REST contract
    Auto-generated OpenAPI / Swagger UI. The application knows nothing about cron, MQTT, or the upstream provider.
  • Swappable storage
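    SQLite for the demo. A connection-string change moves to Postgres or TimescaleDB, which share the ON CONFLICT syntax; MySQL also works but needs INSERT IGNORE (or ON DUPLICATE KEY UPDATE) in place of ON CONFLICT DO NOTHING.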
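The per-device watermark can be sketched in plain Python: keep only samples strictly newer than each device's last ingested timestamp, then advance the watermark to the newest timestamp seen. This is an illustrative sketch in which samples are assumed to be tuples starting with (device_id, ts); the name select_new is hypothetical. The ON CONFLICT DO NOTHING key on (device_id, ts) remains what makes overlapping pulls harmless; the watermark only avoids redundant work.

```python
from datetime import datetime

Watermarks = dict[str, datetime]


def select_new(samples, watermarks: Watermarks):
    """Return (samples newer than each device's watermark, advanced watermarks).

    `samples` is an iterable of (device_id, ts, ...) tuples. The input dict is
    not mutated, so a failed write can simply retry with the old watermarks.
    """
    fresh = []
    updated = dict(watermarks)
    for sample in samples:
        device_id, ts = sample[0], sample[1]
        last = watermarks.get(device_id)
        if last is not None and ts <= last:
            continue  # already ingested on an earlier run
        fresh.append(sample)
        # Advance this device's watermark to the newest timestamp seen so far.
        if device_id not in updated or ts > updated[device_id]:
            updated[device_id] = ts
    return fresh, updated
```

The new watermarks should only be persisted after the corresponding samples are committed (ideally in the same transaction); otherwise a crash between the two steps could skip data.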

How the schedule is wired

The scheduler module is intentionally tiny: one cron trigger and one job. The pipeline logic lives in a separate idempotent function, run_once, so it can be invoked from the scheduler, from a CLI for backfills, or from a test.

fetcher/scheduler.py · python
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from pipeline import run_once

SCHEDULE = "*/15 * * * *"     # every 15 minutes

scheduler = BlockingScheduler(timezone="UTC")
trigger = CronTrigger.from_crontab(SCHEDULE, timezone="UTC")

scheduler.add_job(
    run_once, trigger,
    id="deye_pipeline",
    max_instances=1,   # skip a tick while the previous run is still executing
    coalesce=True,     # collapse a backlog of missed ticks into a single run
)
scheduler.start()
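The pipeline_runs table behind "Observable runs" can be fed by a thin wrapper around the pipeline body. The sketch below is hypothetical: run_logged and the work callable (which is assumed to fetch, transform, persist, and return the number of rows written) are illustrative names, not the demo's actual run_once, and the pipeline_runs column names are assumed.

```python
import sqlite3
import time
import traceback
from datetime import datetime, timezone

# Assumed run-history schema; column names are illustrative.
RUNS_DDL = """
CREATE TABLE IF NOT EXISTS pipeline_runs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    started_at TEXT NOT NULL,
    status     TEXT NOT NULL,
    duration_s REAL NOT NULL,
    row_count  INTEGER NOT NULL,
    error      TEXT
)
"""


def run_logged(conn: sqlite3.Connection, work) -> int:
    """Run `work()` and record one pipeline_runs row; return rows written.

    Exceptions are recorded with their stack trace and then re-raised so the
    scheduler (or a CLI caller) still sees the failure.
    """
    started = datetime.now(timezone.utc).isoformat()
    t0 = time.perf_counter()
    status, row_count, error = "ok", 0, None
    try:
        row_count = work()
        return row_count
    except Exception:
        status, error = "failed", traceback.format_exc()
        raise
    finally:
        # Runs on success and failure alike, so every run leaves a record.
        conn.execute(
            "INSERT INTO pipeline_runs"
            " (started_at, status, duration_s, row_count, error)"
            " VALUES (?, ?, ?, ?, ?)",
            (started, status, time.perf_counter() - t0, row_count, error),
        )
        conn.commit()
```

Re-raising after logging keeps the scheduler's own error reporting intact while the runs table gives the dashboard a durable history.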

Storage schema

Three tables, all on-prem. Watermarks make the pipeline re-runnable; the runs table makes it observable.

on-prem sqlite (or postgres in prod)

| Table | Purpose | Primary key |
|---|---|---|
| samples | One row per telemetry sample. | (device_id, ts) |
| sync_state | Per-device watermark of the latest ingested sample. | device_id |
| pipeline_runs | Run history: status, duration, rows, error. | id (autoincrement) |
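A minimal SQLite rendering of the three tables, plus the insert-or-ignore write and the watermark update in one transaction, might look like the sketch below. Columns beyond the primary keys are assumptions, and storing timestamps as ISO-8601 UTC text (which sorts correctly as a string when the format is uniform) is one convenient SQLite choice, not necessarily what the demo does. The ON CONFLICT clauses need SQLite 3.24 or newer.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS samples (
    device_id   TEXT NOT NULL,
    ts          TEXT NOT NULL,   -- ISO-8601 UTC timestamp
    soc_pct     REAL,
    battery_v   REAL,
    battery_a   REAL,
    grid_v      REAL,
    temp_c      REAL,
    fault_flags INTEGER,
    PRIMARY KEY (device_id, ts)
);
CREATE TABLE IF NOT EXISTS sync_state (
    device_id TEXT PRIMARY KEY,
    last_ts   TEXT NOT NULL      -- watermark: newest ingested sample
);
CREATE TABLE IF NOT EXISTS pipeline_runs (
    id         INTEGER PRIMARY KEY AUTOINCREMENT,
    started_at TEXT NOT NULL,
    status     TEXT NOT NULL,
    duration_s REAL NOT NULL,
    row_count  INTEGER NOT NULL,
    error      TEXT              -- stack trace when status = 'failed'
);
"""

INSERT_SAMPLE = """
INSERT INTO samples (device_id, ts, soc_pct, battery_v, battery_a,
                     grid_v, temp_c, fault_flags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (device_id, ts) DO NOTHING
"""

# Only ever move a watermark forward.
UPSERT_WATERMARK = """
INSERT INTO sync_state (device_id, last_ts) VALUES (?, ?)
ON CONFLICT (device_id) DO UPDATE SET last_ts = excluded.last_ts
WHERE excluded.last_ts > sync_state.last_ts
"""


def write_samples(conn: sqlite3.Connection, rows) -> int:
    """Insert samples and advance watermarks atomically; return new-row count."""
    rows = list(rows)
    newest: dict[str, str] = {}
    for device_id, ts, *_ in rows:
        if ts > newest.get(device_id, ""):
            newest[device_id] = ts
    before = conn.total_changes
    with conn:  # commits on success, rolls back on any exception
        conn.executemany(INSERT_SAMPLE, rows)
        inserted = conn.total_changes - before  # ignored duplicates don't count
        conn.executemany(UPSERT_WATERMARK, newest.items())
    return inserted
```

Writing the same batch twice inserts nothing the second time and leaves the watermarks unchanged, which is what makes retries and backfills safe.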

Stack, by layer

Each layer has one job. Storage is swappable with one env var, the fetcher is the only piece that touches the public internet, and the API never knows the upstream provider exists.

stack · four layers · one container
boundary: cloud → on-prem
  1. Layer 04 · Downstream API
    FastAPI surface the application reads. OpenAPI is the only contract.
    • FastAPI
    • Pydantic
    • OpenAPI 3.1
  2. Layer 03 · On-prem storage
    Samples, watermarks, run history. Swappable engine via one env var.
    • SQLite
    • SQLAlchemy
    • Postgres (prod)
  3. Layer 02 · Scheduled fetcher
    APScheduler cron, idempotent upserts, single-instance coalescing.
    • APScheduler
    • httpx
    • tenacity
  4. Layer 01 · Upstream Deye
    Vendor partner API over HTTPS with bearer auth. Never reached from the app.
    • Deye Cloud
    • HTTPS
    • token-rotation
fetcher → storage → api · only the fetcher leaves the network
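The fetcher layer lists tenacity for retries. As a dependency-free illustration of the same idea (retry a flaky upstream call with exponential backoff, then give up), here is a small stdlib sketch; with tenacity itself, the @retry decorator combined with stop_after_attempt and wait_exponential plays this role. The function name, attempt count, and delays are hypothetical examples, not the demo's settings. The default retry_on=(OSError,) covers stdlib socket errors; with httpx you would pass something like httpx.TransportError instead.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    attempts: int = 4,
    base_delay_s: float = 1.0,
    retry_on: tuple[type[BaseException], ...] = (OSError,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); on a retryable error wait base_delay_s * 2**n and try again.

    The last error is re-raised once `attempts` calls have failed. Errors not
    listed in `retry_on` propagate immediately. `sleep` is injectable so tests
    do not actually wait.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the upstream error
            sleep(base_delay_s * 2 ** attempt)
    raise AssertionError("unreachable")
```

Because each run is idempotent, giving up after a few attempts is cheap: the next cron tick picks up from the same watermark.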

What ships in the demo

  • 01 · upstream

    Mock Deye cloud API

    A small FastAPI service that mimics the real Deye partner endpoints with deterministic telemetry — no token, no rate limit.

  • 02 · pipeline

    Fetcher container

    APScheduler runs the pull on cron; the pipeline module fetches, transforms, and persists with idempotent upserts.

  • 03 · storage

    Shared on-prem volume

    Docker volume backed by SQLite. Both the fetcher and the downstream API mount it; nothing else touches it.

  • 04 · serving

    Downstream REST API

    FastAPI exposes /api/inverters, /api/inverters/{id}, /api/runs. Auto OpenAPI docs at /docs.

  • 05 · operator

    Live dashboard

    Vanilla HTML + Chart.js. Polls the API every 5s, shows fleet status, run history, and per-device charts.
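A fleet overview such as /api/inverters plausibly needs each device's most recent sample. Against a samples table keyed on (device_id, ts), a greatest-per-group query returns that in one statement. The sketch below is an assumption about what such a handler could run, not the demo's actual code; the function name latest_samples and the trimmed column set are hypothetical.

```python
import sqlite3

# Explicit aliases keep the result column names stable across SQLite versions.
LATEST_PER_DEVICE = """
SELECT s.device_id AS device_id, s.ts AS ts, s.soc_pct AS soc_pct
FROM samples AS s
JOIN (
    SELECT device_id, MAX(ts) AS ts
    FROM samples
    GROUP BY device_id
) AS latest
  ON latest.device_id = s.device_id AND latest.ts = s.ts
ORDER BY s.device_id
"""


def latest_samples(conn: sqlite3.Connection) -> list[dict]:
    """Return one dict per device holding its newest sample."""
    cur = conn.execute(LATEST_PER_DEVICE)
    columns = [d[0] for d in cur.description]
    return [dict(zip(columns, row)) for row in cur.fetchall()]
```

The (device_id, ts) primary key doubles as the index this query needs, so it stays cheap as the samples table grows.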

Run it

The demo is a runnable, end-to-end version of this pipeline. Clone and bring it up with one command:

terminal · bash
git clone https://github.com/BUDDHABHUSHAN23/deye-battery-platform-demo
cd deye-battery-platform-demo
docker compose up --build

# dashboard      → http://localhost:8080
# downstream API → http://localhost:8000/docs
# mock Deye API  → http://localhost:9000/docs