Implement story 1.5: provisioning execution tile download cache validation wifi kill

Add tiles.py, airspace.py, wifi.join_home_wifi, portal /submit route, and rewire
provision.py main loop; all tasks and quality gates pass (56 tests, ruff clean).

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Matt Edholm
2026-04-22 22:54:34 -04:00
parent a6a6a2796d
commit 4aeeefb488
11 changed files with 455 additions and 81 deletions
+4 -4
View File
@@ -1,6 +1,7 @@
import logging
from planemapper.provisioning import ProvisioningError, wifi
from planemapper.provisioning.portal import app
log = logging.getLogger(__name__)
@@ -19,10 +20,9 @@ def main() -> None:
while not provisioned:
try:
wifi.start_ap()
# Portal runs here (Story 1.3+ wires Flask app)
# Provisioning sequence continues in Story 1.5
log.info("Provisioning sequence started")
provisioned = True # placeholder — full sequence wired in 1.5
log.info("Portal starting on 0.0.0.0:80")
app.run(host="0.0.0.0", port=80)
provisioned = True
except ProvisioningError as e:
log.error("Provisioning failed: %s", e)
_reset_to_portal_state()
+38 -1
View File
@@ -1 +1,38 @@
# stub
import logging
import os
import requests
from planemapper.constants import AIRSPACE_PATH
from planemapper.provisioning import ProvisioningError
log = logging.getLogger(__name__)
_EMPTY_GEOJSON = '{"type": "FeatureCollection", "features": []}'
_OPENAIP_URL = "https://api.openaip.net/api/airspaces"
def download(lat: float, lon: float, radius_nm: float) -> None:
api_key = os.environ.get("OPENAIP_API_KEY")
AIRSPACE_PATH.parent.mkdir(parents=True, exist_ok=True)
if not api_key:
log.warning(
"OPENAIP_API_KEY not set — writing empty airspace cache; "
"airspace outlines will not be shown"
)
AIRSPACE_PATH.write_text(_EMPTY_GEOJSON)
return
radius_deg = radius_nm / 60.0
bbox = [lon - radius_deg, lat - radius_deg, lon + radius_deg, lat + radius_deg]
resp = requests.get(
_OPENAIP_URL,
params={"bbox": ",".join(f"{v:.6f}" for v in bbox)},
headers={"x-openaip-api-key": api_key},
timeout=30,
)
if not resp.ok:
raise ProvisioningError(f"OpenAIP API failed: {resp.status_code}")
AIRSPACE_PATH.write_text(resp.text)
log.info("airspace data saved to %s", AIRSPACE_PATH)
+59 -1
View File
@@ -3,7 +3,7 @@ import logging
from flask import Flask, redirect, request, url_for
from werkzeug.wrappers import Response
from planemapper.provisioning import location
from planemapper.provisioning import ProvisioningError, airspace, config, location, tiles, wifi
log = logging.getLogger(__name__)
@@ -101,3 +101,61 @@ def captive_redirect() -> Response:
@app.errorhandler(404)
def catch_all(e: Exception) -> Response:
return redirect(url_for("index"))
def _run_provisioning(
confirmed_lat: float,
confirmed_lon: float,
radius: float,
wifi_ssid: str,
wifi_password: str,
) -> str:
try:
wifi.join_home_wifi(wifi_ssid, wifi_password)
tiles.download_and_composite(confirmed_lat, confirmed_lon, radius)
airspace.download(confirmed_lat, confirmed_lon, radius)
tiles.validate_cache()
config.write(
{
"home_lat": confirmed_lat,
"home_lon": confirmed_lon,
"coverage_radius_nm": int(radius),
"wifi_ssid": wifi_ssid,
"wifi_password": wifi_password,
"provisioned": True,
}
)
wifi.kill_wifi()
return (
"<html><body>"
"<h1>Setup complete. The device will now start displaying radar.</h1>"
"</body></html>"
)
except ProvisioningError as e:
log.error("provisioning failed: %s", e)
return (
f"<html><body><p>Provisioning failed: {e}. <a href='/'>Try again</a></p></body></html>"
)
@app.route("/submit", methods=["POST"])
def submit() -> str:
try:
confirmed_lat = float(request.form["confirmed_lat"])
confirmed_lon = float(request.form["confirmed_lon"])
confirmed_name = request.form.get("confirmed_name", "")
radius = float(request.form.get("radius", "100"))
wifi_ssid = request.form["wifi_ssid"]
wifi_password = request.form["wifi_password"]
except (KeyError, ValueError) as e:
return f"<html><body><p>Invalid form data: {e}. <a href='/'>Try again</a></p></body></html>"
log.info(
"provisioning started for %s (%.4f, %.4f) r=%.0fnm",
confirmed_name,
confirmed_lat,
confirmed_lon,
radius,
)
result = _run_provisioning(confirmed_lat, confirmed_lon, radius, wifi_ssid, wifi_password)
return result
+93 -1
View File
@@ -1 +1,93 @@
# stub
import io
import logging
import math
import requests
from PIL import Image
from planemapper.constants import BACKGROUND_PATH, COLOUR_WHITE, DISPLAY_HEIGHT, DISPLAY_WIDTH
from planemapper.provisioning import ProvisioningError
log = logging.getLogger(__name__)
_TILE_URL = "https://tile.openstreetmap.org/{z}/{x}/{y}.png"
_USER_AGENT = "planemapper/0.1 (https://github.com/football2801/planeMapper)"
def lat_lon_to_tile(lat: float, lon: float, zoom: int) -> tuple[int, int]:
x = int((lon + 180.0) / 360.0 * (1 << zoom))
lat_r = math.radians(lat)
y = int((1.0 - math.log(math.tan(lat_r) + 1.0 / math.cos(lat_r)) / math.pi) / 2.0 * (1 << zoom))
return x, y
def _zoom_for_radius(radius_nm: float) -> int:
if radius_nm > 200:
return 8
if radius_nm > 100:
return 9
if radius_nm > 50:
return 10
return 11
def download_and_composite(lat: float, lon: float, radius_nm: float) -> None:
zoom = _zoom_for_radius(radius_nm)
radius_deg = radius_nm / 60.0
# Tile bounds
x_min, y_max = lat_lon_to_tile(lat - radius_deg, lon - radius_deg, zoom)
x_max, y_min = lat_lon_to_tile(lat + radius_deg, lon + radius_deg, zoom)
canvas_w = (x_max - x_min + 1) * 256
canvas_h = (y_max - y_min + 1) * 256
# Guard: ensure canvas is at least display size
canvas_w = max(canvas_w, DISPLAY_WIDTH)
canvas_h = max(canvas_h, DISPLAY_HEIGHT)
canvas = Image.new("RGB", (canvas_w, canvas_h), COLOUR_WHITE)
tile_count = 0
for tx in range(x_min, x_max + 1):
for ty in range(y_min, y_max + 1):
url = _TILE_URL.format(z=zoom, x=tx, y=ty)
resp = requests.get(url, headers={"User-Agent": _USER_AGENT}, timeout=10)
resp.raise_for_status()
tile_img = Image.open(io.BytesIO(resp.content)).convert("RGB")
px = (tx - x_min) * 256
py = (ty - y_min) * 256
canvas.paste(tile_img, (px, py))
tile_count += 1
# Crop to display size centred on home location
home_tx, home_ty = lat_lon_to_tile(lat, lon, zoom)
home_px = (home_tx - x_min) * 256 + 128
home_py = (home_ty - y_min) * 256 + 128
left = max(0, home_px - DISPLAY_WIDTH // 2)
top = max(0, home_py - DISPLAY_HEIGHT // 2)
left = min(left, canvas_w - DISPLAY_WIDTH)
top = min(top, canvas_h - DISPLAY_HEIGHT)
left = max(0, left)
top = max(0, top)
cropped = canvas.crop((left, top, left + DISPLAY_WIDTH, top + DISPLAY_HEIGHT))
BACKGROUND_PATH.parent.mkdir(parents=True, exist_ok=True)
cropped.save(str(BACKGROUND_PATH))
log.info("composited %d tiles → %s", tile_count, BACKGROUND_PATH)
def validate_cache() -> None:
if not BACKGROUND_PATH.exists():
raise ProvisioningError("background.png not found")
size = BACKGROUND_PATH.stat().st_size
if size == 0:
raise ProvisioningError("background.png is empty")
try:
with Image.open(BACKGROUND_PATH) as img:
img.verify()
except Exception as e:
raise ProvisioningError(f"background.png is not a valid PNG: {e}") from e
if size >= 2 * 1024**3:
raise ProvisioningError("background.png exceeds 2GB limit")
log.info("cache validation passed: background.png %.1f KB", size / 1024)
+13
View File
@@ -52,3 +52,16 @@ def kill_wifi() -> None:
log.error("rfkill failed with return code %d", result.returncode)
raise ProvisioningError(f"rfkill failed: returncode={result.returncode}")
log.info("WiFi radio killed")
def join_home_wifi(ssid: str, password: str) -> None:
result = subprocess.run(
["nmcli", "device", "wifi", "connect", ssid, "password", password],
capture_output=True,
timeout=30,
check=False,
)
if result.returncode != 0:
log.error("nmcli failed (rc=%d): %s", result.returncode, result.stderr.decode())
raise ProvisioningError(f"nmcli failed (rc={result.returncode}): {result.stderr.decode()}")
log.info("joined home WiFi: %s", ssid)