
Conversation

@PDGGK
Contributor

@PDGGK PDGGK commented Jan 30, 2026

Summary

This PR adds support for writing Kafka record headers in the Python SDK by introducing a new cross-language transform.

Changes:

  • Python: Add with_headers parameter to WriteToKafka that accepts beam.Row elements with key, value, and optional headers fields
  • Java: Add WriteWithHeaders class that converts Row to ProducerRecord with headers support
  • Java: Register new URN beam:transform:org.apache.beam:kafka_write_with_headers:v1
  • Test: Add testConstructKafkaWriteWithHeaders in KafkaIOExternalTest

Input Schema

When with_headers=True, input elements must be beam.Row with:

  • key: bytes (required)
  • value: bytes (required)
  • headers: List[Row(key=str, value=bytes)] (optional)
  • topic: str (optional, per-record override)
  • partition: int (optional)
  • timestamp: long (optional)
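
The schema above can be illustrated with a plain-Python validation sketch (hypothetical helper, not part of the SDK; the real checks happen in the Java expansion service):

```python
# Hypothetical sketch of the with_headers input schema described above.
# Records are modeled as dicts here; in the SDK they are beam.Row elements.
REQUIRED = {"key": bytes, "value": bytes}
OPTIONAL = {"topic": str, "partition": int, "timestamp": int}

def validate_record(record: dict) -> None:
    """Raise ValueError if `record` does not match the schema sketch."""
    for name, typ in REQUIRED.items():
        if not isinstance(record.get(name), typ):
            raise ValueError(f"field '{name}' must be {typ.__name__}")
    for name, typ in OPTIONAL.items():
        if name in record and not isinstance(record[name], typ):
            raise ValueError(f"field '{name}' must be {typ.__name__}")
    for header in record.get("headers", []):
        if not isinstance(header.get("key"), str) or not isinstance(
                header.get("value"), bytes):
            raise ValueError("each header needs a str key and a bytes value")

validate_record({"key": b"k1", "value": b"v1",
                 "headers": [{"key": "h1", "value": b"hv1"}]})
```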

Example Usage

```python
# Write with headers
records = [
    beam.Row(
        key=b'k1',
        value=b'v1',
        headers=[beam.Row(key='h1', value=b'hv1')]
    )
]

p | beam.Create(records) | WriteToKafka(
    producer_config={'bootstrap.servers': 'localhost:9092'},
    topic='my-topic',
    with_headers=True
)
```

Test plan

  • Java compilation passes
  • testConstructKafkaWriteWithHeaders passes
  • Code reviewed by Codex and Gemini

Related Issues

Fixes #27033


🤖 Generated with Claude Code

PDGGK and others added 2 commits January 20, 2026 19:37
…y configuration

Users need to configure bounded backoff to prevent infinite retry loops.
Making withBackOffSupplier public allows users to set
FluentBackoff.DEFAULT.withMaxRetries(n) and control retry behavior.

Changes:
- Changed withBackOffSupplier() visibility from package-private to public
- Added comprehensive integration test with zero-delay BoundedBackOff
- Test verifies: responses empty, 1 failure emitted, call count = maxRetries+1

The test uses a serializable BoundedBackOff class with assertions on both
PAssert (pipeline outputs) and Metrics (retry counts) to ensure bounded
retry behavior works correctly.
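
The bounded-retry contract the test asserts (call count = maxRetries + 1, then the failure is emitted) can be sketched as a minimal, library-free retry loop; this is an illustration of the behavior, not Beam's FluentBackoff implementation:

```python
# Minimal retry loop mirroring the bounded-backoff contract described
# above: one initial attempt plus at most max_retries retries, after
# which the failure is surfaced instead of looping forever.
def call_with_bounded_retries(fn, max_retries: int):
    calls = 0
    last_error = None
    for _ in range(max_retries + 1):   # initial attempt + retries
        calls += 1
        try:
            return fn(), calls, None
        except Exception as e:         # broad catch: sketch only
            last_error = e
    return None, calls, last_error     # bounded: give up, emit failure

def always_fails():
    raise RuntimeError("service unavailable")

result, calls, err = call_with_bounded_retries(always_fails, max_retries=3)
# calls == 4 (maxRetries + 1); result is None; err holds the failure
```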

Fixes apache#37198
Related to apache#37176

Co-Authored-By: Claude Sonnet 4.5 <[email protected]>
…pache#27033)

This PR adds support for writing Kafka record headers in the Python SDK
by introducing a new cross-language transform.

Changes:
- Python: Add `with_headers` parameter to `WriteToKafka` that accepts
  `beam.Row` elements with key, value, and optional headers fields
- Java: Add `WriteWithHeaders` class that converts Row to ProducerRecord
  with headers support
- Java: Register new URN `beam:transform:org.apache.beam:kafka_write_with_headers:v1`
- Add test `testConstructKafkaWriteWithHeaders` in KafkaIOExternalTest

When `with_headers=True`, input elements must be `beam.Row` with schema:
- key: bytes (required)
- value: bytes (required)
- headers: List[Row(key=str, value=bytes)] (optional)
- topic: str (optional, per-record override)
- partition: int (optional)
- timestamp: long (optional)

Co-Authored-By: Claude Opus 4.5 <[email protected]>
@gemini-code-assist
Contributor

Summary of Changes

Hello @PDGGK, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly enhances the Apache Beam Python SDK's Kafka integration by adding native support for Kafka record headers. This feature allows users to attach metadata to their Kafka messages, providing greater flexibility and richer context for data processing workflows. The implementation involves a new cross-language transform in Java to handle the conversion of structured Beam Row elements into Kafka ProducerRecords with headers, alongside updates to the Python WriteToKafka API and necessary validation.

Highlights

  • Kafka Headers Support in Python SDK: The Python WriteToKafka transform now includes a with_headers parameter, enabling users to write Kafka records with custom headers.
  • New Cross-Language Transform for Headers: A new Java class, WriteWithHeaders, has been introduced. This class acts as a cross-language transform, converting beam.Row elements (which can include key, value, and headers) into Kafka ProducerRecord objects.
  • URN Registration: A new URN, beam:transform:org.apache.beam:kafka_write_with_headers:v1, has been registered to identify the new WriteWithHeaders external transform.
  • Input Schema for Headers: When with_headers=True, input elements must be beam.Row objects with key (bytes), value (bytes), and an optional headers field (List[Row(key=str, value=bytes)]). Optional topic, partition, and timestamp fields are also supported for per-record overrides.
  • Serializer Restriction: When using with_headers=True, only ByteArraySerializer is supported for both key and value serializers to ensure compatibility with header handling.
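
The serializer restriction follows from Kafka's header contract: keys and values arrive as bytes and must be forwarded unchanged, while each header is a (str key, bytes value) pair. A dependency-free sketch of the pass-through conversion (dicts standing in for Row and ProducerRecord; the real conversion is in the Java WriteWithHeaders class):

```python
# Sketch of the pass-through contract implied by the ByteArraySerializer
# restriction: bytes key/value are forwarded as-is, headers become
# (str, bytes) pairs, and an optional per-record topic overrides the default.
def to_producer_record(row: dict, default_topic: str) -> dict:
    return {
        "topic": row.get("topic", default_topic),  # per-record override
        "key": row["key"],                         # bytes, passed through
        "value": row["value"],                     # bytes, passed through
        "headers": [(h["key"], h["value"]) for h in row.get("headers", [])],
    }

rec = to_producer_record(
    {"key": b"k1", "value": b"v1",
     "headers": [{"key": "h1", "value": b"hv1"}]},
    default_topic="my-topic")
# rec["headers"] == [("h1", b"hv1")]
```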


Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

| Feature | Command | Description |
| --- | --- | --- |
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in pull request comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions
Contributor

Assigning reviewers:

R: @tvalentyn for label python.
R: @chamikaramj for label java.
R: @Dippatel98 for label kafka.

Note: If you would like to opt out of this review, comment `assign to next reviewer`.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

- Wrap URN_WITH_HEADERS to fit 80 char limit
- Add blank line before docstring list for Sphinx
- Format if statement per yapf style

Development

Successfully merging this pull request may close these issues.

[Feature Request]: Add support to write record headers in WriteToKafka in python SDK
