diff --git a/cmd/agent/main.go b/cmd/agent/main.go index ac43d3c..5cac43e 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -210,6 +210,45 @@ type dispatcher struct { bwMu sync.Mutex bwUpKBps int bwDownKBps int + + // Per-running-job cancellation handles. Populated when runJob + // spawns the goroutine, removed when it returns. Looked up by + // the command.cancel handler (server → agent) to abort an + // in-flight restic invocation. + cancelMu sync.Mutex + cancels map[string]context.CancelFunc +} + +// trackJob registers a cancel func for an in-flight job and returns a +// cleanup that removes it. Call cleanup when the job goroutine exits +// regardless of outcome — runs even on panic. +func (d *dispatcher) trackJob(jobID string, cancel context.CancelFunc) func() { + d.cancelMu.Lock() + if d.cancels == nil { + d.cancels = make(map[string]context.CancelFunc) + } + d.cancels[jobID] = cancel + d.cancelMu.Unlock() + return func() { + d.cancelMu.Lock() + delete(d.cancels, jobID) + d.cancelMu.Unlock() + } +} + +// cancelJob fires the cancel func for jobID if there is one and +// returns whether the job was actually known. The runner is expected +// to surface the resulting context.Canceled as a JobCancelled status +// in its job.finished envelope (see runner.sendFinished). +func (d *dispatcher) cancelJob(jobID string) bool { + d.cancelMu.Lock() + cancel, ok := d.cancels[jobID] + d.cancelMu.Unlock() + if !ok { + return false + } + cancel() + return true } func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.Sender) error { @@ -222,8 +261,18 @@ func (d *dispatcher) handle(ctx context.Context, env api.Envelope, tx wsclient.S return d.runJob(ctx, p, tx) case api.MsgCommandCancel: - // TODO(P2): cancellation requires keeping a job→cancelFunc map. - slog.Info("ws agent: command.cancel received (cancellation lands in P2)", "id", env.ID) + var p api.CommandCancelPayload + if err := env.UnmarshalPayload(&p); err != nil { + return fmt.Errorf("command.cancel: %w", err) + } + if d.cancelJob(p.JobID) { + slog.Info("ws agent: command.cancel applied", "job_id", p.JobID) + } else { + // Job already finished or was never seen on this agent. + // Not an error — operator may have raced cancel against + // natural completion. Server-side state is authoritative. + slog.Info("ws agent: command.cancel for unknown job (already finished?)", "job_id", p.JobID) + } case api.MsgScheduleSet: var p api.ScheduleSetPayload @@ -374,6 +423,25 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc LimitDownloadKBps: downKBps, }, tx, time.Second) + // spawn wraps the kind-specific goroutine: derives a per-job + // cancellable context from the connection-scoped ctx, registers + // the cancel func so command.cancel can fire it, deregisters on + // completion. Per-job ctx means canceling one job doesn't kill + // any other in-flight invocations. + spawn := func(name string, fn func(ctx context.Context) error) { + jobCtx, cancel := context.WithCancel(ctx) + cleanup := d.trackJob(p.JobID, cancel) + go func() { + defer cleanup() + defer cancel() // release ctx resources on goroutine exit + if err := fn(jobCtx); err != nil { + slog.Warn("agent: "+name+" job failed", "job_id", p.JobID, "err", err) + return + } + slog.Info("agent: "+name+" job complete", "job_id", p.JobID) + }() + } + switch p.Kind { case api.JobBackup: // Includes/Excludes/Tag come from the source group resolved @@ -391,22 +459,14 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc slog.Info("agent: accepting backup job", "job_id", p.JobID, "paths", paths, "excludes", p.Excludes, "tag", p.Tag) hooks := runner.BackupHooks{Pre: p.PreHook, Post: p.PostHook} - go func() { - if err := r.RunBackup(ctx, p.JobID, paths, p.Excludes, tags, hooks); err != nil { - slog.Warn("agent: backup job failed", "job_id", p.JobID, "err", err) - return - } - slog.Info("agent: backup job complete", "job_id", p.JobID) - }() + spawn("backup", func(jobCtx context.Context) error { + return r.RunBackup(jobCtx, p.JobID, paths, p.Excludes, tags, hooks) + }) case api.JobInit: slog.Info("agent: accepting init job", "job_id", p.JobID) - go func() { - if err := r.RunInit(ctx, p.JobID); err != nil { - slog.Warn("agent: init job failed", "job_id", p.JobID, "err", err) - return - } - slog.Info("agent: init job complete", "job_id", p.JobID) - }() + spawn("init", func(jobCtx context.Context) error { + return r.RunInit(jobCtx, p.JobID) + }) case api.JobForget: if len(p.ForgetGroups) == 0 { // Hard-error rather than fall back to a single-policy form: @@ -433,13 +493,9 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc }) } slog.Info("agent: accepting forget job", "job_id", p.JobID, "groups", len(groups)) - go func() { - if err := r.RunForget(ctx, p.JobID, groups); err != nil { - slog.Warn("agent: forget job failed", "job_id", p.JobID, "err", err) - return - } - slog.Info("agent: forget job complete", "job_id", p.JobID) - }() + spawn("forget", func(jobCtx context.Context) error { + return r.RunForget(jobCtx, p.JobID, groups) + }) case api.JobPrune: // Prune may require admin creds (delete authority on rest-server). runCreds := creds @@ -462,29 +518,23 @@ func (d *dispatcher) runJob(ctx context.Context, p api.CommandRunPayload, tx wsc LimitDownloadKBps: downKBps, }, tx, time.Second) slog.Info("agent: accepting prune job", "job_id", p.JobID, "admin_creds", p.RequiresAdminCreds) - go func() { - if err := prr.RunPrune(ctx, p.JobID); err != nil { - slog.Warn("agent: prune job failed", "job_id", p.JobID, "err", err) - } - }() + spawn("prune", func(jobCtx context.Context) error { + return prr.RunPrune(jobCtx, p.JobID) + }) case api.JobCheck: subset := 0 if len(p.Args) > 0 { subset, _ = strconv.Atoi(p.Args[0]) } slog.Info("agent: accepting check job", "job_id", p.JobID, "subset_pct", subset) - go func() { - if err := r.RunCheck(ctx, p.JobID, subset); err != nil { - slog.Warn("agent: check job failed", "job_id", p.JobID, "err", err) - } - }() + spawn("check", func(jobCtx context.Context) error { + return r.RunCheck(jobCtx, p.JobID, subset) + }) case api.JobUnlock: slog.Info("agent: accepting unlock job", "job_id", p.JobID) - go func() { - if err := r.RunUnlock(ctx, p.JobID); err != nil { - slog.Warn("agent: unlock job failed", "job_id", p.JobID, "err", err) - } - }() + spawn("unlock", func(jobCtx context.Context) error { + return r.RunUnlock(jobCtx, p.JobID) + }) default: return fmt.Errorf("kind %q not implemented yet (Phase 2 lands the rest)", p.Kind) } diff --git a/internal/agent/runner/cancel_test.go b/internal/agent/runner/cancel_test.go new file mode 100644 index 0000000..cacadd2 --- /dev/null +++ b/internal/agent/runner/cancel_test.go @@ -0,0 +1,100 @@ +package runner + +import ( + "context" + "strings" + "sync" + "testing" + "time" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" +) + +// safeSender is a thread-safe variant of fakeSender. The cancel test +// has the runner goroutine sending envelopes while the test goroutine +// is reading the slice, so we need a mutex. +type safeSender struct { + mu sync.Mutex + envs []api.Envelope +} + +func (s *safeSender) Send(e api.Envelope) error { + s.mu.Lock() + s.envs = append(s.envs, e) + s.mu.Unlock() + return nil +} + +func (s *safeSender) snapshot() []api.Envelope { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]api.Envelope, len(s.envs)) + copy(out, s.envs) + return out +} + +// TestRunBackupCanceledMidRunReportsCanceled spawns a backup against +// a fake restic that sleeps for 30 seconds, cancels the context after +// a short delay, and confirms the resulting job.finished envelope +// reports status=canceled (not failed). +func TestRunBackupCanceledMidRunReportsCanceled(t *testing.T) { + t.Parallel() + + // Fake restic: replace the shell with a long sleep via `exec` so the + // process tree is one process — SIGTERM goes directly to sleep and + // it exits. Without `exec`, the shell stays in the foreground while + // sleep is its child; SIGTERM-to-shell may or may not propagate to + // sleep depending on the shell, leading to the WaitDelay-then- + // SIGKILL fallback path firing — slower and noisier. + bin := setupScript(t, `exec sleep 30`) + + tx := &safeSender{} + r := New(Config{ResticBin: bin}, tx, 0) + + ctx, cancel := context.WithCancel(context.Background()) + done := make(chan error, 1) + go func() { + done <- r.RunBackup(ctx, "job-cancel", []string{"/tmp/x"}, nil, nil, BackupHooks{}) + }() + + // Wait long enough for the subprocess to actually start before + // canceling. Without this, exec.CommandContext can race the + // kill against Start and produce a different error path. + time.Sleep(150 * time.Millisecond) + cancel() + + select { + case <-done: + case <-time.After(15 * time.Second): + t.Fatal("RunBackup did not return within 15s of cancel") + } + + // Locate the job.finished envelope and check its status. + envs := tx.snapshot() + var finEnv api.Envelope + var found bool + for _, e := range envs { + if e.Type == api.MsgJobFinished { + finEnv = e + found = true + break + } + } + if !found { + t.Fatal("no job.finished envelope was sent") + } + var fin api.JobFinishedPayload + if err := finEnv.UnmarshalPayload(&fin); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if fin.Status != api.JobCancelled { + t.Fatalf("status: got %q, want %q", fin.Status, api.JobCancelled) + } + if fin.ExitCode != 130 { + t.Errorf("exit_code: got %d, want 130 (POSIX cancel convention)", fin.ExitCode) + } + // The error message should be empty for canceled jobs (see runner.sendFinished). + if !strings.HasPrefix(fin.Error, "") || fin.Error != "" { + t.Errorf("error: got %q, want empty for canceled jobs", fin.Error) + } +} diff --git a/internal/agent/runner/runner.go b/internal/agent/runner/runner.go index aae9882..9666b39 100644 --- a/internal/agent/runner/runner.go +++ b/internal/agent/runner/runner.go @@ -95,8 +95,10 @@ func (r *Runner) streamHandler(jobID string, seq *atomic.Int64) restic.LineHandl } // sendFinished ships a job.finished envelope. err==nil → succeeded; -// otherwise failed. statsBlob is forwarded as JobFinishedPayload.Stats. -func (r *Runner) sendFinished(jobID string, finishedAt time.Time, err error, statsBlob json.RawMessage) { +// otherwise failed (or canceled if ctx was canceled — operator +// hit the Cancel button or the agent is shutting down). +// statsBlob is forwarded as JobFinishedPayload.Stats. +func (r *Runner) sendFinished(ctx context.Context, jobID string, finishedAt time.Time, err error, statsBlob json.RawMessage) { status := api.JobSucceeded exit := 0 errMsg := "" @@ -104,6 +106,16 @@ func (r *Runner) sendFinished(jobID string, finishedAt time.Time, err error, sta status = api.JobFailed exit = -1 errMsg = err.Error() + // If the context was canceled, the failure is operator-driven + // (or shutdown). Surface as JobCancelled so the UI shows a + // neutral "canceled" state rather than a red "failed" one. + // exec.CommandContext returns the process's exit error on + // ctx-cancel, which we'd otherwise rebadge as failed. + if ctxErr := ctx.Err(); ctxErr != nil { + status = api.JobCancelled + exit = 130 // POSIX convention for SIGINT/SIGTERM-killed + errMsg = "" // no need to surface the underlying restic error + } } finEnv, _ := api.Marshal(api.MsgJobFinished, jobID, api.JobFinishedPayload{ JobID: jobID, @@ -138,7 +150,7 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t if hooks.Pre != "" { if err := r.runHook(ctx, jobID, "pre", hooks.Pre, "", &seq); err != nil { finishedAt := time.Now().UTC() - r.sendFinished(jobID, finishedAt, err, nil) + r.sendFinished(ctx, jobID, finishedAt, err, nil) return fmt.Errorf("pre_hook failed: %w", err) } } @@ -206,7 +218,7 @@ func (r *Runner) RunBackup(ctx context.Context, jobID string, paths, excludes, t } } - r.sendFinished(jobID, finishedAt, err, statsBlob) + r.sendFinished(ctx, jobID, finishedAt, err, statsBlob) // On a successful backup, refresh the server's snapshot projection. // We do this *after* job.finished so the UI sees the job land first; @@ -240,7 +252,7 @@ func (r *Runner) RunInit(ctx context.Context, jobID string) error { var seq atomic.Int64 err := env.RunInit(ctx, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() - r.sendFinished(jobID, finishedAt, err, nil) + r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner init: %w", err) } @@ -262,7 +274,7 @@ func (r *Runner) RunForget(ctx context.Context, jobID string, groups []restic.Fo var seq atomic.Int64 err := env.RunForget(ctx, groups, r.streamHandler(jobID, &seq)) finishedAt := time.Now().UTC() - r.sendFinished(jobID, finishedAt, err, nil) + r.sendFinished(ctx, jobID, finishedAt, err, nil) // Refresh the server's snapshot projection — forget rewrites the // index so the host's snapshot list almost certainly shrunk. @@ -300,7 +312,7 @@ func (r *Runner) RunPrune(ctx context.Context, jobID string) error { } } - r.sendFinished(jobID, finishedAt, err, nil) + r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner prune: %w", err) @@ -339,7 +351,7 @@ func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) erro slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr) } - r.sendFinished(jobID, finishedAt, err, nil) + r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner check: %w", err) @@ -366,7 +378,7 @@ func (r *Runner) RunUnlock(ctx context.Context, jobID string) error { } } - r.sendFinished(jobID, finishedAt, err, nil) + r.sendFinished(ctx, jobID, finishedAt, err, nil) if err != nil { return fmt.Errorf("runner unlock: %w", err) diff --git a/internal/restic/cancel_unix.go b/internal/restic/cancel_unix.go new file mode 100644 index 0000000..f74fdd6 --- /dev/null +++ b/internal/restic/cancel_unix.go @@ -0,0 +1,7 @@ +//go:build !windows + +package restic + +import "syscall" + +var sigterm = syscall.SIGTERM diff --git a/internal/restic/cancel_windows.go b/internal/restic/cancel_windows.go new file mode 100644 index 0000000..b8f47b7 --- /dev/null +++ b/internal/restic/cancel_windows.go @@ -0,0 +1,12 @@ +//go:build windows + +package restic + +import "os" + +// Windows has no SIGTERM. The closest equivalent is os.Interrupt +// (CTRL_BREAK_EVENT), but Go's exec.Cmd.Process.Signal() on Windows +// only supports os.Kill — sending anything else returns an error and +// no signal is delivered. Fall back to os.Kill so Cancel still works +// (immediate force-kill); WaitDelay is unused but harmless. +var sigterm = os.Kill diff --git a/internal/restic/runner.go b/internal/restic/runner.go index 6104e7a..0ba9bb9 100644 --- a/internal/restic/runner.go +++ b/internal/restic/runner.go @@ -72,11 +72,30 @@ func (e Env) globalArgs() []string { // before the supplied subcommand args. Centralizing this so every // command (backup/forget/prune/check/unlock/init/stats) honors // the caps without each call site having to remember. +// +// Cancellation: by default exec.CommandContext sends SIGKILL when +// ctx is canceled, which leaves restic no chance to clean up its +// repository lock. Override Cmd.Cancel to send SIGTERM first, and +// set Cmd.WaitDelay so the process is force-killed if it doesn't +// exit within five seconds. Restic responds to SIGTERM by removing +// its lock file before exiting, which is what we want when an +// operator cancels a long-running backup/restore from the UI. func (e Env) resticCmd(ctx context.Context, sub ...string) *exec.Cmd { args := append(e.globalArgs(), sub...) cmd := exec.CommandContext(ctx, e.Bin, args...) cmd.Env = e.envSlice() cmd.Dir = e.WorkDir + cmd.Cancel = func() error { + // Cmd.Process is set after Start; Cancel only fires post-Start + // so the nil check is defensive against the documented but + // unlikely race. Signal returns ErrProcessDone if the process + // already exited; that's not a problem here either. + if cmd.Process == nil { + return nil + } + return cmd.Process.Signal(sigterm) + } + cmd.WaitDelay = 5 * time.Second return cmd } diff --git a/internal/server/http/cancel.go b/internal/server/http/cancel.go new file mode 100644 index 0000000..ffb26ed --- /dev/null +++ b/internal/server/http/cancel.go @@ -0,0 +1,86 @@ +package http + +import ( + stdhttp "net/http" + "time" + + "github.com/go-chi/chi/v5" + "github.com/oklog/ulid/v2" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/store" +) + +// handleCancelJob is POST /api/jobs/{id}/cancel. Sends a command.cancel +// envelope to the host that owns the job; the agent kills the running +// restic subprocess, and the resulting job.finished envelope (status = +// canceled) is what actually transitions the job row — this handler +// does not touch the jobs table directly. Returning 202 makes that +// asynchronicity explicit. +// +// 4xx cases: +// - job not found (404) +// - job already in a terminal state (409 — nothing to cancel) +// - host offline (503 — same code path the run-now endpoint uses) +// +// Audit-logged as job.cancel with the job ID as target. +func (s *Server) handleCancelJob(w stdhttp.ResponseWriter, r *stdhttp.Request) { + user, ok := s.requireUser(r) + if !ok { + writeJSONError(w, stdhttp.StatusUnauthorized, "unauthorized", "") + return + } + jobID := chi.URLParam(r, "id") + if jobID == "" { + writeJSONError(w, stdhttp.StatusBadRequest, "missing_job_id", "") + return + } + + job, err := s.deps.Store.GetJob(r.Context(), jobID) + if err != nil { + writeJSONError(w, stdhttp.StatusNotFound, "job_not_found", "") + return + } + switch api.JobStatus(job.Status) { + case api.JobSucceeded, api.JobFailed, api.JobCancelled: + writeJSONError(w, stdhttp.StatusConflict, "job_terminal", + "job is already in a terminal state ("+job.Status+")") + return + } + + if !s.deps.Hub.Connected(job.HostID) { + writeJSONError(w, stdhttp.StatusServiceUnavailable, "host_offline", + "agent is not connected; can't deliver cancel signal") + return + } + + env, err := api.Marshal(api.MsgCommandCancel, jobID, api.CommandCancelPayload{ + JobID: jobID, + }) + if err != nil { + writeJSONError(w, stdhttp.StatusInternalServerError, "internal", "") + return + } + if err := s.deps.Hub.Send(r.Context(), job.HostID, env); err != nil { + writeJSONError(w, stdhttp.StatusServiceUnavailable, "host_offline", err.Error()) + return + } + + var actorID *string + actor := "system" + if user != nil { + actor = "user" + actorID = &user.ID + } + _ = s.deps.Store.AppendAudit(r.Context(), store.AuditEntry{ + ID: ulid.Make().String(), + UserID: actorID, + Actor: actor, + Action: "job.cancel", + TargetKind: ptr("job"), + TargetID: &jobID, + TS: time.Now().UTC(), + }) + + w.WriteHeader(stdhttp.StatusAccepted) +} diff --git a/internal/server/http/cancel_test.go b/internal/server/http/cancel_test.go new file mode 100644 index 0000000..efcc953 --- /dev/null +++ b/internal/server/http/cancel_test.go @@ -0,0 +1,204 @@ +// cancel_test.go — covers POST /api/jobs/{id}/cancel. +package http + +import ( + "context" + "encoding/json" + stdhttp "net/http" + "strings" + "testing" + "time" + + "github.com/coder/websocket" + "github.com/oklog/ulid/v2" + + "gitea.dcglab.co.uk/steve/restic-manager/internal/api" + "gitea.dcglab.co.uk/steve/restic-manager/internal/store" +) + +// TestCancelJobRunningHappyPath: a running job's cancel endpoint sends +// a command.cancel envelope with the right job id, returns 202, and +// writes a job.cancel audit row. +func TestCancelJobRunningHappyPath(t *testing.T) { + t.Parallel() + srv, ts, st := rawTestServer(t) + hostID, token := enrolHostForWS(t, srv, st, "cancel-host") + c := agentDial(t, srv, ts, hostID, token) + sendHello(t, c, "cancel-host") + _ = drainUntil(t, c, api.MsgScheduleSet) + + // Seed a running job we can target. + jobID := ulid.Make().String() + now := time.Now().UTC() + if err := st.CreateJob(context.Background(), store.Job{ + ID: jobID, HostID: hostID, Kind: "backup", + ActorKind: "user", CreatedAt: now, + }); err != nil { + t.Fatalf("create job: %v", err) + } + if err := st.MarkJobStarted(context.Background(), jobID, now); err != nil { + t.Fatalf("mark started: %v", err) + } + + cookie := loginAsAdmin(t, st) + req, _ := stdhttp.NewRequest("POST", + ts.URL+"/api/jobs/"+jobID+"/cancel", nil) + req.AddCookie(cookie) + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do: %v", err) + } + defer res.Body.Close() + if res.StatusCode != stdhttp.StatusAccepted { + t.Fatalf("status: got %d, want 202", res.StatusCode) + } + + // Read the dispatched command.cancel envelope. + deadline := time.Now().Add(2 * time.Second) + var got api.Envelope + for time.Now().Before(deadline) { + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + mt, raw, rerr := c.Read(ctx) + cancel() + if rerr != nil { + break + } + if mt != websocket.MessageText { + continue + } + if !strings.Contains(string(raw), `"command.cancel"`) { + continue + } + if err := json.Unmarshal(raw, &got); err != nil { + t.Fatalf("unmarshal: %v", err) + } + break + } + if got.Type != api.MsgCommandCancel { + t.Fatalf("never received command.cancel envelope") + } + var cp api.CommandCancelPayload + if err := got.UnmarshalPayload(&cp); err != nil { + t.Fatalf("unmarshal payload: %v", err) + } + if cp.JobID != jobID { + t.Fatalf("payload job_id: got %q want %q", cp.JobID, jobID) + } + + // Audit row exists. + var n int + if err := st.DB().QueryRow( + `SELECT COUNT(*) FROM audit_log WHERE action = 'job.cancel' AND target_id = ?`, + jobID).Scan(&n); err != nil { + t.Fatalf("audit count: %v", err) + } + if n != 1 { + t.Fatalf("audit rows: got %d, want 1", n) + } +} + +// TestCancelJobAlreadyTerminal: a job in succeeded/failed/canceled +// state returns 409 and does NOT send a WS envelope. +func TestCancelJobAlreadyTerminal(t *testing.T) { + t.Parallel() + srv, ts, st := rawTestServer(t) + hostID, token := enrolHostForWS(t, srv, st, "term-host") + c := agentDial(t, srv, ts, hostID, token) + sendHello(t, c, "term-host") + _ = drainUntil(t, c, api.MsgScheduleSet) + + jobID := ulid.Make().String() + now := time.Now().UTC() + if err := st.CreateJob(context.Background(), store.Job{ + ID: jobID, HostID: hostID, Kind: "backup", + ActorKind: "user", CreatedAt: now, + }); err != nil { + t.Fatalf("create job: %v", err) + } + if err := st.MarkJobFinished(context.Background(), jobID, "succeeded", 0, nil, "", now); err != nil { + t.Fatalf("mark finished: %v", err) + } + + cookie := loginAsAdmin(t, st) + req, _ := stdhttp.NewRequest("POST", + ts.URL+"/api/jobs/"+jobID+"/cancel", nil) + req.AddCookie(cookie) + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do: %v", err) + } + defer res.Body.Close() + if res.StatusCode != stdhttp.StatusConflict { + t.Fatalf("status: got %d, want 409", res.StatusCode) + } + + // Drain — no command.cancel should arrive. + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) + defer cancel() + for { + mt, raw, rerr := c.Read(ctx) + if rerr != nil { + break + } + if mt == websocket.MessageText && strings.Contains(string(raw), `"command.cancel"`) { + t.Fatalf("unexpected command.cancel envelope for terminal job") + } + } +} + +// TestCancelJobNotFound: 404 for a job id that doesn't exist. +func TestCancelJobNotFound(t *testing.T) { + t.Parallel() + _, ts, st := rawTestServer(t) + cookie := loginAsAdmin(t, st) + req, _ := stdhttp.NewRequest("POST", + ts.URL+"/api/jobs/"+ulid.Make().String()+"/cancel", nil) + req.AddCookie(cookie) + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do: %v", err) + } + defer res.Body.Close() + if res.StatusCode != stdhttp.StatusNotFound { + t.Fatalf("status: got %d, want 404", res.StatusCode) + } +} + +// TestCancelJobHostOffline: a queued/running job whose host has no +// active WS connection returns 503. +func TestCancelJobHostOffline(t *testing.T) { + t.Parallel() + _, ts, st := rawTestServer(t) + // Create a host but don't connect a WS for it. + hostID := ulid.Make().String() + if err := st.CreateHost(context.Background(), store.Host{ + ID: hostID, Name: "offline-host", OS: "linux", Arch: "amd64", + EnrolledAt: time.Now().UTC(), + }, "deadbeef", ""); err != nil { + t.Fatalf("create host: %v", err) + } + jobID := ulid.Make().String() + now := time.Now().UTC() + if err := st.CreateJob(context.Background(), store.Job{ + ID: jobID, HostID: hostID, Kind: "backup", + ActorKind: "user", CreatedAt: now, + }); err != nil { + t.Fatalf("create job: %v", err) + } + if err := st.MarkJobStarted(context.Background(), jobID, now); err != nil { + t.Fatalf("mark started: %v", err) + } + + cookie := loginAsAdmin(t, st) + req, _ := stdhttp.NewRequest("POST", + ts.URL+"/api/jobs/"+jobID+"/cancel", nil) + req.AddCookie(cookie) + res, err := stdhttp.DefaultClient.Do(req) + if err != nil { + t.Fatalf("do: %v", err) + } + defer res.Body.Close() + if res.StatusCode != stdhttp.StatusServiceUnavailable { + t.Fatalf("status: got %d, want 503", res.StatusCode) + } +} diff --git a/internal/server/http/server.go b/internal/server/http/server.go index 5fd6539..b808cfc 100644 --- a/internal/server/http/server.go +++ b/internal/server/http/server.go @@ -178,6 +178,12 @@ func (s *Server) routes(r chi.Router) { r.Post("/hosts/{id}/repo/prune", s.handleRunRepoPrune) r.Post("/hosts/{id}/repo/check", s.handleRunRepoCheck) r.Post("/hosts/{id}/repo/unlock", s.handleRunRepoUnlock) + + // Cancel a running job. Operator-driven, sends command.cancel + // to the agent which kills the restic subprocess; the agent's + // resulting job.finished (status=canceled) is what flips the + // job row. + r.Post("/jobs/{id}/cancel", s.handleCancelJob) }) // Per-source-group Run-now (HTMX form action). Available even