agent/runner: ship repo.stats before job.finished in RunCheck/RunUnlock
RunCheck and RunUnlock were calling sendFinished before reportStats, inverting the required job.started → log.stream → repo.stats → job.finished envelope order. Move reportStats ahead of sendFinished in both functions to match the pattern already correct in RunPrune. Strengthen TestRunCheckShipsCheckStatus, TestRunCheckErrorsFoundShipsErrorsStatus, and TestRunUnlockClearsLock with the same position-index ordering assertions used by TestRunPruneShipsExpectedEnvelopes; these assertions would have failed against the pre-fix code.
This commit is contained in:
@@ -276,7 +276,6 @@ func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) erro
|
|||||||
var seq atomic.Int64
|
var seq atomic.Int64
|
||||||
res, err := env.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &seq))
|
res, err := env.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &seq))
|
||||||
finishedAt := time.Now().UTC()
|
finishedAt := time.Now().UTC()
|
||||||
r.sendFinished(jobID, finishedAt, err, nil)
|
|
||||||
|
|
||||||
// Determine check status string.
|
// Determine check status string.
|
||||||
checkStatus := "ok"
|
checkStatus := "ok"
|
||||||
@@ -297,6 +296,8 @@ func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) erro
|
|||||||
slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr)
|
slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.sendFinished(jobID, finishedAt, err, nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("runner check: %w", err)
|
return fmt.Errorf("runner check: %w", err)
|
||||||
}
|
}
|
||||||
@@ -313,7 +314,6 @@ func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
|
|||||||
var seq atomic.Int64
|
var seq atomic.Int64
|
||||||
err := env.RunUnlock(ctx, r.streamHandler(jobID, &seq))
|
err := env.RunUnlock(ctx, r.streamHandler(jobID, &seq))
|
||||||
finishedAt := time.Now().UTC()
|
finishedAt := time.Now().UTC()
|
||||||
r.sendFinished(jobID, finishedAt, err, nil)
|
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
lockFalse := false
|
lockFalse := false
|
||||||
@@ -323,6 +323,8 @@ func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
r.sendFinished(jobID, finishedAt, err, nil)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("runner unlock: %w", err)
|
return fmt.Errorf("runner unlock: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -143,6 +143,32 @@ esac
|
|||||||
t.Fatalf("RunCheck: %v", err)
|
t.Fatalf("RunCheck: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Assert envelope ordering: job.started → log.stream → repo.stats → job.finished.
|
||||||
|
order := envelopeOrder(tx.envs)
|
||||||
|
wantTypes := []api.MessageType{api.MsgJobStarted, api.MsgLogStream, api.MsgRepoStats, api.MsgJobFinished}
|
||||||
|
positions := map[api.MessageType]int{}
|
||||||
|
for i, mt := range order {
|
||||||
|
if _, seen := positions[mt]; !seen {
|
||||||
|
positions[mt] = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := 0; i < len(wantTypes)-1; i++ {
|
||||||
|
a, b := wantTypes[i], wantTypes[i+1]
|
||||||
|
pa, aOK := positions[a]
|
||||||
|
pb, bOK := positions[b]
|
||||||
|
if !aOK {
|
||||||
|
t.Errorf("envelope type %q not found in output %v", a, order)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !bOK {
|
||||||
|
t.Errorf("envelope type %q not found in output %v", b, order)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if pa >= pb {
|
||||||
|
t.Errorf("expected %q before %q but positions are %d >= %d (order: %v)", a, b, pa, pb, order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
||||||
var p api.RepoStatsPayload
|
var p api.RepoStatsPayload
|
||||||
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
||||||
@@ -180,6 +206,33 @@ esac
|
|||||||
t.Fatalf("RunCheck: %v", err)
|
t.Fatalf("RunCheck: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Assert envelope ordering: job.started → repo.stats → job.finished.
|
||||||
|
// (No log.stream expected here because check exits immediately.)
|
||||||
|
order := envelopeOrder(tx.envs)
|
||||||
|
wantTypes := []api.MessageType{api.MsgJobStarted, api.MsgRepoStats, api.MsgJobFinished}
|
||||||
|
positions := map[api.MessageType]int{}
|
||||||
|
for i, mt := range order {
|
||||||
|
if _, seen := positions[mt]; !seen {
|
||||||
|
positions[mt] = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := 0; i < len(wantTypes)-1; i++ {
|
||||||
|
a, b := wantTypes[i], wantTypes[i+1]
|
||||||
|
pa, aOK := positions[a]
|
||||||
|
pb, bOK := positions[b]
|
||||||
|
if !aOK {
|
||||||
|
t.Errorf("envelope type %q not found in output %v", a, order)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !bOK {
|
||||||
|
t.Errorf("envelope type %q not found in output %v", b, order)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if pa >= pb {
|
||||||
|
t.Errorf("expected %q before %q but positions are %d >= %d (order: %v)", a, b, pa, pb, order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
||||||
var p api.RepoStatsPayload
|
var p api.RepoStatsPayload
|
||||||
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
||||||
@@ -210,6 +263,32 @@ esac
|
|||||||
t.Fatalf("RunUnlock: %v", err)
|
t.Fatalf("RunUnlock: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Assert envelope ordering: job.started → log.stream → repo.stats → job.finished.
|
||||||
|
order := envelopeOrder(tx.envs)
|
||||||
|
wantTypes := []api.MessageType{api.MsgJobStarted, api.MsgLogStream, api.MsgRepoStats, api.MsgJobFinished}
|
||||||
|
positions := map[api.MessageType]int{}
|
||||||
|
for i, mt := range order {
|
||||||
|
if _, seen := positions[mt]; !seen {
|
||||||
|
positions[mt] = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for i := 0; i < len(wantTypes)-1; i++ {
|
||||||
|
a, b := wantTypes[i], wantTypes[i+1]
|
||||||
|
pa, aOK := positions[a]
|
||||||
|
pb, bOK := positions[b]
|
||||||
|
if !aOK {
|
||||||
|
t.Errorf("envelope type %q not found in output %v", a, order)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !bOK {
|
||||||
|
t.Errorf("envelope type %q not found in output %v", b, order)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if pa >= pb {
|
||||||
|
t.Errorf("expected %q before %q but positions are %d >= %d (order: %v)", a, b, pa, pb, order)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
||||||
var p api.RepoStatsPayload
|
var p api.RepoStatsPayload
|
||||||
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user