MySQL Source Versioning V2 - reorganized boogaloo#36333
patrickwwbutler wants to merge 3 commits into MaterializeInc:main
Conversation
Adds `binlog_full_metadata: bool` to `MySqlSourceExportDetails` (and its proto/storage-types counterparts). During purification, calls `ensure_binlog_full_metadata()` to check MySQL version (≥8.0.1) and that `binlog_row_metadata=FULL` is set — storing the result as a flag that flows through `PurifiedExportDetails::MySql` and the DDL planner into the persisted source export details. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Refactors `pack_mysql_row()` to accept `gtid_set` and `binlog_full_metadata` parameters. When `binlog_full_metadata=true`, columns are matched by name from the wire row to the table descriptor (safe under reordering). When false, falls back to position-based matching (original behavior, required for `binlog_row_metadata=MINIMAL`). Adds diagnostic helpers `decode_error()` and `describe_row_shape()` for richer error context. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…l_metadata=true Adds a `full_metadata: bool` parameter to `MySqlTableDesc::determine_compatibility()`. When true, columns are matched by name (allowing upstream reordering and safe addition of new columns). When false, uses the original positional prefix check. `verify_schemas()` now passes `output.binlog_full_metadata` to drive the choice. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
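The purification check described in the first commit can be sketched roughly as follows. This is a hypothetical simplification, not the real `ensure_binlog_full_metadata()` signature: the flag is only set when the server is new enough (≥ 8.0.1) and `binlog_row_metadata=FULL`.

```rust
// Hypothetical sketch: decide whether the source can rely on full binlog
// row metadata. `version` is a (major, minor, patch) triple queried from
// the server; `binlog_row_metadata` is the value of the system variable.
fn binlog_full_metadata(version: (u32, u32, u32), binlog_row_metadata: &str) -> bool {
    // Tuples compare lexicographically, so this is the ">= 8.0.1" check.
    version >= (8, 0, 1) && binlog_row_metadata.eq_ignore_ascii_case("FULL")
}
```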
ublubu
left a comment
I took a first pass at the "decode binlog rows by column name" commit. I'll look at the tests more carefully tomorrow.
// For each column in `table_desc` (in descriptor order), resolve its wire
// index. Non-fallback rows are matched by name so a reordered upstream
// still decodes correctly; fallback rows have no names and are matched
// positionally. A `None` here means the upstream row is missing this
// column and is only tolerated for ignored columns.
The reference to "fallback" here confuses me.
The only use of `fallback_names` is in the `if binlog_full_metadata && fallback_names {` above.
My best guess is that it's being used as a proxy for `binlog_full_metadata`. But we didn't check for `!binlog_full_metadata && !fallback_names`, so it's not a perfect one-to-one `binlog_full_metadata` iff `!fallback_names`.
// index. Non-fallback rows are matched by name so a reordered upstream
// still decodes correctly; fallback rows have no names and are matched
Another point of confusion for me:
This reads like you can have "fallback" and "non-fallback" rows side-by-side in the same table.
But the behavior is actually determined by binlog_full_metadata, which is a table attribute.
Yeah, that's a fair confusion. "Non-fallback rows" refers not to the table but to the binlog: whether we have metadata (column names) or need to fall back to indices. This could use some refactoring for clarity, and it should be clearer with Marty's suggestion of moving the check up a level and then only looking at `binlog_full_metadata` here.
// positionally. A `None` here means the upstream row is missing this
// column and is only tolerated for ignored columns.
"A `None` here ... is only tolerated for ignored columns"
It's not clear to me what "here" means. At first I thought it meant the `if col_desc.column_type.is_none() {` below, but that doesn't make sense because that's how we define which columns are to be ignored.
let wire_idx = if !binlog_full_metadata {
    (i < row.len()).then_some(i)
} else {
    row.columns_ref()
        .iter()
        .position(|wc| wc.name_str() == col_desc.name.as_str())
};
It might be clearer to put comments directly inside the branches of this if-else block instead.
if col_desc.column_type.is_none() {
    // This column is ignored, so don't decode it.
    continue;
}
This might as well happen first in the for loop.
+1 to rewriting as I also found this hard to follow
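The restructuring being suggested (ignore-check first, comments inside the branches) might look roughly like this. `ColDesc`, `matched_indices`, and `wire_names` are simplified stand-ins for the real types, not the actual decoder API:

```rust
#[derive(Debug)]
struct ColDesc {
    name: String,
    column_type: Option<u8>, // `None` marks an excluded/ignored column
}

// For each non-ignored descriptor column, resolve the index of the matching
// wire column. `wire_names` are the column names carried by the binlog row
// (only present with binlog_row_metadata=FULL).
fn matched_indices(
    binlog_full_metadata: bool,
    table_desc: &[ColDesc],
    wire_names: &[String],
) -> Vec<Option<usize>> {
    let mut out = Vec::new();
    for (i, col_desc) in table_desc.iter().enumerate() {
        // Ignored columns are skipped before any index resolution.
        if col_desc.column_type.is_none() {
            continue;
        }
        let wire_idx = if binlog_full_metadata {
            // FULL metadata: match by name, so a reordered upstream still decodes.
            wire_names.iter().position(|n| n == &col_desc.name)
        } else {
            // MINIMAL metadata: no names on the wire, match by position.
            (i < wire_names.len()).then_some(i)
        };
        // `None` means the upstream row is missing this column; the real
        // code turns that into a decode error.
        out.push(wire_idx);
    }
    out
}
```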
Ok(()) => (),
let wire_idx = match wire_idx {
    Some(idx) => idx,
    None => {
Ah, this is the "A `None` here". This feels like a good spot for that comment.
Or omit the comment. This error message and the `// This column is ignored` above already tell the story.
@@ -0,0 +1,193 @@
---
title: "Guide: Handle upstream schema changes with zero downtime"
Could we have the user-facing docs be a separate PR that incorporates the reference docs + ingest data pages as well?
Otherwise, this page gets published as soon as this branch is merged, not when the feature is actually released. So if we skip a release week, etc., and people have hooked up AI to the docs (or Matty), they'll get the wrong idea.
// If there are extra columns on the upstream table we can safely ignore them
break;
}
// If a column name begins with '@', then the binlog does not have full row metadata,
You can also manually create a table with a column name starting with @, which would throw this logic off.
Deeply upsetting news. I think then we check for `@`; if it exists, compare it against the expected column name we have for index 0 (possibly checking other indices if 0 is an excluded column?). We then run into some issues if they do something like `ALTER TABLE foo ADD COLUMN @1 FIRST`, which is super cursed. Worst case, we can just check whether all the binlog columns follow the pattern @1, @2, ..., @n, but that feels excessive to do on every single row.
I have a related comment to move this check up a level! TableMapEvent optional metadata columns may be of use here https://dev.mysql.com/doc/dev/mysql-server/9.6.0/classmysql_1_1binlog_1_1event_1_1Table__map__event.html#Table_table_map_event_optional_metadata
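The "check whether all binlog columns follow @1..@n" idea mentioned above could look something like this hypothetical helper (the real binlog types differ). A user-defined column that merely starts with `@` does not match unless the whole row looks like the synthetic fallback sequence:

```rust
// Returns true when every wire column name follows the synthetic fallback
// pattern @1, @2, ..., @n that appears when the binlog lacks full row
// metadata. An empty row is not treated as a fallback sequence.
fn is_fallback_name_sequence(names: &[&str]) -> bool {
    !names.is_empty()
        && names
            .iter()
            .enumerate()
            .all(|(i, n)| *n == format!("@{}", i + 1))
}
```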
let other_column = if full_metadata {
    if self_column.column_type.is_none() {
        // This is an excluded column and can be ignored, as it may not have a
        // corresponding column in `other.columns` if the column was dropped upstream.
        continue;
    }
    other.columns.iter().find(|c| c.name == self_column.name)
} else {
    other_columns.next()
};
if self_column.column_type.is_none() {
    // This is an excluded column and can be ignored.
    continue;
}
Do we also need this logic for the snapshot path?
The snapshot path does end up going through here as well, via verify_schemas, so it should be applied the same there
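The name-based compatibility check under discussion can be sketched roughly like this. These are simplified, hypothetical types, not the real `MySqlTableDesc::determine_compatibility()` API:

```rust
#[derive(PartialEq)]
struct Col {
    name: String,
    column_type: Option<u8>, // `None`: excluded column
}

// With full metadata, match columns by name (tolerating upstream reordering
// and dropped excluded columns); without it, require a positional match.
fn compatible(ours: &[Col], theirs: &[Col], full_metadata: bool) -> bool {
    for (i, col) in ours.iter().enumerate() {
        if col.column_type.is_none() {
            // Excluded column: skip it; upstream may have dropped it.
            continue;
        }
        let other = if full_metadata {
            theirs.iter().find(|c| c.name == col.name)
        } else {
            theirs.get(i)
        };
        match other {
            Some(c) if c == col => (),
            _ => return false,
        }
    }
    true
}
```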
martykulma
left a comment
Looking good! There are some additional improvements to make. That's partly my fault: I didn't think through the strict vs. lenient checks in purification (sorry!).
qualified_table_name: format!("{}.{}", table_desc.schema_name, table_desc.name),
error: "Table was created with binlog_row_metadata=FULL but binlog_row_metadata has since been set to a different value, meaning we cannot reliably decode the columns".to_string(),
});
}
Can we move this up to compare binlog_full_metadata and the TableMapEvent in handle_rows_event? There's optional metadata that's probably set only if FULL.
I assume you mean we should use the optional metadata to check whether there is full metadata, and create an error there rather than during decoding? And then here we can skip `fallback_names` entirely, instead just using `binlog_full_metadata` to determine behavior?
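The suggested "check at the TableMapEvent level" could take a shape like this. `TableMapMeta` and `check_metadata_matches` are hypothetical names, assuming the event's optional metadata carries column names only when `binlog_row_metadata=FULL`:

```rust
// Hypothetical stand-in for the optional metadata parsed from a
// TableMapEvent; `column_names` is empty unless FULL metadata was written.
struct TableMapMeta {
    column_names: Vec<String>,
}

// Compare the flag recorded at source creation against what the binlog
// actually carries, and fail before decoding any rows if they disagree.
fn check_metadata_matches(
    binlog_full_metadata: bool,
    meta: &TableMapMeta,
) -> Result<(), String> {
    let event_has_full = !meta.column_names.is_empty();
    if binlog_full_metadata && !event_has_full {
        return Err("table was created with binlog_row_metadata=FULL but the binlog no longer carries column names; cannot reliably decode".to_string());
    }
    Ok(())
}
```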
This PR effectively re-implements source versioning for MySQL after we reverted the initial implementation last week due to decoding problems with excluded columns that resulted in an incident. In doing so, it fixes a number of issues that existed with the first implementation, including:
https://github.com/MaterializeInc/database-issues/issues/11312
https://github.com/MaterializeInc/database-issues/issues/11313
https://github.com/MaterializeInc/database-issues/issues/11315
It also provides a safer mechanism for handling schema changes and changes to the `binlog_row_metadata` MySQL system variable.
The first commit is roughly the changes in #36253, which should be merged first and is required for the other changes.
The second commit updates the decoding logic based on the binlog metadata setting at source creation.
The third commit updates the logic that verifies MySQL schemas against the upstream schemas, allowing certain types of schema changes when `binlog_row_metadata` is FULL.
The fourth commit contains docs on how to make schema changes to your MySQL source without downtime in Materialize.