feat: Implement MainEventBus architecture for event queue processing #611
base: main
Conversation
Introduces centralized event processing with a single background thread to guarantee event persistence before client visibility and eliminate race conditions in concurrent task updates.

Key Changes:
- MainEventBus: central LinkedBlockingDeque for all events
- MainEventBusProcessor: single background thread ensuring serial processing (TaskStore.save() -> PushNotificationSender.send() -> distributeToChildren())
- Two-level queue cleanup protection via TaskStateProvider.isTaskFinalized() to prevent premature cleanup for fire-and-forget tasks
- Deterministic blocking calls: waitForTaskFinalization() ensures TaskStore persistence completes before returning to the client
- Streaming closure: agentCompleted flag via EnhancedRunnable.DoneCallback for a graceful drain when the agent completes
- SseFormatter utility: framework-agnostic SSE formatting in server-common
- Executor pool improvements: bounded EventConsumerExecutor pool (size 15) prevents exhaustion under high concurrency
- Logging: separate thread-stats category, reduced noise

Architecture Impact:
- All events flow through MainEventBus before distribution to ChildQueues
- Clients never see unpersisted events (persistence-before-distribution)
- Fire-and-forget tasks supported: queues stay open for non-final states
- Late resubscription enabled: queues persist until task finalization
- Test synchronization: MainEventBusProcessorCallback for deterministic testing
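For orientation, here is a minimal sketch of the serial processing loop described above, assuming simplified stand-ins for the event and collaborator types; everything except the MainEventBus/MainEventBusProcessor concepts themselves is illustrative and the real classes in this PR differ.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// Illustrative event and collaborator types; the real interfaces in the PR differ.
interface Event { String taskId(); }
interface TaskStore { void save(Event event); }
interface PushNotificationSender { void send(Event event); }
interface ChildQueueRegistry { void distributeToChildren(Event event); }

/** Sketch of the single-threaded persistence-before-distribution loop. */
final class MainEventBusProcessorSketch implements Runnable {
    private final BlockingDeque<Event> mainEventBus = new LinkedBlockingDeque<>();
    private final TaskStore taskStore;
    private final PushNotificationSender pushSender;
    private final ChildQueueRegistry childQueues;
    private volatile boolean running = true;

    MainEventBusProcessorSketch(TaskStore taskStore,
                                PushNotificationSender pushSender,
                                ChildQueueRegistry childQueues) {
        this.taskStore = taskStore;
        this.pushSender = pushSender;
        this.childQueues = childQueues;
    }

    void publish(Event event) {
        mainEventBus.addLast(event);          // producers only enqueue; ordering is preserved
    }

    @Override
    public void run() {
        while (running) {
            try {
                Event event = mainEventBus.takeFirst();   // single consumer thread
                taskStore.save(event);                    // 1. persist first
                pushSender.send(event);                   // 2. then push notifications
                childQueues.distributeToChildren(event);  // 3. only now visible to clients
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }
}
```

The point the sketch tries to capture is that a single consumer thread performs persistence, notification, and distribution in a fixed order, so child queues (and therefore clients) only ever observe events that are already in the TaskStore.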
Summary of Changes

Hello @kabir, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request refactors the core event processing architecture to introduce a robust, centralized event bus. The primary goal is to ensure data consistency and prevent race conditions by enforcing a strict 'persistence-before-distribution' model for all events. This change significantly impacts how events are handled internally, from agent output to client consumption, and includes improvements to thread management and API streaming behavior.
Javadoc treats <pre> as a block-level element and doesn't allow it to be wrapped in <p> tags. Removed wrapping paragraph tags from code examples in MainEventBusProcessor and ServerCallContext to fix CI build errors.
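For reference, the shape the fix produces looks roughly like this; the method and snippet are placeholders, and the only point being shown is that the `<pre>` block stands on its own rather than being nested inside `<p>` tags.

```java
interface JavadocExample {

    /**
     * Illustrative Javadoc only; the snippet below is a placeholder.
     * The {@code <pre>} block is not wrapped in {@code <p>} tags, because
     * javadoc treats {@code <pre>} as a block-level element and rejects it
     * inside a paragraph.
     *
     * <pre>{@code
     * processor.onEvent(event); // placeholder code, not the real API
     * }</pre>
     */
    void example();
}
```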
Code Review
The pull request introduces a new MainEventBus architecture for centralized event processing, ensuring event persistence before client visibility and eliminating race conditions. This is a significant architectural improvement. The changes refactor event handling across ReplicatedQueueManager, EventQueue, EventConsumer, and DefaultRequestHandler to integrate with the new MainEventBus and MainEventBusProcessor. SSE handling in A2AServerRoutes has also been refactored for better separation of concerns and robustness, including critical buffering controls. Thread pool configuration for agent execution has been improved with bounded queues and core thread timeouts. Many tests have been updated to reflect the asynchronous nature of the new event processing and use more robust synchronization mechanisms like callbacks and latches. Overall, the changes are well-structured and address important architectural and concurrency challenges.
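As a rough illustration of the bounded-executor idea mentioned in the review: the pool size of 15 comes from the PR description, while the queue capacity, keep-alive, and rejection policy below are assumptions made for the sketch, not the PR's actual configuration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class BoundedExecutorSketch {

    // Pool size of 15 comes from the PR description; the queue capacity and
    // keep-alive below are illustrative values, not the PR's.
    static ThreadPoolExecutor newEventConsumerExecutor() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                15, 15,                                     // core == max: hard cap on threads
                60L, TimeUnit.SECONDS,                      // idle threads are reclaimed
                new ArrayBlockingQueue<>(100),              // bounded queue instead of unbounded growth
                new ThreadPoolExecutor.CallerRunsPolicy()); // back-pressure when saturated
        executor.allowCoreThreadTimeOut(true);              // core threads also time out when idle
        return executor;
    }
}
```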
server-common/src/main/java/io/a2a/server/events/EventQueue.java (review comment: outdated, resolved)
The kubectl wait command can fail with 'no matching resources found' if executed before the StatefulSet controller has created the pod. This race condition was hitting CI consistently on this branch. Add a retry loop that waits up to 30 seconds for the pod to be created before checking readiness. This matches the pattern already used for Kafka deployment. Fixes intermittent cloud-deployment test failures in CI.
For new tasks, RequestContext.taskId was being set to null instead of using the client-provided taskId from the message. This caused:
1. TaskUpdater to create events with null taskId
2. TaskManager to create tasks with null IDs in TaskStore
3. ResultAggregator to fail with "Could not find a Task/Message for null"

The fix changes line 1057 in DefaultRequestHandler.initMessageSend():
- Before: .setTaskId(task == null ? null : task.id())
+ After: .setTaskId(task == null ? params.message().taskId() : task.id())

This ensures the client-provided taskId is propagated through the entire request flow for new task creation. Fixes cloud-deployment example test failure.
Fixes an intermittent race condition where blocking calls returned before the task reached a final state.

The issue:
1. Agent enqueues: startWork() → artifacts → complete()
2. MainEventBusProcessor processes them serially
3. ResultAggregator interrupts on the FIRST event (WORKING state)
4. Returns capturedTask with WORKING state
5. DefaultRequestHandler checked if (isFinalEvent(capturedTask))
6. FALSE → didn't wait for finalization → returned early
7. Test read stale WORKING state from TaskStore

The fix: ALWAYS call waitForTaskFinalization() for blocking calls, regardless of which event was captured. waitForTaskFinalization() checks TaskStore first and returns immediately if already finalized, so this is safe and efficient. For fire-and-forget tasks that never reach a final state, this will time out, which is the correct behavior for blocking calls.

Fixes DefaultRequestHandlerTest.testBlockingCallReturnsCompleteTaskWithArtifacts
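A minimal sketch of the "always wait" pattern this commit describes, assuming a latch-per-task finalization signal and a simplified stand-in for TaskStateProvider.isTaskFinalized(); the real wiring in DefaultRequestHandler and MainEventBusProcessor differs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Simplified stand-in for the PR's TaskStateProvider.
interface TaskStateProvider { boolean isTaskFinalized(String taskId); }

final class TaskFinalizationWaiter {
    private final TaskStateProvider taskStore;
    private final Map<String, CountDownLatch> latches = new ConcurrentHashMap<>();

    TaskFinalizationWaiter(TaskStateProvider taskStore) {
        this.taskStore = taskStore;
    }

    /** Called by the event processor once a final event has been persisted. */
    void taskFinalized(String taskId) {
        latches.computeIfAbsent(taskId, id -> new CountDownLatch(1)).countDown();
    }

    /**
     * Blocking calls always go through here, regardless of which event was
     * captured: if the store already holds a final state this returns
     * immediately; otherwise it waits, and times out for fire-and-forget
     * tasks that never finalize.
     */
    void waitForTaskFinalization(String taskId, long timeout, TimeUnit unit)
            throws InterruptedException, TimeoutException {
        if (taskStore.isTaskFinalized(taskId)) {
            return;                                   // already persisted as final: cheap fast path
        }
        CountDownLatch latch = latches.computeIfAbsent(taskId, id -> new CountDownLatch(1));
        if (!latch.await(timeout, unit)) {
            throw new TimeoutException("Task " + taskId + " did not finalize in time");
        }
    }
}
```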
The cloud deployment test was using A2A.toUserMessage(text, id), which sets the MESSAGE ID, not the TASK ID. This caused the "start" message to have taskId=null, resulting in the error: "Could not find a Task/Message for null"

Changes:
1. Changed sendStartMessage() to use createUserTextMessage(text, contextId, taskId), which properly sets taskId instead of messageId
2. Added a TimeoutException catch for the consumptionFuture.get() timeout

The "process" and "complete" messages already used Message.builder() with .taskId(), so they were correct.

Fixes cloud-deployment example test.