P2 redesign Phase 5 — prune/check/unlock + maintenance ticker + repo stats + pending-runs queue #3
@@ -276,7 +276,6 @@ func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) erro
|
||||
var seq atomic.Int64
|
||||
res, err := env.RunCheck(ctx, subsetPct, r.streamHandler(jobID, &seq))
|
||||
finishedAt := time.Now().UTC()
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
|
||||
// Determine check status string.
|
||||
checkStatus := "ok"
|
||||
@@ -297,6 +296,8 @@ func (r *Runner) RunCheck(ctx context.Context, jobID string, subsetPct int) erro
|
||||
slog.Warn("runner: stats.report after check failed", "job_id", jobID, "err", rerr)
|
||||
}
|
||||
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner check: %w", err)
|
||||
}
|
||||
@@ -313,7 +314,6 @@ func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
|
||||
var seq atomic.Int64
|
||||
err := env.RunUnlock(ctx, r.streamHandler(jobID, &seq))
|
||||
finishedAt := time.Now().UTC()
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
|
||||
if err == nil {
|
||||
lockFalse := false
|
||||
@@ -323,6 +323,8 @@ func (r *Runner) RunUnlock(ctx context.Context, jobID string) error {
|
||||
}
|
||||
}
|
||||
|
||||
r.sendFinished(jobID, finishedAt, err, nil)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("runner unlock: %w", err)
|
||||
}
|
||||
|
||||
@@ -143,6 +143,32 @@ esac
|
||||
t.Fatalf("RunCheck: %v", err)
|
||||
}
|
||||
|
||||
// Assert envelope ordering: job.started → log.stream → repo.stats → job.finished.
|
||||
order := envelopeOrder(tx.envs)
|
||||
wantTypes := []api.MessageType{api.MsgJobStarted, api.MsgLogStream, api.MsgRepoStats, api.MsgJobFinished}
|
||||
positions := map[api.MessageType]int{}
|
||||
for i, mt := range order {
|
||||
if _, seen := positions[mt]; !seen {
|
||||
positions[mt] = i
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(wantTypes)-1; i++ {
|
||||
a, b := wantTypes[i], wantTypes[i+1]
|
||||
pa, aOK := positions[a]
|
||||
pb, bOK := positions[b]
|
||||
if !aOK {
|
||||
t.Errorf("envelope type %q not found in output %v", a, order)
|
||||
continue
|
||||
}
|
||||
if !bOK {
|
||||
t.Errorf("envelope type %q not found in output %v", b, order)
|
||||
continue
|
||||
}
|
||||
if pa >= pb {
|
||||
t.Errorf("expected %q before %q but positions are %d >= %d (order: %v)", a, b, pa, pb, order)
|
||||
}
|
||||
}
|
||||
|
||||
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
||||
var p api.RepoStatsPayload
|
||||
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
||||
@@ -180,6 +206,33 @@ esac
|
||||
t.Fatalf("RunCheck: %v", err)
|
||||
}
|
||||
|
||||
// Assert envelope ordering: job.started → repo.stats → job.finished.
|
||||
// (No log.stream expected here because check exits immediately.)
|
||||
order := envelopeOrder(tx.envs)
|
||||
wantTypes := []api.MessageType{api.MsgJobStarted, api.MsgRepoStats, api.MsgJobFinished}
|
||||
positions := map[api.MessageType]int{}
|
||||
for i, mt := range order {
|
||||
if _, seen := positions[mt]; !seen {
|
||||
positions[mt] = i
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(wantTypes)-1; i++ {
|
||||
a, b := wantTypes[i], wantTypes[i+1]
|
||||
pa, aOK := positions[a]
|
||||
pb, bOK := positions[b]
|
||||
if !aOK {
|
||||
t.Errorf("envelope type %q not found in output %v", a, order)
|
||||
continue
|
||||
}
|
||||
if !bOK {
|
||||
t.Errorf("envelope type %q not found in output %v", b, order)
|
||||
continue
|
||||
}
|
||||
if pa >= pb {
|
||||
t.Errorf("expected %q before %q but positions are %d >= %d (order: %v)", a, b, pa, pb, order)
|
||||
}
|
||||
}
|
||||
|
||||
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
||||
var p api.RepoStatsPayload
|
||||
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
||||
@@ -210,6 +263,32 @@ esac
|
||||
t.Fatalf("RunUnlock: %v", err)
|
||||
}
|
||||
|
||||
// Assert envelope ordering: job.started → log.stream → repo.stats → job.finished.
|
||||
order := envelopeOrder(tx.envs)
|
||||
wantTypes := []api.MessageType{api.MsgJobStarted, api.MsgLogStream, api.MsgRepoStats, api.MsgJobFinished}
|
||||
positions := map[api.MessageType]int{}
|
||||
for i, mt := range order {
|
||||
if _, seen := positions[mt]; !seen {
|
||||
positions[mt] = i
|
||||
}
|
||||
}
|
||||
for i := 0; i < len(wantTypes)-1; i++ {
|
||||
a, b := wantTypes[i], wantTypes[i+1]
|
||||
pa, aOK := positions[a]
|
||||
pb, bOK := positions[b]
|
||||
if !aOK {
|
||||
t.Errorf("envelope type %q not found in output %v", a, order)
|
||||
continue
|
||||
}
|
||||
if !bOK {
|
||||
t.Errorf("envelope type %q not found in output %v", b, order)
|
||||
continue
|
||||
}
|
||||
if pa >= pb {
|
||||
t.Errorf("expected %q before %q but positions are %d >= %d (order: %v)", a, b, pa, pb, order)
|
||||
}
|
||||
}
|
||||
|
||||
statsEnv := firstEnvOfType(t, tx.envs, api.MsgRepoStats)
|
||||
var p api.RepoStatsPayload
|
||||
if err := statsEnv.UnmarshalPayload(&p); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user