[DebeziumIO] Implement startOffset & offset persistence for Kafka (#28248)#37750
[DebeziumIO] Implement startOffset & offset persistence for Kafka (#28248)#37750Abacn merged 3 commits intoapache:masterfrom
Conversation
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a crucial Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
|
Assigning reviewers: R: @tvalentyn for label python. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
|
Hi @an2x @johnjcasey @Dippatel98 - could you please help find a reviewer for this? Thanks |
|
@tvalentyn maybe @Abacn can have a look (?) |
|
Thanks, this change allows user to provide an offset that was obtained elsewhere. It now supports to read from an offset, yet not completely resolve #28248, as restarting the pipeline that have set the offset would still start from the same offset. I had a (yet materialized) idea of restartable offset. Basically we can provide an "OffsetRetainer" interface to 1. read offset on pipeline startup; 2. write offset on checkpointing, and expose as a configuration for the IO. And we can then provide a built-in FileSystem based OffsetRetainer for user to use. When pipeline is running, it continuously write committed offset to external location (gcs, kafka, etc) of choice; when pipeline starts or restarts, it tries to read from the same location at first. If you prefer to go for current change for now, let me take a closer look for this PR. Or we can work on a more concrete solution. |
|
@Abacn thank you for comment. I think your solution sounds like a much better vision, and we should aim for that. |
|
@gemini-code-assist summary |
Summary of ChangesThis pull request significantly improves the resilience and usability of DebeziumIO by introducing robust offset management features. It allows users to either explicitly define a starting offset for Debezium connectors or leverage an automatic file system-based mechanism to persist and restore offsets, ensuring seamless pipeline restarts and fault tolerance. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
|
acc816c to
d57e833
Compare
|
I would appreciate a second look and opinion on this @Abacn 🤝 |
Abacn
left a comment
There was a problem hiding this comment.
Sorry for the delay (back from OOO).
9546ec6 to
b09fdc1
Compare
Absolutely no need to apologize! :) @gemini-code-assist summary |
Summary of ChangesThis pull request introduces robust offset management capabilities to DebeziumIO, significantly enhancing its resilience and usability. It provides users with the flexibility to either explicitly define a starting offset for Debezium connectors or leverage an automatic file system-based mechanism to persist and restore offsets. This ensures seamless pipeline restarts and fault tolerance by allowing pipelines to resume data consumption from the last successfully processed point, preventing data loss and unnecessary reprocessing. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Activity
|
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces a significant feature to DebeziumIO by implementing startOffset and offset persistence capabilities, which are crucial for creating resumable pipelines. The changes are well-integrated across the Go, Java, and Python SDKs. The implementation includes a new OffsetRetainer interface in Java, with a FileSystemOffsetRetainer for durable storage, and updates the cross-language configuration accordingly. The code is well-documented and includes comprehensive tests. I have one suggestion to improve the robustness of the cross-language configuration handling in Java to prevent potential exceptions with malformed input.
There was a problem hiding this comment.
LGTM pending AI comment
Tested locally the FileSystemOffsetRetainer functionality
Also consider adding an item for https://github.com/apache/beam/blob/master/CHANGES.md
c03274d to
ab7990d
Compare
|
@Abacn Thank you! Implemented the review comment and updated |
|
@Abacn pleasure! |
Implement withStartOffset for Debezium and external offset persistence to handle restarts. (Addresses #28248)
See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.