import { onUnmounted, watch } from 'vue'
import { storeToRefs } from 'pinia'
import { useDevicesStore } from '@/stores/devices'
import type { Device } from '@/types'

/** Every device is published on the hub under `TOPIC_PREFIX + <device id>`. */
const TOPIC_PREFIX = 'https://wevisto.com/devices/'

/** Delay before manually reopening a subscription whose EventSource reached CLOSED. */
const RECONNECT_DELAY_MS = 5000

declare global {
  interface Window {
    __PF_MERCURE_URL__?: string
  }
}

/**
 * Minimal structural check on an inbound hub message: a non-null object with
 * a numeric `id`. Both payload variants handled below (a full device and the
 * `{ id, deleted: true }` sentinel) are matched by their numeric `id`.
 */
function hasNumericId(value: unknown): value is { id: number; deleted?: unknown } {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { id?: unknown }).id === 'number'
  )
}

/**
 * Open one EventSource per device topic and merge inbound updates into the
 * devices store. Reconnect on close (Mercure usually proxies through Traefik
 * which can drop idle connections). Cleans up on unmount, including any
 * reconnect that is still waiting out its delay.
 *
 * The subscription set is reactive — when the device list is replaced (a new
 * frame is provisioned, or one is removed), connections are added/dropped
 * to match. An update payload replaces the store entry with the same `id`
 * wholesale; payloads for ids that are not in the store are ignored.
 *
 * @returns `connectedCount`, which reports how many EventSource objects are
 *   currently held open (always 0 when no hub URL is configured).
 */
export function useDeviceMercure(): { connectedCount: () => number } {
  const devicesStore = useDevicesStore()
  const { devices } = storeToRefs(devicesStore)

  // deviceId → EventSource. Tracked outside Vue's reactivity to avoid
  // accidentally proxying the native EventSource and breaking it.
  const sources = new Map<number, EventSource>()
  // deviceId → reconnect timer handle (as returned by window.setTimeout).
  const reconnectTimers = new Map<number, number>()

  // `window` does not exist during server-side rendering; treat that the same
  // as "no hub configured" instead of throwing a ReferenceError.
  const configuredUrl = typeof window === 'undefined' ? undefined : window.__PF_MERCURE_URL__
  if (!configuredUrl) {
    // No URL configured (dev without a hub, or SSR-render fallback) — quietly
    // no-op rather than throwing.
    return { connectedCount: () => 0 }
  }
  // Re-bind as a plain string so the closures below need no non-null assertion.
  const hubUrl: string = configuredUrl

  // Set on unmount so that nothing can open a new connection afterwards.
  let disposed = false

  /** Every device id that currently owns an EventSource or a pending reconnect. */
  function trackedIds(): number[] {
    return [...new Set([...sources.keys(), ...reconnectTimers.keys()])]
  }

  /** Cancel the pending reconnect for `deviceId`, if there is one. */
  function cancelReconnect(deviceId: number): void {
    const timer = reconnectTimers.get(deviceId)
    if (timer !== undefined) {
      window.clearTimeout(timer)
      reconnectTimers.delete(deviceId)
    }
  }

  /** Subscribe to the topic of `deviceId`. No-op if already subscribed or disposed. */
  function open(deviceId: number): void {
    if (disposed || sources.has(deviceId)) return
    // We are connecting now, so a queued reconnect for this id is redundant.
    cancelReconnect(deviceId)
    try {
      // The base argument lets a relative hub path resolve against the page;
      // it is ignored when hubUrl is already absolute.
      const url = new URL(hubUrl, window.location.href)
      url.searchParams.append('topic', TOPIC_PREFIX + deviceId)
      const es = new EventSource(url.toString(), { withCredentials: true })

      es.onmessage = (event) => {
        try {
          const payload: unknown = JSON.parse(event.data)
          if (!hasNumericId(payload)) {
            console.warn('[mercure] ignoring payload without numeric id', payload)
            return
          }
          // Deletion sentinel: {id, deleted: true}. Drop the device from the
          // store and close our subscription.
          if ('deleted' in payload && payload.deleted === true) {
            const removedId = payload.id
            devices.value = devices.value.filter(d => d.id !== removedId)
            close(removedId)
            return
          }
          const updated = payload as Device
          const idx = devices.value.findIndex(d => d.id === updated.id)
          if (idx !== -1) {
            // Replace the whole entry by index; the reactive array tracks
            // index assignment.
            devices.value[idx] = updated
          }
        } catch (e) {
          console.warn('[mercure] parse error', e)
        }
      }

      es.onerror = () => {
        // Mercure / Traefik will sometimes close idle connections; reopen
        // after a short delay rather than spinning. CLOSED is the only
        // terminal state that needs a manual reconnect.
        if (es.readyState === EventSource.CLOSED) {
          close(deviceId)
          const handle = window.setTimeout(() => {
            reconnectTimers.delete(deviceId)
            open(deviceId)
          }, RECONNECT_DELAY_MS)
          reconnectTimers.set(deviceId, handle)
        }
      }

      sources.set(deviceId, es)
    } catch (e) {
      console.warn('[mercure] open failed for device ' + deviceId, e)
    }
  }

  /** Close the subscription for `deviceId` and cancel its pending reconnect. */
  function close(deviceId: number): void {
    const es = sources.get(deviceId)
    if (es) {
      es.close()
      sources.delete(deviceId)
    }
    cancelReconnect(deviceId)
  }

  // Sync subscriptions to the current device list.
  watch(
    devices,
    (list) => {
      const wantIds = new Set(list.map(d => d.id))
      // Open new ones (open() itself skips ids that are already subscribed).
      for (const id of wantIds) open(id)
      // Close stale ones. Ids that only have a pending reconnect count too,
      // otherwise the timer would re-subscribe to a device that was removed.
      for (const id of trackedIds()) if (!wantIds.has(id)) close(id)
    },
    { immediate: true, deep: false },
  )

  onUnmounted(() => {
    disposed = true
    // Covers both live connections and reconnects still waiting on a timer.
    for (const id of trackedIds()) close(id)
  })

  return { connectedCount: () => sources.size }
}