ws: synthesize job.finished from update watcher so browser stream wakes up
This commit is contained in:
+1
-1
@@ -92,7 +92,7 @@ func run() error {
|
|||||||
|
|
||||||
notifHub := notification.NewHub(st, aead, cfg.BaseURL)
|
notifHub := notification.NewHub(st, aead, cfg.BaseURL)
|
||||||
alertEngine := alert.NewEngine(st, notifHub)
|
alertEngine := alert.NewEngine(st, notifHub)
|
||||||
updateWatcher := ws.NewUpdateWatcher(st, alertEngine)
|
updateWatcher := ws.NewUpdateWatcher(st, alertEngine, jobHub)
|
||||||
|
|
||||||
renderer, err := ui.New()
|
renderer, err := ui.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -29,6 +30,7 @@ type AlertRaiser interface {
|
|||||||
type UpdateWatcher struct {
|
type UpdateWatcher struct {
|
||||||
store *store.Store
|
store *store.Store
|
||||||
alerts AlertRaiser
|
alerts AlertRaiser
|
||||||
|
jobHub *JobHub // optional — if nil, no fan-out to browser streams
|
||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
entries map[string]*updateEntry // hostID → entry
|
entries map[string]*updateEntry // hostID → entry
|
||||||
@@ -46,10 +48,11 @@ type updateEntry struct {
|
|||||||
|
|
||||||
// NewUpdateWatcher builds an unstarted watcher. Call Run in a goroutine
|
// NewUpdateWatcher builds an unstarted watcher. Call Run in a goroutine
|
||||||
// to start the periodic sweep.
|
// to start the periodic sweep.
|
||||||
func NewUpdateWatcher(st *store.Store, alerts AlertRaiser) *UpdateWatcher {
|
func NewUpdateWatcher(st *store.Store, alerts AlertRaiser, jobHub *JobHub) *UpdateWatcher {
|
||||||
return &UpdateWatcher{
|
return &UpdateWatcher{
|
||||||
store: st,
|
store: st,
|
||||||
alerts: alerts,
|
alerts: alerts,
|
||||||
|
jobHub: jobHub,
|
||||||
entries: make(map[string]*updateEntry),
|
entries: make(map[string]*updateEntry),
|
||||||
tickPeriod: 5 * time.Second,
|
tickPeriod: 5 * time.Second,
|
||||||
}
|
}
|
||||||
@@ -95,6 +98,7 @@ func (w *UpdateWatcher) OnHello(ctx context.Context, hostID, agentVersion, targe
|
|||||||
if err := w.store.MarkJobFinished(ctx, jobID, "succeeded", 0, nil, "", now); err != nil {
|
if err := w.store.MarkJobFinished(ctx, jobID, "succeeded", 0, nil, "", now); err != nil {
|
||||||
slog.Warn("ws update watcher: mark succeeded", "job_id", jobID, "host_id", hostID, "err", err)
|
slog.Warn("ws update watcher: mark succeeded", "job_id", jobID, "host_id", hostID, "err", err)
|
||||||
}
|
}
|
||||||
|
w.publishJobFinished(jobID, api.JobSucceeded, 0, "", now)
|
||||||
if w.alerts != nil {
|
if w.alerts != nil {
|
||||||
w.alerts.ResolveUpdateFailed(ctx, hostID, now)
|
w.alerts.ResolveUpdateFailed(ctx, hostID, now)
|
||||||
}
|
}
|
||||||
@@ -144,8 +148,37 @@ func (w *UpdateWatcher) sweep(ctx context.Context, now time.Time) {
|
|||||||
if err := w.store.MarkJobFinished(ctx, x.jobID, "failed", -1, nil, errMsg, stamp); err != nil {
|
if err := w.store.MarkJobFinished(ctx, x.jobID, "failed", -1, nil, errMsg, stamp); err != nil {
|
||||||
slog.Warn("ws update watcher: mark failed", "job_id", x.jobID, "host_id", x.hostID, "err", err)
|
slog.Warn("ws update watcher: mark failed", "job_id", x.jobID, "host_id", x.hostID, "err", err)
|
||||||
}
|
}
|
||||||
|
w.publishJobFinished(x.jobID, api.JobFailed, -1, errMsg, stamp)
|
||||||
if w.alerts != nil {
|
if w.alerts != nil {
|
||||||
w.alerts.RaiseUpdateFailed(ctx, x.hostID, x.jobID, reason, stamp)
|
w.alerts.RaiseUpdateFailed(ctx, x.hostID, x.jobID, reason, stamp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// publishJobFinished pushes a synthetic job.finished envelope into the
|
||||||
|
// JobHub so any browser still streaming this job sees it terminate.
|
||||||
|
// The agent itself exits before it can send job.finished (it has to —
|
||||||
|
// it's about to relaunch into the new binary), so without this fan-out
|
||||||
|
// the /jobs/{id} page hangs until reload.
|
||||||
|
//
|
||||||
|
// Best-effort: if the hub is nil or the envelope can't be marshalled
|
||||||
|
// we log and move on — the DB-side state is already correct, this is
|
||||||
|
// purely a UI wake-up.
|
||||||
|
func (w *UpdateWatcher) publishJobFinished(jobID string, status api.JobStatus, exitCode int, errMsg string, finishedAt time.Time) {
|
||||||
|
if w.jobHub == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
payload := api.JobFinishedPayload{
|
||||||
|
JobID: jobID,
|
||||||
|
Status: status,
|
||||||
|
ExitCode: exitCode,
|
||||||
|
FinishedAt: finishedAt,
|
||||||
|
Error: errMsg,
|
||||||
|
}
|
||||||
|
env, err := api.Marshal(api.MsgJobFinished, "", payload)
|
||||||
|
if err != nil {
|
||||||
|
slog.Warn("ws update watcher: marshal synthetic job.finished", "job_id", jobID, "err", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
w.jobHub.Broadcast(jobID, env)
|
||||||
|
}
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
|
|
||||||
"github.com/oklog/ulid/v2"
|
"github.com/oklog/ulid/v2"
|
||||||
|
|
||||||
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -50,7 +51,7 @@ func TestUpdateWatcherOnHelloSuccess(t *testing.T) {
|
|||||||
jobID := seedJob(t, st, hostID)
|
jobID := seedJob(t, st, hostID)
|
||||||
|
|
||||||
a := &fakeAlerts{}
|
a := &fakeAlerts{}
|
||||||
w := NewUpdateWatcher(st, a)
|
w := NewUpdateWatcher(st, a, nil)
|
||||||
w.Track(jobID, hostID)
|
w.Track(jobID, hostID)
|
||||||
|
|
||||||
w.OnHello(context.Background(), hostID, "v2", "v2")
|
w.OnHello(context.Background(), hostID, "v2", "v2")
|
||||||
@@ -83,7 +84,7 @@ func TestUpdateWatcherTimeout(t *testing.T) {
|
|||||||
jobID := seedJob(t, st, hostID)
|
jobID := seedJob(t, st, hostID)
|
||||||
|
|
||||||
a := &fakeAlerts{}
|
a := &fakeAlerts{}
|
||||||
w := NewUpdateWatcher(st, a)
|
w := NewUpdateWatcher(st, a, nil)
|
||||||
w.Track(jobID, hostID)
|
w.Track(jobID, hostID)
|
||||||
|
|
||||||
time.Sleep(80 * time.Millisecond)
|
time.Sleep(80 * time.Millisecond)
|
||||||
@@ -113,7 +114,7 @@ func TestUpdateWatcherMismatchedVersionNoOp(t *testing.T) {
|
|||||||
jobID := seedJob(t, st, hostID)
|
jobID := seedJob(t, st, hostID)
|
||||||
|
|
||||||
a := &fakeAlerts{}
|
a := &fakeAlerts{}
|
||||||
w := NewUpdateWatcher(st, a)
|
w := NewUpdateWatcher(st, a, nil)
|
||||||
w.Track(jobID, hostID)
|
w.Track(jobID, hostID)
|
||||||
|
|
||||||
w.OnHello(context.Background(), hostID, "v1", "v2")
|
w.OnHello(context.Background(), hostID, "v1", "v2")
|
||||||
@@ -140,7 +141,7 @@ func TestUpdateWatcherHelloAfterTimeoutIsNoOp(t *testing.T) {
|
|||||||
jobID := seedJob(t, st, hostID)
|
jobID := seedJob(t, st, hostID)
|
||||||
|
|
||||||
a := &fakeAlerts{}
|
a := &fakeAlerts{}
|
||||||
w := NewUpdateWatcher(st, a)
|
w := NewUpdateWatcher(st, a, nil)
|
||||||
w.Track(jobID, hostID)
|
w.Track(jobID, hostID)
|
||||||
|
|
||||||
time.Sleep(80 * time.Millisecond)
|
time.Sleep(80 * time.Millisecond)
|
||||||
@@ -159,3 +160,71 @@ func TestUpdateWatcherHelloAfterTimeoutIsNoOp(t *testing.T) {
|
|||||||
t.Fatalf("late hello triggered ResolveUpdateFailed: %v", a.resolved)
|
t.Fatalf("late hello triggered ResolveUpdateFailed: %v", a.resolved)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestUpdateWatcherOnHelloBroadcastsJobFinished(t *testing.T) {
|
||||||
|
st := openWSTestStore(t)
|
||||||
|
hostID := ulid.Make().String()
|
||||||
|
seedHostWS(t, st, hostID)
|
||||||
|
jobID := seedJob(t, st, hostID)
|
||||||
|
|
||||||
|
hub := NewJobHub()
|
||||||
|
sub := hub.Register(jobID)
|
||||||
|
defer sub.unregister()
|
||||||
|
|
||||||
|
w := NewUpdateWatcher(st, &fakeAlerts{}, hub)
|
||||||
|
w.Track(jobID, hostID)
|
||||||
|
w.OnHello(context.Background(), hostID, "v2", "v2")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case env := <-sub.ch:
|
||||||
|
if env.Type != api.MsgJobFinished {
|
||||||
|
t.Fatalf("envelope type: got %q want %q", env.Type, api.MsgJobFinished)
|
||||||
|
}
|
||||||
|
var p api.JobFinishedPayload
|
||||||
|
if err := env.UnmarshalPayload(&p); err != nil {
|
||||||
|
t.Fatalf("unmarshal payload: %v", err)
|
||||||
|
}
|
||||||
|
if p.JobID != jobID || p.Status != api.JobSucceeded {
|
||||||
|
t.Fatalf("payload: got %+v", p)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("expected synthetic job.finished broadcast, got nothing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateWatcherTimeoutBroadcastsJobFinished(t *testing.T) {
|
||||||
|
prev := updateTimeout
|
||||||
|
updateTimeout = 50 * time.Millisecond
|
||||||
|
t.Cleanup(func() { updateTimeout = prev })
|
||||||
|
|
||||||
|
st := openWSTestStore(t)
|
||||||
|
hostID := ulid.Make().String()
|
||||||
|
seedHostWS(t, st, hostID)
|
||||||
|
jobID := seedJob(t, st, hostID)
|
||||||
|
|
||||||
|
hub := NewJobHub()
|
||||||
|
sub := hub.Register(jobID)
|
||||||
|
defer sub.unregister()
|
||||||
|
|
||||||
|
w := NewUpdateWatcher(st, &fakeAlerts{}, hub)
|
||||||
|
w.Track(jobID, hostID)
|
||||||
|
|
||||||
|
time.Sleep(80 * time.Millisecond)
|
||||||
|
w.sweep(context.Background(), time.Now())
|
||||||
|
|
||||||
|
select {
|
||||||
|
case env := <-sub.ch:
|
||||||
|
if env.Type != api.MsgJobFinished {
|
||||||
|
t.Fatalf("envelope type: got %q want %q", env.Type, api.MsgJobFinished)
|
||||||
|
}
|
||||||
|
var p api.JobFinishedPayload
|
||||||
|
if err := env.UnmarshalPayload(&p); err != nil {
|
||||||
|
t.Fatalf("unmarshal payload: %v", err)
|
||||||
|
}
|
||||||
|
if p.JobID != jobID || p.Status != api.JobFailed {
|
||||||
|
t.Fatalf("payload: got %+v", p)
|
||||||
|
}
|
||||||
|
case <-time.After(time.Second):
|
||||||
|
t.Fatal("expected synthetic job.finished broadcast, got nothing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user