from __future__ import annotations import dataclasses import logging import time import requests from PIL import Image, ImageDraw, ImageFont from planemapper.constants import ( DISPLAY_HEIGHT, DISPLAY_WIDTH, REFRESH_INTERVAL_S, RENDER_WARN_S, ) from planemapper.display import DisplayInterface, WaveshareDisplay from planemapper.fetcher import HttpFetcher from planemapper.models import Aircraft from planemapper.provisioning.config import read as read_config from planemapper.renderer.basemap import load as load_basemap from planemapper.renderer.projection import MapBounds from planemapper.renderer.renderer import Renderer log = logging.getLogger(__name__) def _make_startup_screen() -> Image.Image: image = Image.new("RGB", (DISPLAY_WIDTH, DISPLAY_HEIGHT), color=(255, 255, 255)) draw = ImageDraw.Draw(image) font = ImageFont.load_default() draw.text( (DISPLAY_WIDTH // 2 - 60, DISPLAY_HEIGHT // 2), "planeMapper starting...", fill=(0, 0, 0), font=font, ) return image def _run_one_cycle( renderer: Renderer, fetcher: HttpFetcher, display: DisplayInterface, last_aircraft: list[Aircraft], ) -> list[Aircraft]: t0 = time.monotonic() stale_needed = False try: fresh = fetcher.fetch() except requests.Timeout: log.warning("fetch timeout — using stale data") fresh = [] stale_needed = True else: stale_needed = len(fresh) == 0 and len(last_aircraft) > 0 if stale_needed: aircraft_list = [dataclasses.replace(a, is_stale=True) for a in last_aircraft] else: aircraft_list = fresh t1 = time.monotonic() image = renderer.render(aircraft_list) t2 = time.monotonic() display.show(image) t3 = time.monotonic() total = t3 - t0 log.info( "cycle: fetch=%.1fs render=%.1fs spi=%.1fs total=%.1fs", t1 - t0, t2 - t1, t3 - t2, total, ) if total > RENDER_WARN_S: log.warning("render slow: %.1fs > %ds threshold", total, RENDER_WARN_S) return aircraft_list def main() -> None: logging.basicConfig(level=logging.INFO) cfg = read_config() bounds = MapBounds( home_lat=cfg["home_lat"], home_lon=cfg["home_lon"], radius_nm=cfg["coverage_radius_nm"], ) base_map = load_basemap() fetcher = HttpFetcher() renderer = Renderer(base_map, bounds) display = WaveshareDisplay() startup = _make_startup_screen() display.show(startup) last: list[Aircraft] = [] while True: last = _run_one_cycle(renderer, fetcher, display, last) time.sleep(REFRESH_INTERVAL_S)