
Conversation

@mkmeral
Contributor

@mkmeral mkmeral commented Jan 9, 2026

Description

Picking up from @awsarron's work in #1174 to address review feedback.

A2AAgent makes it simple to consume remote A2A agents and invoke them like any other Strands Agent. This PR addresses the open review comments from the original implementation.

Changes from original PR:

  • Changed logger.info to logger.debug for agent card discovery and message sending
  • Simplified factory creation with _create_default_factory() helper method
  • Fixed invoke_async to delegate to stream_async (ensures consistent behavior)
  • Added comprehensive docstring for A2AStreamEvent explaining when events are emitted
  • Expanded integration tests to cover sync, async, streaming, and custom client config scenarios
  • Added unit tests for __del__ cleanup behavior
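The delegation from invoke_async to stream_async can be illustrated with a minimal, self-contained sketch. SketchAgent and its event dictionaries are hypothetical stand-ins, not the real A2AAgent or its event types; the sketch only assumes that the final streamed event carries a "result" key, as in the diff for this PR.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, Optional


class SketchAgent:
    """Hypothetical stand-in for an agent; not the real A2AAgent."""

    async def stream_async(self, prompt: str) -> AsyncIterator[Dict[str, Any]]:
        # Intermediate events carry partial data; only the last one carries "result".
        yield {"status": "working"}
        yield {"partial": prompt[:3]}
        yield {"result": f"echo: {prompt}"}

    async def invoke_async(self, prompt: str) -> str:
        # Drain the whole stream and keep the last "result" seen, instead of
        # returning on the first (possibly incomplete) event.
        result: Optional[str] = None
        async for event in self.stream_async(prompt):
            if "result" in event:
                result = event["result"]
        if result is None:
            raise RuntimeError("No response received from agent")
        return result


final = asyncio.run(SketchAgent().invoke_async("hello"))
```

Because invoke_async consumes the same generator that streaming callers use, both paths see the same events, which is the consistency the change aims for.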

Example usage (unchanged from original):

from strands.agent.a2a_agent import A2AAgent

a2a_agent = A2AAgent(endpoint="http://localhost:9000")
result = a2a_agent("Show me 10 ^ 6")
# AgentResult(stop_reason='end_turn', message={'role': 'assistant', 'content': [{'text': '10^6 = 1,000,000'}]}, ...)

Follow-ups:

Related Issues

#907

Resolves review comments from #1174

Documentation PR

TODO

Type of Change

New feature

Testing

  • I ran hatch run prepare

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • [TODO] I have updated the documentation accordingly
  • [TODO] I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.


diff of a2a agent compared to original PR

diff --git a/src/strands/agent/a2a_agent.py b/src/strands/agent/a2a_agent.py
index 1f2e015..7c30269 100644
--- a/src/strands/agent/a2a_agent.py
+++ b/src/strands/agent/a2a_agent.py
@@ -88,9 +88,19 @@ class A2AAgent:
         if not self.description and self._agent_card.description:
             self.description = self._agent_card.description
 
-        logger.info("agent=<%s>, endpoint=<%s> | discovered agent card", self.name, self.endpoint)
+        logger.debug("agent=<%s>, endpoint=<%s> | discovered agent card", self.name, self.endpoint)
         return self._agent_card
 
+    def _create_default_factory(self) -> ClientFactory:
+        """Create default A2A client factory with non-streaming config.
+
+        Returns:
+            Configured ClientFactory instance.
+        """
+        httpx_client = self._get_httpx_client()
+        config = ClientConfig(httpx_client=httpx_client, streaming=False)
+        return ClientFactory(config)
+
     async def _get_a2a_client(self) -> Client:
         """Get or create the A2A client for this agent.
 
@@ -99,16 +109,7 @@ class A2AAgent:
         """
         if self._a2a_client is None:
             agent_card = await self._get_agent_card()
-
-            if self._a2a_client_factory is not None:
-                # Use provided factory
-                factory = self._a2a_client_factory
-            else:
-                # Create default factory
-                httpx_client = self._get_httpx_client()
-                config = ClientConfig(httpx_client=httpx_client, streaming=False)
-                factory = ClientFactory(config)
-
+            factory = self._a2a_client_factory or self._create_default_factory()
             self._a2a_client = factory.create(agent_card)
         return self._a2a_client
 
@@ -130,7 +131,7 @@ class A2AAgent:
         client = await self._get_a2a_client()
         message = convert_input_to_message(prompt)
 
-        logger.info("agent=<%s>, endpoint=<%s> | sending message", self.name, self.endpoint)
+        logger.debug("agent=<%s>, endpoint=<%s> | sending message", self.name, self.endpoint)
         return client.send_message(message)
 
     def _is_complete_event(self, event: A2AResponse) -> bool:
@@ -174,6 +175,8 @@ class A2AAgent:
     ) -> AgentResult:
         """Asynchronously invoke the remote A2A agent.
 
+        Delegates to stream_async and returns the final result.
+
         Args:
             prompt: Input to the agent (string, message list, or content blocks).
             **kwargs: Additional arguments (ignored).
@@ -185,10 +188,15 @@ class A2AAgent:
             ValueError: If prompt is None.
             RuntimeError: If no response received from agent.
         """
-        async for event in await self._send_message(prompt):
-            return convert_response_to_agent_result(event)
+        result: AgentResult | None = None
+        async for event in self.stream_async(prompt, **kwargs):
+            if "result" in event:
+                result = event["result"]
+
+        if result is None:
+            raise RuntimeError("No response received from A2A agent")
 
-        raise RuntimeError("No response received from A2A agent")
+        return result
 
     def __call__(
         self,
@@ -244,7 +252,7 @@ class A2AAgent:
             yield AgentResultEvent(result)
 
     def __del__(self) -> None:
-        """Clean up resources when agent is garbage collected."""
+        """Best-effort cleanup on garbage collection."""
         if self._owns_client and self._httpx_client is not None:
             try:
                 client = self._httpx_client


REV2

  • The default client now uses streaming=True. With streaming=False as the default, streaming would have required both a custom client and a call to stream_async. With the new default, both stream_async and invoke_async work without extra configuration.

awsarron and others added 12 commits November 18, 2025 14:09
- Fix invoke_async to delegate to stream_async (prevents returning first incomplete event)
- Add async context manager support (__aenter__/__aexit__) and explicit aclose() method
- Improve __del__ cleanup to handle event loop edge cases
- Change logger.info to logger.debug for consistency with project standards
- Simplify factory creation with _create_default_factory() helper method
- Add comprehensive documentation to A2AStreamEvent
- Improve test fixture pattern with pytest fixture for subprocess management
- Add comprehensive e2e tests for invoke_async, stream_async, and context manager

Addresses PR strands-agents#1174 review comments:
- Comment strands-agents#2: Critical bug - invoke_async now waits for complete events
- Comment strands-agents#5: Code duplication - invoke_async delegates to stream_async
- Comment strands-agents#6: Async cleanup - proper async context manager pattern
- Comment strands-agents#3: Logging level - changed to debug
- Comment strands-agents#4: Factory simplification - extracted helper method
- Comment strands-agents#12: Documentation - documented A2AStreamEvent behavior
- Comment strands-agents#9: Test fixture - using pytest fixture pattern
- Comment strands-agents#10: Test coverage - added comprehensive e2e tests
@codecov

codecov bot commented Jan 9, 2026

Codecov Report

❌ Patch coverage is 91.22807% with 15 lines in your changes missing coverage. Please review.

Files with missing lines                    Patch %   Lines
src/strands/multiagent/a2a/converters.py    85.96%    0 Missing and 8 partials ⚠️
src/strands/agent/__init__.py               33.33%    4 Missing ⚠️
src/strands/agent/a2a_agent.py              97.02%    0 Missing and 3 partials ⚠️


@dbschmigelski dbschmigelski self-requested a review January 9, 2026 20:14
@cagataycali
Member

Code Review: A2AAgent Implementation

Thanks for addressing the feedback from the original PR! This is a solid implementation that follows SDK patterns well. I've done a thorough review and have some observations and suggestions below.


✅ Strengths

1. Excellent Test Coverage

The PR includes comprehensive testing across multiple levels:

  • Unit tests (test_a2a_agent.py): 268 lines covering initialization, agent card discovery, streaming, error handling, and cleanup
  • Converter tests (test_converters.py): Testing type conversions between Strands and A2A formats
  • Integration tests (test_multiagent_a2a.py): End-to-end testing with a real A2A server for sync/async/streaming scenarios

This is exemplary test coverage for a new feature.

2. Clean API Design

The class adheres to the AgentBase protocol perfectly:

  • invoke_async() - async invocation
  • __call__() - sync wrapper
  • stream_async() - streaming support

The API is intuitive and consistent with existing SDK agents.

3. Good Documentation

  • Clear docstrings following Google style
  • The A2AStreamEvent documentation explaining event emission behavior is particularly helpful
  • Good inline comments explaining complex logic (e.g., _is_complete_event)

4. Proper Resource Management

  • Handles httpx client lifecycle correctly
  • Distinguishes between owned vs. external clients
  • Best-effort cleanup in __del__
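As a rough illustration of the owned-versus-external distinction, the sketch below closes a client only when the agent created it, and exposes that cleanup through an async context manager. FakeClient and SketchRemoteAgent are hypothetical names; the real class works with an httpx client and may differ in detail.

```python
import asyncio
from typing import Optional, Tuple


class FakeClient:
    """Hypothetical stand-in for an async HTTP client with an aclose() method."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


class SketchRemoteAgent:
    def __init__(self, client: Optional[FakeClient] = None) -> None:
        # The agent owns the client only if the caller did not supply one.
        self._owns_client = client is None
        self._client = client if client is not None else FakeClient()

    async def aclose(self) -> None:
        # Never close a client that belongs to the caller.
        if self._owns_client and not self._client.closed:
            await self._client.aclose()

    async def __aenter__(self) -> "SketchRemoteAgent":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()


async def demo() -> Tuple[bool, bool]:
    shared = FakeClient()
    async with SketchRemoteAgent(shared):
        pass
    async with SketchRemoteAgent() as owned_agent:
        owned = owned_agent._client
    return shared.closed, owned.closed


shared_closed, owned_closed = asyncio.run(demo())
```

An explicit aclose() gives deterministic cleanup, while __del__ remains a fallback whose timing depends on the garbage collector.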

5. Code Quality Improvements

The changes from the original PR are all positive:

  • logger.debug instead of logger.info
  • _create_default_factory() helper reduces duplication ✓
  • invoke_async now properly delegates to stream_async
  • Comprehensive event documentation ✓

🔍 Issues & Suggestions

Critical: Missing Public API Export

A2AAgent is not exported in src/strands/agent/__init__.py, making it inaccessible via the standard import path.

Current state:

# This DOESN'T work:
from strands.agent import A2AAgent  # ImportError!

# Users must do:
from strands.agent.a2a_agent import A2AAgent  # Awkward

Fix needed in src/strands/agent/__init__.py:

from .a2a_agent import A2AAgent

__all__ = [
    "Agent",
    "AgentBase",
    "AgentResult",
    "A2AAgent",  # Add this
    # ... rest
]

Optionally, also export from top-level src/strands/__init__.py if A2AAgent is considered a core feature.
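To show why the re-export matters, the sketch below builds two throwaway packages on disk, one with an empty __init__.py and one that re-exports a class, and checks whether the class is reachable from the package itself. The package names and the RemoteAgent class are invented for illustration and are unrelated to the strands source tree.

```python
import importlib
import sys
import tempfile
from pathlib import Path


def package_exposes_class(reexport: bool) -> bool:
    """Build a throwaway package and report whether `from pkg import RemoteAgent` would work."""
    pkg_name = "sketch_pkg_with_export" if reexport else "sketch_pkg_without_export"
    with tempfile.TemporaryDirectory() as tmp:
        pkg_dir = Path(tmp) / pkg_name
        pkg_dir.mkdir()
        (pkg_dir / "remote.py").write_text("class RemoteAgent:\n    pass\n")
        init_source = 'from .remote import RemoteAgent\n\n__all__ = ["RemoteAgent"]\n' if reexport else ""
        (pkg_dir / "__init__.py").write_text(init_source)

        sys.path.insert(0, tmp)
        try:
            importlib.invalidate_caches()
            module = importlib.import_module(pkg_name)
            # `from pkg import RemoteAgent` succeeds only if the package has this attribute.
            return hasattr(module, "RemoteAgent")
        finally:
            sys.path.remove(tmp)
            for name in [n for n in sys.modules if n.split(".")[0] == pkg_name]:
                del sys.modules[name]


without_export = package_exposes_class(reexport=False)
with_export = package_exposes_class(reexport=True)
```

Without the import line in __init__.py, the class is reachable only through its full module path, which is the situation described above.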

Medium: Inconsistent Streaming Default

The PR description mentions:

REV2: Moved to streaming=True by default with the client.

This is implemented in _create_default_factory(), but the reasoning isn't clear from the code:

def _create_default_factory(self) -> ClientFactory:
    """Create default A2A client factory with streaming config."""
    httpx_client = self._get_httpx_client()
    config = ClientConfig(httpx_client=httpx_client, streaming=True)
    return ClientFactory(config)

Questions:

  1. Does the A2A protocol require streaming=True for both invoke_async and stream_async to work?
  2. What's the behavior difference between streaming=True and streaming=False?

Suggestion: Add a comment explaining why streaming=True is required by default, or update the docstring to clarify. For example:

def _create_default_factory(self) -> ClientFactory:
    """Create default A2A client factory with streaming enabled.
    
    Streaming must be enabled for both invoke_async and stream_async
    to receive incremental updates from the remote agent.
    """

Minor: Type Annotation Inconsistency

In invoke_async, the result variable uses union syntax:

result: AgentResult | None = None

While in other places, the code uses Optional. For consistency across the SDK, verify which style is preferred and stick to it.

Minor: Silently Swallowed Exceptions in __del__

The cleanup logic is reasonable but has a subtle issue:

def __del__(self) -> None:
    """Best-effort cleanup on garbage collection."""
    if self._owns_client and self._httpx_client is not None:
        try:
            client = self._httpx_client
            run_async(lambda: client.aclose())
        except Exception:
            pass

Issue: If an exception occurs in run_async (e.g., event loop closed), the exception is silently swallowed. While this is acceptable for __del__, it might mask cleanup issues during testing.

Suggestion: Consider logging the exception at debug level:

try:
    client = self._httpx_client
    run_async(lambda: client.aclose())
except Exception as e:
    logger.debug("endpoint=<%s> | failed to close httpx client: %s", self.endpoint, e)

This helps with debugging while maintaining best-effort semantics.
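A self-contained sketch of the suggestion, assuming a hypothetical FailingClient whose close() always raises. It uses a synchronous close() for brevity, whereas the real code wraps an async aclose() in run_async. The point is only that the failure ends up in the debug log instead of disappearing.

```python
import gc
import io
import logging

logger = logging.getLogger("sketch.cleanup")
logger.propagate = False  # keep the demo output out of the root logger


class FailingClient:
    """Hypothetical client whose close() fails, e.g. because the event loop is gone."""

    def close(self) -> None:
        raise RuntimeError("event loop is closed")


class SketchCleanupAgent:
    def __init__(self, endpoint: str, client: FailingClient) -> None:
        self.endpoint = endpoint
        self._client = client

    def __del__(self) -> None:
        # Best-effort cleanup: never raise from __del__, but leave a trace at debug level.
        try:
            self._client.close()
        except Exception as e:
            logger.debug("endpoint=<%s> | failed to close client: %s", self.endpoint, e)


# Capture log output in memory so the effect is observable.
stream = io.StringIO()
logger.addHandler(logging.StreamHandler(stream))
logger.setLevel(logging.DEBUG)

agent = SketchCleanupAgent("http://localhost:9000", FailingClient())
del agent      # CPython runs __del__ as soon as the last reference goes away
gc.collect()   # extra nudge for interpreters without reference counting
captured = stream.getvalue()
```

At the default WARNING level nothing is printed, so production behavior stays quiet while test runs with debug logging enabled can surface cleanup failures.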

Minor: Integration Test Fixture Cleanup

The a2a_server fixture has a 5-second hardcoded sleep:

time.sleep(5)  # Wait for A2A server to start

Suggestion: Consider polling the health endpoint instead of sleeping:

import time
import httpx

# base_url is the address of the A2A server started by the fixture.
max_wait = 10  # seconds
start_time = time.time()
while time.time() - start_time < max_wait:
    try:
        response = httpx.get(f"{base_url}/.well-known/agent.json", timeout=1.0)
        if response.status_code == 200:
            break
    except httpx.RequestError:
        pass
    time.sleep(0.5)
else:
    # Reached only when the loop ends without a break, i.e. the server never answered.
    raise RuntimeError("A2A server failed to start")

This makes tests more reliable and faster.
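The same idea can be factored into a reusable helper. The sketch below uses only the standard library and separates the polling loop from the probe so that the loop can be exercised without a running server. The names wait_until_ready and http_probe are hypothetical, not part of the test suite in this PR.

```python
import time
import urllib.error
import urllib.request
from typing import Callable, Dict


def http_probe(url: str) -> bool:
    """Return True if a GET request to url answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=1.0) as response:
            return response.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, or a non-2xx status: not ready yet.
        return False


def wait_until_ready(probe: Callable[[], bool], timeout: float = 30.0, interval: float = 0.5) -> float:
    """Poll probe until it returns True; return elapsed seconds or raise TimeoutError."""
    start = time.monotonic()
    while True:
        if probe():
            return time.monotonic() - start
        if time.monotonic() - start >= timeout:
            raise TimeoutError(f"server not ready after {timeout:.1f}s")
        time.sleep(interval)


# Simulated server that becomes ready on the third probe.
attempts: Dict[str, int] = {"count": 0}


def fake_probe() -> bool:
    attempts["count"] += 1
    return attempts["count"] >= 3


elapsed = wait_until_ready(fake_probe, timeout=2.0, interval=0.01)
```

In a fixture, the call might look like wait_until_ready(lambda: http_probe("http://localhost:9000/.well-known/agent.json")), assuming the server listens on the port used in the PR's usage example.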

Documentation: Missing Example in PR Body

The PR body shows a basic usage example, but it would also help to show:

  1. Custom client configuration example
  2. Streaming example
  3. Error handling example

These would help reviewers understand the full API surface.


📋 Compatibility Check

Follows SDK Patterns:

  • Implements AgentBase protocol correctly
  • Uses run_async for sync wrappers
  • Follows structured logging format
  • Type annotations throughout
  • Google-style docstrings

Error Handling:

  • Validates prompt is not None
  • Raises appropriate exceptions (ValueError, RuntimeError)
  • Handles cleanup exceptions gracefully

Multi-Agent Integration:

  • Compatible with existing multi-agent patterns
  • Converter functions properly translate between Strands and A2A types
  • Integrates with A2AServer for bidirectional communication

🎯 Verdict

This is high-quality work with comprehensive testing and good API design. The implementation addresses all the review comments from the original PR effectively.

Recommendation: Approve with minor changes

Required Changes:

  1. Export A2AAgent from strands.agent.__init__.py (blocking - API discoverability)

Suggested Improvements (non-blocking):

  1. Add comment explaining why streaming=True is the default
  2. Consider logging exceptions in __del__ at debug level
  3. Improve integration test server startup with polling instead of sleep
  4. Add more usage examples to PR description

Once the export issue is fixed, this is ready to merge! Great work @mkmeral 🎉

cagataycali pushed a commit to cagataycali/sdk-python that referenced this pull request Jan 12, 2026
Replace hardcoded time.sleep(5) with proper health check polling that:
- Polls the /.well-known/agent.json endpoint until server responds
- Uses 30 second timeout (vs fixed 5s sleep)
- Provides clear error message if server fails to start
- Runs faster when server starts quickly

This fixes flaky integration tests in CI environments where 5 seconds
may not be sufficient for the A2A server to fully initialize.

Related to: strands-agents#1441
@cagataycali
Member

Hi @mkmeral! 👋

I noticed the CI failures here might be related to the flaky time.sleep(5) in the A2A integration test fixture. I've created PR #1457 to fix this by replacing the hardcoded sleep with proper health check polling.

The fix:

  • Polls /.well-known/agent.json endpoint until server responds
  • 30 second timeout with 0.5s polling interval
  • Clear error message if server fails to start
  • Faster tests when server starts quickly

PR #1457 passes 14 of 15 checks; only check-access-and-checkout fails, which is expected for fork PRs.

Once #1457 is merged, it should help stabilize the CI for this PR. 🦆


4 participants