feat(home): live updates via Mercure — server pushes device state to the PWA
CI / test (push) Has been cancelled
CI / test (push) Has been cancelled
Subscribe per-device with a Symfony Mercure hub: server publishes a fresh device payload after every poll (200/304/204), every PATCH, and every lock/unlock. The frontend opens one EventSource per device topic and splats inbound JSON straight into the devices store — same shape as GET /api/devices, so no envelope handling. Topic: https://pictureframe.edholm.me/devices/{id} Stack mirrors aqua-iq: - symfony/mercure-bundle + config/packages/mercure.yaml - App\Service\MercurePublisher (errors swallowed + logged; a flaky hub must not break a poll response) - App\Service\DeviceSerializer extracted as the single source of truth for the wire shape (REST + Mercure share it) - Frontend useDeviceMercure() composable: opens/closes EventSources to match the device list reactively, reconnects on hub-side closes - SpaController exposes MERCURE_PUBLIC_URL via window.__PF_MERCURE_URL__ Production compose adds a dunglas/mercure container with Traefik labels for pictureframe.edholm.me/.well-known/mercure (handled separately on the host since the file isn't in this repo). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,113 @@
|
||||
import { onUnmounted, watch } from 'vue'
|
||||
import { storeToRefs } from 'pinia'
|
||||
import { useDevicesStore } from '@/stores/devices'
|
||||
import type { Device } from '@/types'
|
||||
|
||||
const TOPIC_PREFIX = 'https://pictureframe.edholm.me/devices/'
|
||||
|
||||
declare global {
|
||||
interface Window {
|
||||
__PF_MERCURE_URL__?: string
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Open one EventSource per device topic and merge inbound updates into the
|
||||
* devices store. Reconnect on close (Mercure usually proxies through Traefik
|
||||
* which can drop idle connections). Cleans up on unmount.
|
||||
*
|
||||
* The subscription set is reactive — when the device list changes (a new
|
||||
* frame is provisioned, or one is removed), connections are added/dropped
|
||||
* to match. The server publishes the same JSON shape that GET /api/devices
|
||||
* returns, so updates are a straight splat into the existing store entry.
|
||||
*/
|
||||
export function useDeviceMercure() {
|
||||
const devicesStore = useDevicesStore()
|
||||
const { devices } = storeToRefs(devicesStore)
|
||||
|
||||
// deviceId → EventSource. Tracked outside Vue's reactivity to avoid
|
||||
// accidentally proxying the native EventSource and breaking it.
|
||||
const sources = new Map<number, EventSource>()
|
||||
// deviceId → reconnect timer handle.
|
||||
const reconnectTimers = new Map<number, number>()
|
||||
|
||||
const baseUrl = window.__PF_MERCURE_URL__
|
||||
if (!baseUrl) {
|
||||
// No URL configured (dev without a hub, or SSR-render fallback) — quietly
|
||||
// no-op rather than throwing. Polling-on-visibility-change is still wired
|
||||
// up in HomeView, so the UI keeps working.
|
||||
return { connectedCount: () => 0 }
|
||||
}
|
||||
|
||||
function open(deviceId: number) {
|
||||
if (sources.has(deviceId)) return
|
||||
try {
|
||||
const url = new URL(baseUrl!)
|
||||
url.searchParams.append('topic', TOPIC_PREFIX + deviceId)
|
||||
const es = new EventSource(url.toString(), { withCredentials: true })
|
||||
|
||||
es.onmessage = (event) => {
|
||||
try {
|
||||
const updated = JSON.parse(event.data) as Device
|
||||
const idx = devices.value.findIndex(d => d.id === updated.id)
|
||||
if (idx !== -1) {
|
||||
// Splice replacement so Vue's reactivity tracks the swap.
|
||||
devices.value[idx] = updated
|
||||
}
|
||||
} catch (e) {
|
||||
console.warn('[mercure] parse error', e)
|
||||
}
|
||||
}
|
||||
|
||||
es.onerror = () => {
|
||||
// Mercure / Traefik will sometimes close idle connections; reopen
|
||||
// after a short delay rather than spinning. CLOSED is the only
|
||||
// terminal state that needs a manual reconnect.
|
||||
if (es.readyState === EventSource.CLOSED) {
|
||||
close(deviceId)
|
||||
const handle = window.setTimeout(() => {
|
||||
reconnectTimers.delete(deviceId)
|
||||
open(deviceId)
|
||||
}, 5000)
|
||||
reconnectTimers.set(deviceId, handle)
|
||||
}
|
||||
}
|
||||
|
||||
sources.set(deviceId, es)
|
||||
} catch (e) {
|
||||
console.warn('[mercure] open failed for device ' + deviceId, e)
|
||||
}
|
||||
}
|
||||
|
||||
function close(deviceId: number) {
|
||||
const es = sources.get(deviceId)
|
||||
if (es) {
|
||||
es.close()
|
||||
sources.delete(deviceId)
|
||||
}
|
||||
const timer = reconnectTimers.get(deviceId)
|
||||
if (timer !== undefined) {
|
||||
clearTimeout(timer)
|
||||
reconnectTimers.delete(deviceId)
|
||||
}
|
||||
}
|
||||
|
||||
// Sync subscriptions to the current device list.
|
||||
watch(
|
||||
devices,
|
||||
(list) => {
|
||||
const wantIds = new Set(list.map(d => d.id))
|
||||
// Open new ones.
|
||||
for (const id of wantIds) if (!sources.has(id)) open(id)
|
||||
// Close stale ones.
|
||||
for (const id of [...sources.keys()]) if (!wantIds.has(id)) close(id)
|
||||
},
|
||||
{ immediate: true, deep: false },
|
||||
)
|
||||
|
||||
onUnmounted(() => {
|
||||
for (const id of [...sources.keys()]) close(id)
|
||||
})
|
||||
|
||||
return { connectedCount: () => sources.size }
|
||||
}
|
||||
Reference in New Issue
Block a user