Skip to content

Conversation

@kabir
Copy link
Collaborator

@kabir kabir commented Jan 26, 2026

Introduces centralized event processing with single background thread to guarantee event persistence before client visibility and eliminate race conditions in concurrent task updates.

Key Changes:

  • MainEventBus: Central LinkedBlockingDeque for all events
  • MainEventBusProcessor: Single background thread ensuring serial processing (TaskStore.save() -> PushNotificationSender.send() -> distributeToChildren())
  • Two-level queue cleanup protection via TaskStateProvider.isTaskFinalized() to prevent premature cleanup for fire-and-forget tasks
  • Deterministic blocking calls: waitForTaskFinalization() ensures TaskStore persistence completes before returning to client
  • Streaming closure: agentCompleted flag via EnhancedRunnable.DoneCallback for graceful drain when agent completes
  • SseFormatter utility: Framework-agnostic SSE formatting in server-common
  • Executor pool improvements: Bounded EventConsumerExecutor pool (size 15) prevents exhaustion during high concurrency
  • Logging: Separate thread stats category, reduced noise

Architecture Impact:

  • All events flow through MainEventBus before distribution to ChildQueues
  • Clients never see unpersisted events (persistence-before-distribution)
  • Fire-and-forget tasks supported: queues stay open for non-final states
  • Late resubscription enabled: queues persist until task finalization
  • Test synchronization: MainEventBusProcessorCallback for deterministic testing

Introduces centralized event processing with single background thread to
guarantee event persistence before client visibility and eliminate race
conditions in concurrent task updates.

Key Changes:
- MainEventBus: Central LinkedBlockingDeque for all events
- MainEventBusProcessor: Single background thread ensuring serial processing
  (TaskStore.save() -> PushNotificationSender.send() -> distributeToChildren())
- Two-level queue cleanup protection via TaskStateProvider.isTaskFinalized()
  to prevent premature cleanup for fire-and-forget tasks
- Deterministic blocking calls: waitForTaskFinalization() ensures TaskStore
  persistence completes before returning to client
- Streaming closure: agentCompleted flag via EnhancedRunnable.DoneCallback
  for graceful drain when agent completes
- SseFormatter utility: Framework-agnostic SSE formatting in server-common
- Executor pool improvements: Bounded EventConsumerExecutor pool (size 15)
  prevents exhaustion during high concurrency
- Logging: Separate thread stats category, reduced noise

Architecture Impact:
- All events flow through MainEventBus before distribution to ChildQueues
- Clients never see unpersisted events (persistence-before-distribution)
- Fire-and-forget tasks supported: queues stay open for non-final states
- Late resubscription enabled: queues persist until task finalization
- Test synchronization: MainEventBusProcessorCallback for deterministic testing
@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @kabir, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request refactors the core event processing architecture to introduce a robust, centralized event bus. The primary goal is to ensure data consistency and prevent race conditions by enforcing a strict 'persistence-before-distribution' model for all events. This change significantly impacts how events are handled internally, from agent output to client consumption, and includes improvements to thread management and API streaming behavior.

Highlights

  • Centralized Event Processing: Introduced MainEventBus and MainEventBusProcessor to centralize event handling. All events now flow through this bus, ensuring persistence in the TaskStore before distribution to client-facing queues.
  • Guaranteed Persistence Before Client Visibility: The MainEventBusProcessor ensures that events are saved to the TaskStore and push notifications are sent before they are distributed to ChildQueues. This eliminates race conditions and guarantees clients only see persisted events.
  • Improved Concurrency and Thread Management: A dedicated EventConsumerExecutor (a cached thread pool) is now used for EventConsumer polling, separating I/O-bound polling from CPU-bound agent execution. The main AsyncExecutor now uses a bounded queue and allows core threads to time out, preventing thread pool exhaustion and improving resource utilization.
  • Deterministic Blocking Calls: Blocking API calls (onMessageSend) now explicitly wait for task finalization to be persisted in the TaskStore via a CountDownLatch and MainEventBusProcessorCallback, ensuring the returned task state is fully up-to-date.
  • Graceful Streaming Closure: Streaming endpoints now use an agentCompleted flag in EventConsumer to allow queues to drain gracefully after an agent finishes, preventing premature closure. Client disconnects are handled by invoking a ServerCallContext callback to immediately stop EventConsumer polling.
  • Framework-Agnostic SSE Formatting: A new SseFormatter utility in server-common provides a reusable way to format A2A responses as Server-Sent Events (SSE), simplifying integration across different HTTP server frameworks (e.g., Quarkus/Vert.x, Jakarta/WildFly).
  • Enhanced Testability: New MainEventBusProcessorCallback and waitForEventProcessing helpers provide deterministic synchronization for tests involving asynchronous event processing, replacing unreliable polling loops.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Javadoc treats <pre> as a block-level element and doesn't allow it to be
wrapped in <p> tags. Removed wrapping paragraph tags from code examples
in MainEventBusProcessor and ServerCallContext to fix CI build errors.
Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

The pull request introduces a new MainEventBus architecture for centralized event processing, ensuring event persistence before client visibility and eliminating race conditions. This is a significant architectural improvement. The changes refactor event handling across ReplicatedQueueManager, EventQueue, EventConsumer, and DefaultRequestHandler to integrate with the new MainEventBus and MainEventBusProcessor. SSE handling in A2AServerRoutes has also been refactored for better separation of concerns and robustness, including critical buffering controls. Thread pool configuration for agent execution has been improved with bounded queues and core thread timeouts. Many tests have been updated to reflect the asynchronous nature of the new event processing and use more robust synchronization mechanisms like callbacks and latches. Overall, the changes are well-structured and address important architectural and concurrency challenges.

kabir added 5 commits January 26, 2026 16:09
The kubectl wait command can fail with 'no matching resources found' if executed
before the StatefulSet controller has created the pod. This race condition was
hitting CI consistently on this branch.

Add a retry loop that waits up to 30 seconds for the pod to be created before
checking readiness. This matches the pattern already used for Kafka deployment.

Fixes intermittent cloud-deployment test failures in CI.
For new tasks, RequestContext.taskId was being set to null instead of
using the client-provided taskId from the message. This caused:

1. TaskUpdater to create events with null taskId
2. TaskManager to create tasks with null IDs in TaskStore
3. ResultAggregator to fail with "Could not find a Task/Message for null"

The fix changes line 1057 in DefaultRequestHandler.initMessageSend():
- Before: .setTaskId(task == null ? null : task.id())
+ After:  .setTaskId(task == null ? params.message().taskId() : task.id())

This ensures the client-provided taskId is propagated through the entire
request flow for new task creation.

Fixes cloud-deployment example test failure.
Fixes intermittent race condition where blocking calls returned before
the task reached a final state. The issue:

1. Agent enqueues: startWork() → artifacts → complete()
2. MainEventBusProcessor processes them serially
3. ResultAggregator interrupts on FIRST event (WORKING state)
4. Returns capturedTask with WORKING state
5. DefaultRequestHandler checked if (isFinalEvent(capturedTask))
6. FALSE → didn't wait for finalization → returned early
7. Test read stale WORKING state from TaskStore

The fix: ALWAYS call waitForTaskFinalization() for blocking calls,
regardless of which event was captured. waitForTaskFinalization()
checks TaskStore first and returns immediately if already finalized,
so this is safe and efficient.

For fire-and-forget tasks that never reach final state, this will
timeout, which is correct behavior for blocking calls.

Fixes DefaultRequestHandlerTest.testBlockingCallReturnsCompleteTaskWithArtifacts
The cloud deployment test was using A2A.toUserMessage(text, id) which
sets the MESSAGE ID, not the TASK ID. This caused the "start" message
to have taskId=null, resulting in the error:
  "Could not find a Task/Message for null"

Changes:
1. Changed sendStartMessage() to use createUserTextMessage(text, contextId, taskId)
   This properly sets taskId instead of messageId
2. Added TimeoutException catch for consumptionFuture.get() timeout

The "process" and "complete" messages already used Message.builder()
with .taskId() so they were correct.

Fixes cloud-deployment example test.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant