-
Notifications
You must be signed in to change notification settings - Fork 14
chore: workflow manager unit testing #766
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
📝 WalkthroughWalkthroughThe pull request integrates job agent registry functionality into the workflow manager, refactoring the job dispatch flow to look up and instantiate jobs with proper configurations. Tests validate the complete workflow lifecycle including step continuation and active job tracking. Workflow views now track per-step jobs and expose active job status determination. Changes
Sequence DiagramsequenceDiagram
actor User
participant Manager
participant Store
participant Registry as JobAgentRegistry
participant Workflow as Workflow State
User->>Manager: CreateWorkflow(templateId, inputs)
Manager->>Store: GetWorkflowTemplate
Store-->>Manager: template with steps
Manager->>Workflow: Create workflow instance
Manager->>Manager: ReconcileWorkflow
Manager->>Workflow: GetNextStep
Workflow-->>Manager: step 1
Manager->>Store: GetJobAgent(agentId)
Store-->>Manager: jobAgent config
Manager->>Manager: mergeJobAgentConfig
Manager->>Registry: dispatchStep(step, job)
Registry-->>Manager: job created & dispatched
Workflow->>Workflow: Update with active job
Note over Workflow: Step in progress...
User->>Manager: ReconcileWorkflow
Manager->>Workflow: HasActiveJobs()
Workflow-->>Manager: true
Manager->>Manager: Wait - step still active
Note over Workflow: Step completes...
User->>Manager: ReconcileWorkflow
Manager->>Workflow: HasActiveJobs()
Workflow-->>Manager: false
Manager->>Workflow: GetNextStep
Workflow-->>Manager: step 2
Manager->>Registry: dispatchStep(step 2, job)
Registry-->>Manager: job created & dispatched
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Tip 🧪 Unit Test Generation v2 is now available!We have significantly improved our unit test generation capabilities. To enable: Add this to your reviews:
finishing_touches:
unit_tests:
enabled: trueTry it out by using the Have feedback? Share your thoughts on our Discord thread! Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
apps/workspace-engine/pkg/workspace/workflowmanager/manager.go (1)
27-55: Propagate reconciliation errors to the caller.
CreateWorkflowignoresReconcileWorkflowfailures, which can hide dispatch/config problems and return a “successful” workflow that never started. At minimum, surface the error so callers can react.🛠️ Suggested change
- w.ReconcileWorkflow(ctx, workflow) - return workflow, nil + if err := w.ReconcileWorkflow(ctx, workflow); err != nil { + return workflow, err + } + return workflow, nil
🤖 Fix all issues with AI agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go`:
- Around line 59-90: The dispatchStep function persists a Job as Status=Pending
before calling m.jobAgentRegistry.Dispatch, which can leave a stuck pending job
if Dispatch fails; change the flow to avoid or clean up that state: either (A)
perform m.jobAgentRegistry.Dispatch(ctx, job) before calling
m.store.Jobs.Upsert(ctx, job) so the job is only persisted after a successful
dispatch, or (B) if pre-persist is required, ensure you handle errors from
jobAgentRegistry.Dispatch by updating the persisted job to a terminal/failed
status or deleting it (use m.store.Jobs.Upsert or a delete/update helper) so
HasActiveJobs() won’t block reconciliation. Update the dispatchStep
implementation accordingly (refer to dispatchStep, m.store.Jobs.Upsert, and
m.jobAgentRegistry.Dispatch).
🧹 Nitpick comments (1)
apps/workspace-engine/pkg/workspace/store/workflow_step.go (1)
51-53: Make ordering deterministic when Index values collide.Sorting only by
Indexleaves equal-index ordering nondeterministic (map iteration order), which can cause flaky step sequencing if duplicates ever occur. Consider a stable sort with a secondary key (e.g.,Id) or enforce unique indices.🔧 Suggested tweak for deterministic ordering
- sort.Slice(steps, func(i, j int) bool { - return steps[i].Index < steps[j].Index - }) + sort.SliceStable(steps, func(i, j int) bool { + if steps[i].Index == steps[j].Index { + return steps[i].Id < steps[j].Id + } + return steps[i].Index < steps[j].Index + })
| func (m *Manager) dispatchStep(ctx context.Context, step *oapi.WorkflowStep) error { | ||
| jobAgent, ok := m.store.JobAgents.Get(step.JobAgent.Id) | ||
| if !ok { | ||
| return fmt.Errorf("job agent %s not found", step.JobAgent.Id) | ||
| } | ||
|
|
||
| mergedConfig, err := mergeJobAgentConfig( | ||
| jobAgent.Config, | ||
| step.JobAgent.Config, | ||
| ) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to merge job agent config: %w", err) | ||
| } | ||
|
|
||
| job := &oapi.Job{ | ||
| Id: uuid.New().String(), | ||
| WorkflowStepId: step.Id, | ||
| JobAgentId: step.JobAgent.Id, | ||
| JobAgentConfig: mergedConfig, | ||
| CreatedAt: time.Now(), | ||
| UpdatedAt: time.Now(), | ||
| Metadata: make(map[string]string), | ||
| Status: oapi.JobStatusPending, | ||
| } | ||
|
|
||
| m.store.Jobs.Upsert(ctx, job) | ||
| if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil { | ||
| return fmt.Errorf("failed to dispatch job: %w", err) | ||
| } | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid leaving a pending job when dispatch fails.
The job is persisted as pending before dispatch. If Dispatch fails, HasActiveJobs() will block further reconciliation forever, even though nothing was actually dispatched. Consider dispatching first or rolling back/marking terminal on error.
🧭 One possible approach (if Dispatch doesn’t require pre-persisted jobs)
- m.store.Jobs.Upsert(ctx, job)
- if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil {
- return fmt.Errorf("failed to dispatch job: %w", err)
- }
+ if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil {
+ return fmt.Errorf("failed to dispatch job: %w", err)
+ }
+ m.store.Jobs.Upsert(ctx, job)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| func (m *Manager) dispatchStep(ctx context.Context, step *oapi.WorkflowStep) error { | |
| jobAgent, ok := m.store.JobAgents.Get(step.JobAgent.Id) | |
| if !ok { | |
| return fmt.Errorf("job agent %s not found", step.JobAgent.Id) | |
| } | |
| mergedConfig, err := mergeJobAgentConfig( | |
| jobAgent.Config, | |
| step.JobAgent.Config, | |
| ) | |
| if err != nil { | |
| return fmt.Errorf("failed to merge job agent config: %w", err) | |
| } | |
| job := &oapi.Job{ | |
| Id: uuid.New().String(), | |
| WorkflowStepId: step.Id, | |
| JobAgentId: step.JobAgent.Id, | |
| JobAgentConfig: mergedConfig, | |
| CreatedAt: time.Now(), | |
| UpdatedAt: time.Now(), | |
| Metadata: make(map[string]string), | |
| Status: oapi.JobStatusPending, | |
| } | |
| m.store.Jobs.Upsert(ctx, job) | |
| if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil { | |
| return fmt.Errorf("failed to dispatch job: %w", err) | |
| } | |
| return nil | |
| } | |
| func (m *Manager) dispatchStep(ctx context.Context, step *oapi.WorkflowStep) error { | |
| jobAgent, ok := m.store.JobAgents.Get(step.JobAgent.Id) | |
| if !ok { | |
| return fmt.Errorf("job agent %s not found", step.JobAgent.Id) | |
| } | |
| mergedConfig, err := mergeJobAgentConfig( | |
| jobAgent.Config, | |
| step.JobAgent.Config, | |
| ) | |
| if err != nil { | |
| return fmt.Errorf("failed to merge job agent config: %w", err) | |
| } | |
| job := &oapi.Job{ | |
| Id: uuid.New().String(), | |
| WorkflowStepId: step.Id, | |
| JobAgentId: step.JobAgent.Id, | |
| JobAgentConfig: mergedConfig, | |
| CreatedAt: time.Now(), | |
| UpdatedAt: time.Now(), | |
| Metadata: make(map[string]string), | |
| Status: oapi.JobStatusPending, | |
| } | |
| if err := m.jobAgentRegistry.Dispatch(ctx, job); err != nil { | |
| return fmt.Errorf("failed to dispatch job: %w", err) | |
| } | |
| m.store.Jobs.Upsert(ctx, job) | |
| return nil | |
| } |
🤖 Prompt for AI Agents
In `@apps/workspace-engine/pkg/workspace/workflowmanager/manager.go` around lines
59 - 90, The dispatchStep function persists a Job as Status=Pending before
calling m.jobAgentRegistry.Dispatch, which can leave a stuck pending job if
Dispatch fails; change the flow to avoid or clean up that state: either (A)
perform m.jobAgentRegistry.Dispatch(ctx, job) before calling
m.store.Jobs.Upsert(ctx, job) so the job is only persisted after a successful
dispatch, or (B) if pre-persist is required, ensure you handle errors from
jobAgentRegistry.Dispatch by updating the persisted job to a terminal/failed
status or deleting it (use m.store.Jobs.Upsert or a delete/update helper) so
HasActiveJobs() won’t block reconciliation. Update the dispatchStep
implementation accordingly (refer to dispatchStep, m.store.Jobs.Upsert, and
m.jobAgentRegistry.Dispatch).
Summary by CodeRabbit
New Features
Bug Fixes
Tests
✏️ Tip: You can customize this high-level summary in your review settings.