⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions src/google/adk/agents/invocation_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

from __future__ import annotations

import asyncio
from typing import Any
from typing import Awaitable
from typing import Callable
from typing import Optional
import uuid

Expand Down Expand Up @@ -213,6 +216,19 @@ class InvocationContext(BaseModel):
of this invocation.
"""

_tool_call_cache_lock: asyncio.Lock = PrivateAttr(
default_factory=asyncio.Lock
)
_tool_call_cache: dict[tuple[Any, ...], asyncio.Task] = PrivateAttr(
default_factory=dict
)
"""Caches tool call results within a single invocation.

This is used to prevent redundant tool execution when the model repeats the
same function call (same tool name + same args) multiple times during a single
invocation.
"""

@property
def is_resumable(self) -> bool:
"""Returns whether the current invocation is resumable."""
Expand All @@ -221,6 +237,76 @@ def is_resumable(self) -> bool:
and self.resumability_config.is_resumable
)

@staticmethod
def _canonicalize_tool_args(value: Any) -> Any:
"""Converts a JSON-like structure into a stable, hashable representation."""
if isinstance(value, dict):
return tuple(
(k, InvocationContext._canonicalize_tool_args(v))
for k, v in sorted(value.items())
)
if isinstance(value, list):
return tuple(InvocationContext._canonicalize_tool_args(v) for v in value)
if isinstance(value, (str, int, float, bool)) or value is None:
return value
# Fallback: keep it hashable and stable.
return repr(value)

def _tool_call_cache_key(
    self, *, tool_name: str, tool_args: dict[str, Any]
) -> tuple[Any, ...]:
  """Returns the cache key identifying one tool call in this invocation.

  Args:
    tool_name: Name of the tool being called.
    tool_args: Arguments of the call; they are canonicalized so the key is
      hashable.

  Returns:
    A tuple of the current branch, the tool name and the canonicalized
    arguments.
  """
  branch = self.branch
  canonical_args = InvocationContext._canonicalize_tool_args(tool_args)
  return branch, tool_name, canonical_args

async def get_or_execute_deduped_tool_call(
    self,
    *,
    tool_name: str,
    tool_args: dict[str, Any],
    execute: Callable[[], Awaitable[Any]],
    dedupe: bool = False,
) -> tuple[Any, bool]:
  """Returns cached tool result for identical calls, otherwise executes once.

  Args:
    tool_name: Tool name.
    tool_args: Tool arguments from the model.
    execute: A coroutine factory that executes the tool and returns its
      response.
    dedupe: Whether to deduplicate. When False, `execute` is awaited directly
      and the cache is neither read nor written.

  Returns:
    A tuple of (tool_result, cache_hit). `cache_hit` is True only when the
    result came from a task started by an earlier identical call.

  Raises:
    Any exception raised by `execute`, including `asyncio.CancelledError`.
    In both cases the cache entry is dropped before re-raising.
  """
  if not dedupe:
    return await execute(), False

  key = self._tool_call_cache_key(tool_name=tool_name, tool_args=tool_args)

  async with self._tool_call_cache_lock:
    task = self._tool_call_cache.get(key)
    cache_hit = task is not None
    if task is None:
      task = asyncio.create_task(execute())
      self._tool_call_cache[key] = task

  try:
    result = await task
  except BaseException:
    # BaseException rather than Exception: asyncio.CancelledError derives
    # from BaseException, and a cancelled task left in the cache would make
    # every later identical call re-raise the cancellation instead of
    # retrying. Failed executions are evicted for the same reason.
    async with self._tool_call_cache_lock:
      # Only evict our own task; a retry may already have replaced it.
      if self._tool_call_cache.get(key) is task:
        del self._tool_call_cache[key]
    raise

  return result, cache_hit

def set_agent_state(
self,
agent_name: str,
Expand Down
13 changes: 13 additions & 0 deletions src/google/adk/agents/run_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,19 @@ class RunConfig(BaseModel):
- Less than or equal to 0: This allows for unbounded number of llm calls.
"""

dedupe_tool_calls: bool = False
"""
Whether to deduplicate identical tool calls (same tool name + same arguments)
within a single invocation.

This helps prevent redundant tool execution when the model repeats the same
function call multiple times (for example, when a tool is slow or the model
does not follow the instruction to call a tool only once).

Note: Only the tool result is reused; tool side effects (state/artifact
deltas) are only applied once from the first execution.
"""

custom_metadata: Optional[dict[str, Any]] = None
"""Custom metadata for the current invocation."""

Expand Down
60 changes: 60 additions & 0 deletions src/google/adk/cli/adk_web_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1648,6 +1648,66 @@ async def run_agent_live(
await websocket.close(code=1002, reason="Session not found")
return

# Determine if this is a live/audio session
# For live sessions, Gemini Live API provides transparent session resumption
# where the model automatically replays its last response. Replaying events
# manually would cause duplicates (Issue #3395).
# For text-only sessions, we need to replay all events (Issue #3573).
def is_live_session(events: list) -> bool:
  """Check if session contains audio/video or transcription data.

  Only the last five events are inspected. An event counts as a live-session
  indicator when it has a truthy `input_transcription` or
  `output_transcription`, or when any part of its content carries inline or
  file data whose MIME type starts with "audio/" or "video/".

  Args:
    events: The session's events, oldest first.

  Returns:
    True if any of the inspected events is a live-session indicator, False
    otherwise (including for an empty list).
  """
  media_prefixes = ("audio/", "video/")

  def carries_media(blob) -> bool:
    # Guard against an absent blob and a blob whose mime_type is None;
    # neither can be audio/video.
    return bool(blob) and (blob.mime_type or "").startswith(media_prefixes)

  # Slicing already handles lists with fewer than five elements, so no
  # explicit length check is needed.
  for event in reversed(events[-5:]):
    # Check for transcription data (input/output)
    if getattr(event, 'input_transcription', None):
      return True
    if getattr(event, 'output_transcription', None):
      return True
    # Check content for audio/video
    content = event.content
    if not content:
      continue
    # `parts` may be None on a content object; treat that as no parts.
    for part in content.parts or ():
      if carries_media(part.inline_data) or carries_media(part.file_data):
        return True
  return False

# Replay existing session events for text-only sessions
# Skip replay for live/audio sessions to avoid conflicts with
# Gemini Live API's built-in session resumption
should_replay = session.events and not is_live_session(session.events)

if should_replay:
logger.info(
"Replaying %d existing events for text-only session %s",
len(session.events),
session_id,
)
for event in session.events:
try:
await websocket.send_text(
event.model_dump_json(exclude_none=True, by_alias=True)
)
except Exception as e:
logger.error(
"Failed to replay event %s during session restoration: %s",
event.id,
e,
)
# Continue replaying other events even if one fails
continue
elif session.events and not should_replay:
logger.info(
"Skipping event replay for live/audio session %s (relying on "
"Gemini Live API's transparent session resumption)",
session_id,
)

live_request_queue = LiveRequestQueue()

async def forward_events():
Expand Down
Loading