ws: synthesize job.finished from update watcher so browser stream wakes up
This commit is contained in:
@@ -7,6 +7,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/api"
|
||||
"gitea.dcglab.co.uk/steve/restic-manager/internal/store"
|
||||
)
|
||||
|
||||
@@ -29,6 +30,7 @@ type AlertRaiser interface {
|
||||
type UpdateWatcher struct {
|
||||
store *store.Store
|
||||
alerts AlertRaiser
|
||||
jobHub *JobHub // optional — if nil, no fan-out to browser streams
|
||||
|
||||
mu sync.Mutex
|
||||
entries map[string]*updateEntry // hostID → entry
|
||||
@@ -46,10 +48,11 @@ type updateEntry struct {
|
||||
|
||||
// NewUpdateWatcher builds an unstarted watcher. Call Run in a goroutine
|
||||
// to start the periodic sweep.
|
||||
func NewUpdateWatcher(st *store.Store, alerts AlertRaiser) *UpdateWatcher {
|
||||
func NewUpdateWatcher(st *store.Store, alerts AlertRaiser, jobHub *JobHub) *UpdateWatcher {
|
||||
return &UpdateWatcher{
|
||||
store: st,
|
||||
alerts: alerts,
|
||||
jobHub: jobHub,
|
||||
entries: make(map[string]*updateEntry),
|
||||
tickPeriod: 5 * time.Second,
|
||||
}
|
||||
@@ -95,6 +98,7 @@ func (w *UpdateWatcher) OnHello(ctx context.Context, hostID, agentVersion, targe
|
||||
if err := w.store.MarkJobFinished(ctx, jobID, "succeeded", 0, nil, "", now); err != nil {
|
||||
slog.Warn("ws update watcher: mark succeeded", "job_id", jobID, "host_id", hostID, "err", err)
|
||||
}
|
||||
w.publishJobFinished(jobID, api.JobSucceeded, 0, "", now)
|
||||
if w.alerts != nil {
|
||||
w.alerts.ResolveUpdateFailed(ctx, hostID, now)
|
||||
}
|
||||
@@ -144,8 +148,37 @@ func (w *UpdateWatcher) sweep(ctx context.Context, now time.Time) {
|
||||
if err := w.store.MarkJobFinished(ctx, x.jobID, "failed", -1, nil, errMsg, stamp); err != nil {
|
||||
slog.Warn("ws update watcher: mark failed", "job_id", x.jobID, "host_id", x.hostID, "err", err)
|
||||
}
|
||||
w.publishJobFinished(x.jobID, api.JobFailed, -1, errMsg, stamp)
|
||||
if w.alerts != nil {
|
||||
w.alerts.RaiseUpdateFailed(ctx, x.hostID, x.jobID, reason, stamp)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// publishJobFinished pushes a synthetic job.finished envelope into the
|
||||
// JobHub so any browser still streaming this job sees it terminate.
|
||||
// The agent itself exits before it can send job.finished (it has to —
|
||||
// it's about to relaunch into the new binary), so without this fan-out
|
||||
// the /jobs/{id} page hangs until reload.
|
||||
//
|
||||
// Best-effort: if the hub is nil or the envelope can't be marshalled
|
||||
// we log and move on — the DB-side state is already correct, this is
|
||||
// purely a UI wake-up.
|
||||
func (w *UpdateWatcher) publishJobFinished(jobID string, status api.JobStatus, exitCode int, errMsg string, finishedAt time.Time) {
|
||||
if w.jobHub == nil {
|
||||
return
|
||||
}
|
||||
payload := api.JobFinishedPayload{
|
||||
JobID: jobID,
|
||||
Status: status,
|
||||
ExitCode: exitCode,
|
||||
FinishedAt: finishedAt,
|
||||
Error: errMsg,
|
||||
}
|
||||
env, err := api.Marshal(api.MsgJobFinished, "", payload)
|
||||
if err != nil {
|
||||
slog.Warn("ws update watcher: marshal synthetic job.finished", "job_id", jobID, "err", err)
|
||||
return
|
||||
}
|
||||
w.jobHub.Broadcast(jobID, env)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user