diff --git a/xdk-gen/src/python/generator.rs b/xdk-gen/src/python/generator.rs index 457a08ce..4d73f5ba 100644 --- a/xdk-gen/src/python/generator.rs +++ b/xdk-gen/src/python/generator.rs @@ -47,6 +47,7 @@ language! { render "main_client" => "xdk/client.py", render "oauth2_auth" => "xdk/oauth2_auth.py", render "paginator" => "xdk/paginator.py", + render "streaming" => "xdk/streaming.py", render "init_py" => "xdk/__init__.py", render "pyproject_toml" => "pyproject.toml", render "sphinx_conf" => "conf.py", diff --git a/xdk-gen/templates/python/client_class.j2 b/xdk-gen/templates/python/client_class.j2 index 3cd69d55..4aab1533 100644 --- a/xdk-gen/templates/python/client_class.j2 +++ b/xdk-gen/templates/python/client_class.j2 @@ -3,7 +3,8 @@ Auto-generated {{ tag.display_name }} client for the X API. This module provides a client for interacting with the {{ tag.display_name }} endpoints of the X API. -{% if operations | selectattr('is_streaming') | list %}Real-time streaming operations return generators that yield data as it arrives.{% endif %} +{% if operations | selectattr('is_streaming') | list %}Real-time streaming operations return generators that yield data as it arrives. +Streaming connections are automatically managed with exponential backoff retry logic for robust handling.{% endif %} All methods, parameters, and response models are generated from the OpenAPI specification. Generated automatically - do not edit manually. 
@@ -13,7 +14,9 @@ from __future__ import annotations from typing import Dict, List, Optional, Any, Union, cast, TYPE_CHECKING, Iterator{% if operations | selectattr('is_streaming') | list %}, Generator{% endif %}{% if operations and (operations | selectattr('parameters') | selectattr('original_name', 'equalto', 'pagination_token') | list | length > 0 or operations | selectattr('parameters') | selectattr('original_name', 'equalto', 'next_token') | list | length > 0) %} # Iterator for pagination{% endif %} import requests import time -{% if operations | selectattr('is_streaming') | list %}import json{% endif %} +{% if operations | selectattr('is_streaming') | list %} +from ..streaming import StreamConfig, StreamError, stream_with_retry +{% endif %} if TYPE_CHECKING: from ..client import Client diff --git a/xdk-gen/templates/python/client_macros.j2 b/xdk-gen/templates/python/client_macros.j2 index f410cfe0..d828b60f 100644 --- a/xdk-gen/templates/python/client_macros.j2 +++ b/xdk-gen/templates/python/client_macros.j2 @@ -17,7 +17,7 @@ body: {{ operation.class_name }}Request{% if operation.parameters | rejectattr(' body: Optional[{{ operation.class_name }}Request] = None{% if operation.is_streaming %}, {% endif %} {%- endif -%} {%- if operation.is_streaming -%} -timeout: Optional[float] = None, chunk_size: int = 1024 +stream_config: Optional[StreamConfig] = None {%- endif -%} {%- endmacro %} @@ -44,6 +44,9 @@ timeout: Optional[float] = None, chunk_size: int = 1024 This is a streaming endpoint that yields data in real-time as it becomes available. Each yielded item represents a single data point from the stream. + The connection is automatically managed with exponential backoff retry logic. + If the stream disconnects, the SDK will automatically reconnect without client intervention. 
+ {% endif -%} {% if operation.parameters -%} Args: @@ -67,15 +70,15 @@ timeout: Optional[float] = None, chunk_size: int = 1024 {% endif -%} {% endif -%} {% if operation.is_streaming -%} - timeout: Request timeout in seconds (default: None for no timeout) - chunk_size: Size of chunks to read from the stream (default: 1024 bytes) + stream_config: Optional StreamConfig for customizing retry behavior, timeouts, and callbacks. + Configure max_retries (-1 for infinite), initial_backoff, max_backoff, and lifecycle callbacks + (on_connect, on_disconnect, on_reconnect, on_error) for monitoring connection state. Yields: {% if operation.responses and "200" in operation.responses or "201" in operation.responses %}{{ operation.class_name }}Response{% else %}Dict[str, Any]{% endif %}: Individual streaming data items Raises: - requests.exceptions.RequestException: If the streaming connection fails - json.JSONDecodeError: If the streamed data is not valid JSON + StreamError: If a non-retryable error occurs (auth errors, client errors) or max retries exceeded. 
{% elif operation.parameters and (operation.parameters | selectattr('original_name', 'equalto', 'pagination_token') | list | length > 0 or operation.parameters | selectattr('original_name', 'equalto', 'next_token') | list | length > 0) -%} Yields: @@ -205,70 +208,22 @@ headers = {% if operation.is_streaming %}{ _ = params # Check if params exists except NameError: params = {} # Initialize if not defined - try: - # Make streaming request - with self.client.session.{{ operation.method | lower }}( - url, - params=params, - headers=headers, - {% if operation.request_body %}json=json_data,{% endif %} - stream=True, - timeout=timeout, - ) as response: - # Check for HTTP errors - response.raise_for_status() - - # Buffer for incomplete lines - buffer = "" - - # Stream data chunk by chunk - for chunk in response.iter_content(chunk_size=chunk_size, decode_unicode=True): - if chunk: - # Ensure chunk is always a string, not bytes - if isinstance(chunk, bytes): - chunk = chunk.decode('utf-8') - buffer += chunk - - # Process complete lines - while '\n' in buffer: - line, buffer = buffer.split('\n', 1) - line = line.strip() - - if line: - try: - # Parse JSON line - data = json.loads(line) - - # Convert to response model if available - {% if operation.responses and "200" in operation.responses %} - yield {{ operation.class_name }}Response.model_validate(data) - {% else %} - yield data - {% endif %} - - except json.JSONDecodeError: - # Skip invalid JSON lines - continue - except Exception: - # Skip lines that cause processing errors - continue - - # Process any remaining data in buffer - if buffer.strip(): - try: - data = json.loads(buffer.strip()) - {% if operation.responses and "200" in operation.responses %} - yield {{ operation.class_name }}Response.model_validate(data) - {% else %} - yield data - {% endif %} - except json.JSONDecodeError: - # Skip invalid JSON in final buffer - pass - except requests.exceptions.RequestException: - raise - except Exception: - raise + + # Use 
robust streaming with automatic retry and exponential backoff + yield from stream_with_retry( + session=self.client.session, + method="{{ operation.method | lower }}", + url=url, + config=stream_config, + params=params, + headers=headers, + {% if operation.request_body %}json_data=json_data,{% endif %} + {% if operation.responses and "200" in operation.responses %} + response_model={{ operation.class_name }}Response, + {% else %} + response_model=None, + {% endif %} + ) {%- endmacro %} {# Macro for regular request logic #} diff --git a/xdk-gen/templates/python/init_py.j2 b/xdk-gen/templates/python/init_py.j2 index bb3dee8b..67e836a4 100644 --- a/xdk-gen/templates/python/init_py.j2 +++ b/xdk-gen/templates/python/init_py.j2 @@ -9,6 +9,16 @@ Generated automatically - do not edit manually. from .client import Client from .paginator import Cursor, cursor, PaginationError +from .streaming import StreamConfig, StreamError, StreamErrorType, StreamState __version__ = "0.1.0" -__all__ = ["Client", "Cursor", "cursor", "PaginationError"] \ No newline at end of file +__all__ = [ + "Client", + "Cursor", + "cursor", + "PaginationError", + "StreamConfig", + "StreamError", + "StreamErrorType", + "StreamState", +] diff --git a/xdk-gen/templates/python/streaming.j2 b/xdk-gen/templates/python/streaming.j2 new file mode 100644 index 00000000..1d54c866 --- /dev/null +++ b/xdk-gen/templates/python/streaming.j2 @@ -0,0 +1,513 @@ +""" +Robust streaming utilities for the X API SDK. + +This module provides streaming connection handling with automatic reconnection, +exponential backoff, and comprehensive error handling. Clients can consume +streaming endpoints without worrying about connection management - the SDK +handles all recovery automatically. + +Generated automatically - do not edit manually. 
import codecs
import json
import logging
import random
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import (
    Any,
    Callable,
    Dict,
    Generator,
    Iterator,
    Optional,
    Type,
    TypeVar,
    Union,
)

import requests
from requests.exceptions import (
    ChunkedEncodingError,
    ConnectionError,
    ReadTimeout,
    RequestException,
    Timeout,
)

# Type variable for response models
T = TypeVar('T')

# Configure module logger
logger = logging.getLogger(__name__)

# Sentinel returned by _parse_stream_line for input that produces no item.
# A dedicated object is used because a stream line may legitimately parse to
# None (the JSON literal ``null``).
_SKIP = object()


class StreamErrorType(Enum):
    """Classification of streaming errors for retry decisions."""

    # Retryable errors - connection issues that may resolve
    CONNECTION_ERROR = "connection_error"
    TIMEOUT = "timeout"
    SERVER_ERROR = "server_error"  # 5xx errors
    RATE_LIMITED = "rate_limited"  # 429 errors
    STREAM_INTERRUPTED = "stream_interrupted"

    # Non-retryable errors - client issues that won't resolve with retry
    AUTHENTICATION_ERROR = "authentication_error"  # 401/403
    CLIENT_ERROR = "client_error"  # Other 4xx errors
    FATAL_ERROR = "fatal_error"


# Error types for which stream_with_retry reconnects instead of raising.
_RETRYABLE_ERROR_TYPES = frozenset({
    StreamErrorType.CONNECTION_ERROR,
    StreamErrorType.TIMEOUT,
    StreamErrorType.SERVER_ERROR,
    StreamErrorType.RATE_LIMITED,
    StreamErrorType.STREAM_INTERRUPTED,
})


class StreamError(Exception):
    """Exception raised for streaming errors with classification.

    Attributes:
        error_type: The StreamErrorType this failure was classified as.
        original_exception: The underlying exception, if any.
        status_code: HTTP status code of the failing response, if any.
        response_body: Text body of the failing response, if it could be read.
    """

    def __init__(
        self,
        message: str,
        error_type: StreamErrorType,
        original_exception: Optional[Exception] = None,
        status_code: Optional[int] = None,
        response_body: Optional[str] = None,
    ):
        super().__init__(message)
        self.error_type = error_type
        self.original_exception = original_exception
        self.status_code = status_code
        self.response_body = response_body

    @property
    def is_retryable(self) -> bool:
        """Check if this error type should be retried."""
        return self.error_type in _RETRYABLE_ERROR_TYPES


@dataclass
class StreamConfig:
    """Configuration for streaming connections with retry behavior.

    Attributes:
        max_retries: Maximum number of consecutive reconnection attempts. The
            counter is reset every time a connection is established. Set to -1
            for infinite retries.
        initial_backoff: Initial backoff delay in seconds (default: 1.0).
        max_backoff: Maximum backoff delay in seconds before jitter is applied
            (default: 64.0).
        backoff_multiplier: Multiplier for exponential backoff (default: 2.0).
        jitter: Whether to add random jitter of up to 25% on top of the
            computed backoff (default: True).
        timeout: Request timeout in seconds. None for no timeout.
        chunk_size: Size of chunks to read from stream (default: 1024).
        on_connect: Optional callback when connection is established.
        on_disconnect: Optional callback when connection is lost.
        on_reconnect: Optional callback when reconnection is attempted.
        on_error: Optional callback when an error occurs.
    """
    max_retries: int = 10
    initial_backoff: float = 1.0
    max_backoff: float = 64.0
    backoff_multiplier: float = 2.0
    jitter: bool = True
    timeout: Optional[float] = None
    chunk_size: int = 1024

    # Callbacks for connection lifecycle events
    on_connect: Optional[Callable[[], None]] = None
    on_disconnect: Optional[Callable[[Optional[Exception]], None]] = None
    on_reconnect: Optional[Callable[[int, float], None]] = None  # (attempt, delay)
    on_error: Optional[Callable[[StreamError], None]] = None


@dataclass
class StreamState:
    """Internal state for a streaming connection."""
    retry_count: int = 0
    current_backoff: float = 1.0
    is_connected: bool = False
    total_items_received: int = 0
    last_error: Optional[StreamError] = None


def _classify_error(exception: Exception, response: Optional[requests.Response] = None) -> StreamError:
    """Classify an exception into a StreamError with appropriate type.

    Args:
        exception: The exception to classify.
        response: The HTTP response associated with the failure, if any. When
            given and its status is 4xx/5xx, the status code decides the
            classification and the response text is attached to the error.

    Returns:
        A StreamError wrapping ``exception``. Unknown exception types are
        classified as FATAL_ERROR (non-retryable).
    """
    # Handle HTTP response errors
    if response is not None:
        status_code = response.status_code
        try:
            response_body = response.text
        except Exception:
            # The body is diagnostic only; failing to read it must not mask
            # the original error.
            response_body = None

        if status_code == 429:
            return StreamError(
                "Rate limited (429): Too many requests",
                StreamErrorType.RATE_LIMITED,
                exception,
                status_code,
                response_body,
            )
        elif status_code in (401, 403):
            return StreamError(
                f"Authentication error ({status_code}): Access denied",
                StreamErrorType.AUTHENTICATION_ERROR,
                exception,
                status_code,
                response_body,
            )
        elif 400 <= status_code < 500:
            return StreamError(
                f"Client error ({status_code}): Bad request",
                StreamErrorType.CLIENT_ERROR,
                exception,
                status_code,
                response_body,
            )
        elif status_code >= 500:
            return StreamError(
                f"Server error ({status_code}): Server unavailable",
                StreamErrorType.SERVER_ERROR,
                exception,
                status_code,
                response_body,
            )

    # Handle connection-level errors
    if isinstance(exception, (ConnectionError, ConnectionResetError, BrokenPipeError)):
        return StreamError(
            f"Connection error: {exception}",
            StreamErrorType.CONNECTION_ERROR,
            exception,
        )

    if isinstance(exception, (Timeout, ReadTimeout)):
        return StreamError(
            f"Timeout: {exception}",
            StreamErrorType.TIMEOUT,
            exception,
        )

    if isinstance(exception, ChunkedEncodingError):
        return StreamError(
            f"Stream interrupted: {exception}",
            StreamErrorType.STREAM_INTERRUPTED,
            exception,
        )

    if isinstance(exception, requests.HTTPError):
        # Classify by the response attached to the exception, but only when no
        # response has been examined yet: if one was supplied and fell through
        # the status checks above, recursing again would never terminate.
        if response is None and exception.response is not None:
            return _classify_error(exception, exception.response)
        return StreamError(
            f"HTTP error: {exception}",
            StreamErrorType.SERVER_ERROR,
            exception,
        )

    # Default to fatal for unknown errors
    return StreamError(
        f"Unexpected error: {exception}",
        StreamErrorType.FATAL_ERROR,
        exception,
    )


def _calculate_backoff(
    retry_count: int,
    initial_backoff: float,
    max_backoff: float,
    multiplier: float,
    jitter: bool,
) -> float:
    """Calculate backoff time with exponential increase and optional jitter.

    Args:
        retry_count: Number of retries already performed (0 for the first).
        initial_backoff: Delay in seconds used when ``retry_count`` is 0.
        max_backoff: Upper bound in seconds applied before jitter.
        multiplier: Growth factor applied once per retry.
        jitter: If True, add a random 0-25% of the capped delay on top.

    Returns:
        The delay in seconds to wait before the next attempt.
    """
    try:
        backoff = initial_backoff * (multiplier ** retry_count)
    except OverflowError:
        # Float exponentiation raises instead of returning inf once the result
        # leaves the float range (e.g. 2.0 ** 1024). With unlimited retries
        # that count is reachable, and the delay is capped anyway.
        backoff = max_backoff
    backoff = min(backoff, max_backoff)

    if jitter:
        # Add random jitter between 0% and 25% of the backoff time
        backoff += backoff * random.uniform(0, 0.25)

    return backoff


def _run_callback(name: str, callback: Optional[Callable[..., None]], *args: Any) -> None:
    """Invoke a user lifecycle callback, logging instead of propagating failures.

    Args:
        name: Callback name used in the warning message (e.g. "on_connect").
        callback: The callable to invoke; nothing happens when it is unset.
        *args: Positional arguments forwarded to the callback.
    """
    if not callback:
        return
    try:
        callback(*args)
    except Exception as callback_error:
        # A faulty observer must never break the stream itself.
        logger.warning(f"{name} callback failed: {callback_error}")


def stream_with_retry(
    session: requests.Session,
    method: str,
    url: str,
    config: Optional[StreamConfig] = None,
    params: Optional[Dict[str, Any]] = None,
    headers: Optional[Dict[str, str]] = None,
    json_data: Optional[Dict[str, Any]] = None,
    response_model: Optional[Type[T]] = None,
) -> Generator[Union[T, Dict[str, Any]], None, None]:
    """
    Stream data from an endpoint with automatic reconnection and exponential backoff.

    This function handles all connection management, including:
    - Automatic reconnection on disconnects
    - Exponential backoff with jitter for retry delays
    - Classification of errors as retryable vs fatal
    - Lifecycle callbacks for monitoring connection state
    - Releasing the underlying HTTP response whenever a connection ends,
      including when the consumer stops iterating early

    Args:
        session: The requests Session to use for HTTP calls.
        method: HTTP method (typically "get").
        url: The full URL to stream from.
        config: StreamConfig with retry and callback settings.
        params: Query parameters for the request.
        headers: HTTP headers for the request.
        json_data: JSON body data for the request.
        response_model: Optional Pydantic model class to validate responses.

    Yields:
        Parsed JSON objects from the stream, optionally validated as Pydantic models.

    Raises:
        StreamError: When a non-retryable error occurs or max retries exceeded.
    """
    if config is None:
        config = StreamConfig()

    state = StreamState(current_backoff=config.initial_backoff)

    while True:
        try:
            # Attempt to establish connection
            response = _make_stream_request(
                session, method, url, config, params, headers, json_data
            )

            # Successfully connected: the retry budget starts over.
            state.is_connected = True
            state.retry_count = 0
            state.current_backoff = config.initial_backoff
            _run_callback("on_connect", config.on_connect)

            try:
                # Stream data from the connection
                yield from _process_stream(response, config, state, response_model)
            finally:
                # Release the connection on every exit path (normal end, error,
                # or the consumer closing this generator); otherwise each
                # reconnect would leave a streamed response open.
                response.close()

            # Stream ended normally (server closed connection).
            # NOTE(review): the reconnect that follows is immediate and does
            # not consume the retry budget - confirm this is the intended
            # policy for servers that accept and instantly close connections.
            logger.info("Stream ended normally, attempting to reconnect...")
            state.is_connected = False
            _run_callback("on_disconnect", config.on_disconnect, None)

        except Exception as exc:
            # Every failure goes through the same path so callbacks, logging
            # and retry accounting behave identically for all error sources.
            error = exc if isinstance(exc, StreamError) else _classify_error(exc)
            state.is_connected = False
            state.last_error = error

            _run_callback("on_error", config.on_error, error)
            _run_callback("on_disconnect", config.on_disconnect, error.original_exception)

            if not error.is_retryable:
                logger.error(f"Non-retryable error: {error}")
                if error is exc:
                    raise
                raise error from exc

            # Check if we've exceeded max retries (negative means unlimited)
            if config.max_retries >= 0 and state.retry_count >= config.max_retries:
                logger.error(f"Max retries ({config.max_retries}) exceeded")
                raise StreamError(
                    f"Max retries exceeded after {state.retry_count} attempts. Last error: {error}",
                    StreamErrorType.FATAL_ERROR,
                    error.original_exception,
                    error.status_code,
                    error.response_body,
                ) from exc

            # Calculate backoff and wait
            backoff = _calculate_backoff(
                state.retry_count,
                config.initial_backoff,
                config.max_backoff,
                config.backoff_multiplier,
                config.jitter,
            )

            state.retry_count += 1
            state.current_backoff = backoff
            attempt_limit = f"/{config.max_retries}" if config.max_retries >= 0 else ""
            logger.info(
                f"Retrying in {backoff:.2f}s (attempt {state.retry_count}"
                f"{attempt_limit}) after error: {error}"
            )

            _run_callback("on_reconnect", config.on_reconnect, state.retry_count, backoff)

            time.sleep(backoff)


def _make_stream_request(
    session: requests.Session,
    method: str,
    url: str,
    config: StreamConfig,
    params: Optional[Dict[str, Any]],
    headers: Optional[Dict[str, str]],
    json_data: Optional[Dict[str, Any]],
) -> requests.Response:
    """Make the HTTP request for streaming.

    Args:
        session: The requests Session used to send the request.
        method: HTTP method name; looked up case-insensitively on the session.
        url: The URL to request.
        config: Supplies the request timeout.
        params: Query parameters (an empty dict is sent when None).
        headers: HTTP headers (an empty dict is sent when None).
        json_data: JSON body; only sent when truthy.

    Returns:
        The open streaming response. The caller is responsible for closing it.

    Raises:
        StreamError: If the server answers with a 4xx/5xx status.
    """
    request_kwargs: Dict[str, Any] = {
        "params": params or {},
        "headers": headers or {},
        "stream": True,
        "timeout": config.timeout,
    }

    if json_data:
        request_kwargs["json"] = json_data

    request_method = getattr(session, method.lower())
    response = request_method(url, **request_kwargs)

    # Check for HTTP errors - this will raise for 4xx/5xx
    try:
        response.raise_for_status()
    except requests.HTTPError as e:
        try:
            # Classification reads the response body, so it must happen
            # before the response is closed.
            raise _classify_error(e, response) from e
        finally:
            # The failed response is never handed to the caller; release it.
            response.close()

    return response


def _parse_stream_line(
    line: str,
    state: StreamState,
    response_model: Optional[Type[T]],
) -> Any:
    """Parse one line of the stream into an item.

    Args:
        line: Raw text of a single stream line (surrounding whitespace allowed).
        state: Stream state; ``total_items_received`` is incremented for every
            line that parses as JSON.
        response_model: Optional model class whose ``model_validate`` is applied
            to the parsed JSON.

    Returns:
        The parsed (and optionally validated) item, or the ``_SKIP`` sentinel
        when the line is a keep-alive, is not valid JSON, or fails validation.
    """
    line = line.strip()

    # Skip keep-alive signals (blank lines or empty JSON objects)
    if not line or line == '{}':
        return _SKIP

    try:
        data = json.loads(line)
    except json.JSONDecodeError:
        logger.debug(f"Skipping invalid JSON line: {line[:100]}...")
        return _SKIP

    state.total_items_received += 1

    if response_model is None:
        return data

    try:
        return response_model.model_validate(data)
    except Exception as e:
        # Best effort: one malformed item must not terminate the stream.
        logger.debug(f"Error processing line: {e}")
        return _SKIP


def _process_stream(
    response: requests.Response,
    config: StreamConfig,
    state: StreamState,
    response_model: Optional[Type[T]],
) -> Generator[Union[T, Dict[str, Any]], None, None]:
    """Process the streaming response, yielding parsed JSON objects.

    The body is read in chunks of ``config.chunk_size`` and split on newlines;
    each complete line is parsed independently, and any trailing data without
    a final newline is parsed once the stream ends.

    Args:
        response: The open streaming response to read from.
        config: Supplies the chunk size.
        state: Stream state updated with the number of items received.
        response_model: Optional model class used to validate each item.

    Yields:
        Parsed JSON objects, validated through ``response_model`` when given.

    Raises:
        StreamError: If reading from the response fails.
    """
    buffer = ""
    # iter_content may hand back raw bytes even with decode_unicode=True, and a
    # multi-byte UTF-8 character can be split across two chunks. An incremental
    # decoder keeps the partial sequence until the rest arrives; undecodable
    # bytes are replaced rather than aborting the whole stream.
    decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")

    try:
        for chunk in response.iter_content(
            chunk_size=config.chunk_size,
            decode_unicode=True,
        ):
            if not chunk:
                continue

            # Ensure chunk is string
            if isinstance(chunk, bytes):
                chunk = decoder.decode(chunk)

            buffer += chunk

            # Process complete lines
            while '\n' in buffer:
                line, buffer = buffer.split('\n', 1)
                item = _parse_stream_line(line, state, response_model)
                if item is not _SKIP:
                    yield item

        # Flush any bytes still held by the decoder, then process whatever
        # remains after the last newline.
        buffer += decoder.decode(b"", final=True)
        item = _parse_stream_line(buffer, state, response_model)
        if item is not _SKIP:
            yield item

    except ChunkedEncodingError as e:
        raise StreamError(
            "Stream was interrupted",
            StreamErrorType.STREAM_INTERRUPTED,
            e,
        ) from e
    except (ConnectionError, ConnectionResetError) as e:
        raise StreamError(
            "Connection lost",
            StreamErrorType.CONNECTION_ERROR,
            e,
        ) from e
    except Exception as e:
        raise _classify_error(e) from e