From 6ef58a707e1c06d7f4993c1cca14857768e01af8 Mon Sep 17 00:00:00 2001 From: Steve Cliff Date: Thu, 7 May 2026 20:32:48 +0100 Subject: [PATCH] ws: synthesize job.finished from update watcher so browser stream wakes up --- cmd/server/main.go | 2 +- internal/server/ws/update_watch.go | 35 ++++++++++- internal/server/ws/update_watch_test.go | 77 +++++++++++++++++++++++-- 3 files changed, 108 insertions(+), 6 deletions(-) diff --git a/cmd/server/main.go b/cmd/server/main.go index dcd0d38..b79d201 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -92,7 +92,7 @@ func run() error { notifHub := notification.NewHub(st, aead, cfg.BaseURL) alertEngine := alert.NewEngine(st, notifHub) - updateWatcher := ws.NewUpdateWatcher(st, alertEngine) + updateWatcher := ws.NewUpdateWatcher(st, alertEngine, jobHub) renderer, err := ui.New() if err != nil { diff --git a/internal/server/ws/update_watch.go b/internal/server/ws/update_watch.go index be2fef8..afdc82f 100644 --- a/internal/server/ws/update_watch.go +++ b/internal/server/ws/update_watch.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" "gitea.dcglab.co.uk/steve/restic-manager/internal/store" ) @@ -29,6 +30,7 @@ type AlertRaiser interface { type UpdateWatcher struct { store *store.Store alerts AlertRaiser + jobHub *JobHub // optional — if nil, no fan-out to browser streams mu sync.Mutex entries map[string]*updateEntry // hostID → entry @@ -46,10 +48,11 @@ type updateEntry struct { // NewUpdateWatcher builds an unstarted watcher. Call Run in a goroutine // to start the periodic sweep. 
-func NewUpdateWatcher(st *store.Store, alerts AlertRaiser) *UpdateWatcher { +func NewUpdateWatcher(st *store.Store, alerts AlertRaiser, jobHub *JobHub) *UpdateWatcher { return &UpdateWatcher{ store: st, alerts: alerts, + jobHub: jobHub, entries: make(map[string]*updateEntry), tickPeriod: 5 * time.Second, } @@ -95,6 +98,7 @@ func (w *UpdateWatcher) OnHello(ctx context.Context, hostID, agentVersion, targe if err := w.store.MarkJobFinished(ctx, jobID, "succeeded", 0, nil, "", now); err != nil { slog.Warn("ws update watcher: mark succeeded", "job_id", jobID, "host_id", hostID, "err", err) } + w.publishJobFinished(jobID, api.JobSucceeded, 0, "", now) if w.alerts != nil { w.alerts.ResolveUpdateFailed(ctx, hostID, now) } @@ -144,8 +148,37 @@ func (w *UpdateWatcher) sweep(ctx context.Context, now time.Time) { if err := w.store.MarkJobFinished(ctx, x.jobID, "failed", -1, nil, errMsg, stamp); err != nil { slog.Warn("ws update watcher: mark failed", "job_id", x.jobID, "host_id", x.hostID, "err", err) } + w.publishJobFinished(x.jobID, api.JobFailed, -1, errMsg, stamp) if w.alerts != nil { w.alerts.RaiseUpdateFailed(ctx, x.hostID, x.jobID, reason, stamp) } } } + +// publishJobFinished pushes a synthetic job.finished envelope into the +// JobHub so any browser still streaming this job sees it terminate. +// The agent itself exits before it can send job.finished (it has to — +// it's about to relaunch into the new binary), so without this fan-out +// the /jobs/{id} page hangs until reload. +// +// Best-effort: a nil hub is a silent no-op, and a marshal failure is +// logged and dropped. The callers have already attempted the DB write +// (logging any failure), so this is purely a UI wake-up. 
+func (w *UpdateWatcher) publishJobFinished(jobID string, status api.JobStatus, exitCode int, errMsg string, finishedAt time.Time) { + if w.jobHub == nil { + return + } + payload := api.JobFinishedPayload{ + JobID: jobID, + Status: status, + ExitCode: exitCode, + FinishedAt: finishedAt, + Error: errMsg, + } + env, err := api.Marshal(api.MsgJobFinished, "", payload) + if err != nil { + slog.Warn("ws update watcher: marshal synthetic job.finished", "job_id", jobID, "err", err) + return + } + w.jobHub.Broadcast(jobID, env) +} diff --git a/internal/server/ws/update_watch_test.go b/internal/server/ws/update_watch_test.go index 4081501..845b986 100644 --- a/internal/server/ws/update_watch_test.go +++ b/internal/server/ws/update_watch_test.go @@ -8,6 +8,7 @@ import ( "github.com/oklog/ulid/v2" + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" "gitea.dcglab.co.uk/steve/restic-manager/internal/store" ) @@ -50,7 +51,7 @@ func TestUpdateWatcherOnHelloSuccess(t *testing.T) { jobID := seedJob(t, st, hostID) a := &fakeAlerts{} - w := NewUpdateWatcher(st, a) + w := NewUpdateWatcher(st, a, nil) w.Track(jobID, hostID) w.OnHello(context.Background(), hostID, "v2", "v2") @@ -83,7 +84,7 @@ func TestUpdateWatcherTimeout(t *testing.T) { jobID := seedJob(t, st, hostID) a := &fakeAlerts{} - w := NewUpdateWatcher(st, a) + w := NewUpdateWatcher(st, a, nil) w.Track(jobID, hostID) time.Sleep(80 * time.Millisecond) @@ -113,7 +114,7 @@ func TestUpdateWatcherMismatchedVersionNoOp(t *testing.T) { jobID := seedJob(t, st, hostID) a := &fakeAlerts{} - w := NewUpdateWatcher(st, a) + w := NewUpdateWatcher(st, a, nil) w.Track(jobID, hostID) w.OnHello(context.Background(), hostID, "v1", "v2") @@ -140,7 +141,7 @@ func TestUpdateWatcherHelloAfterTimeoutIsNoOp(t *testing.T) { jobID := seedJob(t, st, hostID) a := &fakeAlerts{} - w := NewUpdateWatcher(st, a) + w := NewUpdateWatcher(st, a, nil) w.Track(jobID, hostID) time.Sleep(80 * time.Millisecond) @@ -159,3 +160,71 @@ func 
TestUpdateWatcherHelloAfterTimeoutIsNoOp(t *testing.T) { t.Fatalf("late hello triggered ResolveUpdateFailed: %v", a.resolved) } } + +func TestUpdateWatcherOnHelloBroadcastsJobFinished(t *testing.T) { + st := openWSTestStore(t) + hostID := ulid.Make().String() + seedHostWS(t, st, hostID) + jobID := seedJob(t, st, hostID) + + hub := NewJobHub() + sub := hub.Register(jobID) + defer sub.unregister() + + w := NewUpdateWatcher(st, &fakeAlerts{}, hub) + w.Track(jobID, hostID) + w.OnHello(context.Background(), hostID, "v2", "v2") + + select { + case env := <-sub.ch: + if env.Type != api.MsgJobFinished { + t.Fatalf("envelope type: got %q want %q", env.Type, api.MsgJobFinished) + } + var p api.JobFinishedPayload + if err := env.UnmarshalPayload(&p); err != nil { + t.Fatalf("unmarshal payload: %v", err) + } + if p.JobID != jobID || p.Status != api.JobSucceeded { + t.Fatalf("payload: got %+v", p) + } + case <-time.After(time.Second): + t.Fatal("expected synthetic job.finished broadcast, got nothing") + } +} + +func TestUpdateWatcherTimeoutBroadcastsJobFinished(t *testing.T) { + prev := updateTimeout + updateTimeout = 50 * time.Millisecond + t.Cleanup(func() { updateTimeout = prev }) + + st := openWSTestStore(t) + hostID := ulid.Make().String() + seedHostWS(t, st, hostID) + jobID := seedJob(t, st, hostID) + + hub := NewJobHub() + sub := hub.Register(jobID) + defer sub.unregister() + + w := NewUpdateWatcher(st, &fakeAlerts{}, hub) + w.Track(jobID, hostID) + + time.Sleep(80 * time.Millisecond) + w.sweep(context.Background(), time.Now()) + + select { + case env := <-sub.ch: + if env.Type != api.MsgJobFinished { + t.Fatalf("envelope type: got %q want %q", env.Type, api.MsgJobFinished) + } + var p api.JobFinishedPayload + if err := env.UnmarshalPayload(&p); err != nil { + t.Fatalf("unmarshal payload: %v", err) + } + if p.JobID != jobID || p.Status != api.JobFailed { + t.Fatalf("payload: got %+v", p) + } + case <-time.After(time.Second): + t.Fatal("expected synthetic job.finished 
broadcast, got nothing") + } +}