schemastore: fix cross-schema create view with unqualified source table (#5027)#5317
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
|
@lidezhu This PR has conflicts, I have hold it. |
|
Important Review skippedAuto reviews are disabled on base/target branches other than the default branch. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
There was a problem hiding this comment.
Code Review
This pull request introduces query normalization for view creation to resolve issues with cross-schema views referencing unqualified source tables. It adds helper functions to extract table schemas and verify schema references, along with comprehensive unit and integration tests. However, the changes currently contain several unresolved git merge conflict markers in both logservice/schemastore/utils.go and logservice/schemastore/utils_test.go, as well as unused imports in the test file that will cause compilation failures.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| <<<<<<< HEAD | ||
| "github.com/pingcap/tidb/pkg/parser/format" | ||
| ======= | ||
| >>>>>>> 5745770ea (schemastore: fix cross-schema create view with unqualified source table (#5027)) |
| <<<<<<< HEAD | ||
| ======= | ||
|
|
||
| func getIndexIDs(job *model.Job) []int64 { | ||
| if job == nil { | ||
| return nil | ||
| } | ||
|
|
||
| // Anonymous index rewrite only needs IDs for ADD INDEX clauses, and it | ||
| // consumes them in SQL order. Other modify-index subjobs such as DROP INDEX, | ||
| // RENAME INDEX, or ADD PRIMARY KEY would shift that positional mapping and | ||
| // make the downstream rewrite pick the wrong upstream name. | ||
| if job.Type == model.ActionAddIndex { | ||
| return extractAddIndexIDs(job) | ||
| } | ||
|
|
||
| if job.MultiSchemaInfo == nil { | ||
| return nil | ||
| } | ||
|
|
||
| res := make([]int64, 0) | ||
| for idx, subJob := range job.MultiSchemaInfo.SubJobs { | ||
| if subJob.Type != model.ActionAddIndex { | ||
| continue | ||
| } | ||
| proxyJob := subJob.ToProxyJob(job, idx) | ||
| res = append(res, extractAddIndexIDs(&proxyJob)...) | ||
| } | ||
| return res | ||
| } | ||
|
|
||
| func extractAddIndexIDs(job *model.Job) []int64 { | ||
| idxArgs, err := model.GetModifyIndexArgs(job) | ||
| if idxArgs == nil || err != nil { | ||
| return nil | ||
| } | ||
|
|
||
| res := make([]int64, 0, len(idxArgs.IndexArgs)) | ||
| for _, indexArg := range idxArgs.IndexArgs { | ||
| res = append(res, indexArg.IndexID) | ||
| } | ||
| return res | ||
| } | ||
|
|
||
| type tableSchemaExtractor struct { | ||
| schemas []string | ||
| } | ||
|
|
||
| func (e *tableSchemaExtractor) Enter(in ast.Node) (ast.Node, bool) { | ||
| if t, ok := in.(*ast.TableName); ok { | ||
| e.schemas = append(e.schemas, t.Schema.O) | ||
| return in, true | ||
| } | ||
| return in, false | ||
| } | ||
|
|
||
| func (e *tableSchemaExtractor) Leave(in ast.Node) (ast.Node, bool) { | ||
| return in, true | ||
| } | ||
|
|
||
| // extractTableSchemas returns schema qualifiers from all *ast.TableName nodes in | ||
| // AST visit order. Unqualified tables contribute an empty schema name. | ||
| func extractTableSchemas(node ast.Node) []string { | ||
| if node == nil { | ||
| return nil | ||
| } | ||
|
|
||
| extractor := &tableSchemaExtractor{ | ||
| schemas: make([]string, 0), | ||
| } | ||
| node.Accept(extractor) | ||
| return extractor.schemas | ||
| } | ||
| >>>>>>> 5745770ea (schemastore: fix cross-schema create view with unqualified source table (#5027)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found at the end of the file. Please resolve the conflict and keep the helper functions.
func getIndexIDs(job *model.Job) []int64 {
if job == nil {
return nil
}
// Anonymous index rewrite only needs IDs for ADD INDEX clauses, and it
// consumes them in SQL order. Other modify-index subjobs such as DROP INDEX,
// RENAME INDEX, or ADD PRIMARY KEY would shift that positional mapping and
// make the downstream rewrite pick the wrong upstream name.
if job.Type == model.ActionAddIndex {
return extractAddIndexIDs(job)
}
if job.MultiSchemaInfo == nil {
return nil
}
res := make([]int64, 0)
for idx, subJob := range job.MultiSchemaInfo.SubJobs {
if subJob.Type != model.ActionAddIndex {
continue
}
proxyJob := subJob.ToProxyJob(job, idx)
res = append(res, extractAddIndexIDs(&proxyJob)...)
}
return res
}
func extractAddIndexIDs(job *model.Job) []int64 {
idxArgs, err := model.GetModifyIndexArgs(job)
if idxArgs == nil || err != nil {
return nil
}
res := make([]int64, 0, len(idxArgs.IndexArgs))
for _, indexArg := range idxArgs.IndexArgs {
res = append(res, indexArg.IndexID)
}
return res
}
type tableSchemaExtractor struct {
schemas []string
}
func (e *tableSchemaExtractor) Enter(in ast.Node) (ast.Node, bool) {
if t, ok := in.(*ast.TableName); ok {
e.schemas = append(e.schemas, t.Schema.O)
return in, true
}
return in, false
}
func (e *tableSchemaExtractor) Leave(in ast.Node) (ast.Node, bool) {
return in, true
}
// extractTableSchemas returns schema qualifiers from all *ast.TableName nodes in
// AST visit order. Unqualified tables contribute an empty schema name.
func extractTableSchemas(node ast.Node) []string {
if node == nil {
return nil
}
extractor := &tableSchemaExtractor{
schemas: make([]string, 0),
}
node.Accept(extractor)
return extractor.schemas
}| <<<<<<< HEAD | ||
| ======= | ||
| "github.com/pingcap/ticdc/pkg/config/kerneltype" | ||
| ticonfig "github.com/pingcap/tidb/pkg/config" | ||
| "github.com/pingcap/tidb/pkg/disttask/framework/handle" | ||
| "github.com/pingcap/tidb/pkg/meta/model" | ||
| "github.com/pingcap/tidb/pkg/parser" | ||
| >>>>>>> 5745770ea (schemastore: fix cross-schema create view with unqualified source table (#5027)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found in the imports section. Also, several unused imports (kerneltype, ticonfig, handle) were introduced, which will cause compilation errors in Go. Please resolve the conflict and keep only the used imports.
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser"| <<<<<<< HEAD | ||
| ======= | ||
|
|
||
| func TestBuildPersistedDDLEventForMultiSchemaChangeContainsIndexIDs(t *testing.T) { | ||
| helper := commonEvent.NewEventTestHelper(t) | ||
| defer helper.Close() | ||
|
|
||
| helper.Tk().MustExec("use test") | ||
| helper.DDL2Event("create table t (id int primary key, c1 int)") | ||
|
|
||
| job := helper.DDL2Job("alter table t add column c2 int, add index (c1)") | ||
| require.Equal(t, model.ActionMultiSchemaChange, job.Type) | ||
|
|
||
| args := buildPersistedDDLEventFuncArgs{ | ||
| job: job, | ||
| databaseMap: map[int64]*BasicDatabaseInfo{ | ||
| job.SchemaID: { | ||
| Name: "test", | ||
| Tables: map[int64]bool{ | ||
| job.TableID: true, | ||
| }, | ||
| }, | ||
| }, | ||
| tableMap: map[int64]*BasicTableInfo{ | ||
| job.TableID: { | ||
| SchemaID: job.SchemaID, | ||
| Name: "t", | ||
| }, | ||
| }, | ||
| } | ||
|
|
||
| event := buildPersistedDDLEventForMultiSchemaChange(args) | ||
| expectedIndexIDs := getIndexIDs(job) | ||
| require.Len(t, expectedIndexIDs, 1) | ||
| require.Equal(t, expectedIndexIDs, event.IndexIDs) | ||
| require.Equal(t, "test", event.SchemaName) | ||
| require.Equal(t, "t", event.TableName) | ||
| } | ||
|
|
||
| func TestGetIndexIDsReturnsAllAddIndexIDsInOrder(t *testing.T) { | ||
| helper := commonEvent.NewEventTestHelper(t) | ||
| defer helper.Close() | ||
|
|
||
| helper.Tk().MustExec("use test") | ||
| helper.DDL2Event("create table t (id int primary key, c1 int)") | ||
|
|
||
| job := helper.DDL2Job("alter table t add index idx_c1(c1), add index (c1)") | ||
| tableInfo := helper.GetModelTableInfo(job) | ||
| require.NotNil(t, tableInfo) | ||
|
|
||
| var namedIndexID int64 | ||
| var anonymousIndexID int64 | ||
| for _, index := range tableInfo.Indices { | ||
| if index == nil { | ||
| continue | ||
| } | ||
| if index.Name.O == "idx_c1" { | ||
| namedIndexID = index.ID | ||
| continue | ||
| } | ||
| if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" { | ||
| anonymousIndexID = index.ID | ||
| } | ||
| } | ||
| require.NotZero(t, namedIndexID) | ||
| require.NotZero(t, anonymousIndexID) | ||
| require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job)) | ||
| } | ||
|
|
||
| func TestGetIndexIDsReturnsAllAddIndexIDsInOrderForMultiSchemaChange(t *testing.T) { | ||
| helper := commonEvent.NewEventTestHelper(t) | ||
| defer helper.Close() | ||
|
|
||
| helper.Tk().MustExec("use test") | ||
| helper.DDL2Event("create table t (id int primary key, c1 int)") | ||
|
|
||
| job := helper.DDL2Job("alter table t add column c2 int, add index idx_c1(c1), add index (c1)") | ||
| require.Equal(t, model.ActionMultiSchemaChange, job.Type) | ||
| tableInfo := helper.GetModelTableInfo(job) | ||
| require.NotNil(t, tableInfo) | ||
|
|
||
| var namedIndexID int64 | ||
| var anonymousIndexID int64 | ||
| for _, index := range tableInfo.Indices { | ||
| if index == nil { | ||
| continue | ||
| } | ||
| if index.Name.O == "idx_c1" { | ||
| namedIndexID = index.ID | ||
| continue | ||
| } | ||
| if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" { | ||
| anonymousIndexID = index.ID | ||
| } | ||
| } | ||
| require.NotZero(t, namedIndexID) | ||
| require.NotZero(t, anonymousIndexID) | ||
| require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job)) | ||
| } | ||
|
|
||
| func TestGetIndexIDsIgnoresDropIndexSubJobsForMultiSchemaChange(t *testing.T) { | ||
| helper := commonEvent.NewEventTestHelper(t) | ||
| defer helper.Close() | ||
|
|
||
| helper.Tk().MustExec("use test") | ||
| helper.DDL2Event("create table t (id int primary key, a int, key idx_old(id))") | ||
|
|
||
| job := helper.DDL2Job("alter table t drop index idx_old, add index (a)") | ||
| require.Equal(t, model.ActionMultiSchemaChange, job.Type) | ||
|
|
||
| tableInfo := helper.GetModelTableInfo(job) | ||
| require.NotNil(t, tableInfo) | ||
|
|
||
| var anonymousIndexID int64 | ||
| for _, index := range tableInfo.Indices { | ||
| if index == nil || index.Primary || len(index.Columns) != 1 { | ||
| continue | ||
| } | ||
| if index.Columns[0].Name.L == "a" { | ||
| anonymousIndexID = index.ID | ||
| break | ||
| } | ||
| } | ||
| require.NotZero(t, anonymousIndexID) | ||
| require.Equal(t, []int64{anonymousIndexID}, getIndexIDs(job)) | ||
| } | ||
|
|
||
| func TestGetIndexIDsIgnoresAddPrimaryKeySubJobsForMultiSchemaChange(t *testing.T) { | ||
| helper := commonEvent.NewEventTestHelper(t) | ||
| defer helper.Close() | ||
|
|
||
| helper.Tk().MustExec("use test") | ||
| helper.DDL2Event("create table t (id int, a int)") | ||
|
|
||
| job := helper.DDL2Job("alter table t add primary key(id), add index (a)") | ||
| require.Equal(t, model.ActionMultiSchemaChange, job.Type) | ||
|
|
||
| tableInfo := helper.GetModelTableInfo(job) | ||
| require.NotNil(t, tableInfo) | ||
|
|
||
| var anonymousIndexID int64 | ||
| for _, index := range tableInfo.Indices { | ||
| if index == nil || index.Primary || len(index.Columns) != 1 { | ||
| continue | ||
| } | ||
| if index.Columns[0].Name.L == "a" { | ||
| anonymousIndexID = index.ID | ||
| break | ||
| } | ||
| } | ||
| require.NotZero(t, anonymousIndexID) | ||
| require.Equal(t, []int64{anonymousIndexID}, getIndexIDs(job)) | ||
| } | ||
|
|
||
| func TestExtractTableSchemas(t *testing.T) { | ||
| cases := []struct { | ||
| name string | ||
| query string | ||
| expected []string | ||
| }{ | ||
| { | ||
| name: "unqualified table", | ||
| query: "SELECT * FROM `t`", | ||
| expected: []string{""}, | ||
| }, | ||
| { | ||
| name: "mixed qualified tables", | ||
| query: "SELECT * FROM `db1`.`t1` JOIN `t2` ON `db1`.`t1`.`id` = `t2`.`id`", | ||
| expected: []string{"db1", ""}, | ||
| }, | ||
| { | ||
| name: "subquery preserves visit order", | ||
| query: "SELECT * FROM `db1`.`t1` WHERE EXISTS (SELECT 1 FROM `db2`.`t2`)", | ||
| expected: []string{"db1", "db2"}, | ||
| }, | ||
| } | ||
|
|
||
| p := parser.New() | ||
| for _, tc := range cases { | ||
| t.Run(tc.name, func(t *testing.T) { | ||
| stmt, err := p.ParseOneStmt(tc.query, "", "") | ||
| require.NoError(t, err) | ||
| require.Equal(t, tc.expected, extractTableSchemas(stmt)) | ||
| }) | ||
| } | ||
|
|
||
| require.Nil(t, extractTableSchemas(nil)) | ||
| } | ||
| >>>>>>> 5745770ea (schemastore: fix cross-schema create view with unqualified source table (#5027)) |
There was a problem hiding this comment.
Unresolved merge conflict markers found in the test functions section. Please resolve the conflict and keep the added test cases.
func TestBuildPersistedDDLEventForMultiSchemaChangeContainsIndexIDs(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int primary key, c1 int)")
job := helper.DDL2Job("alter table t add column c2 int, add index (c1)")
require.Equal(t, model.ActionMultiSchemaChange, job.Type)
args := buildPersistedDDLEventFuncArgs{
job: job,
databaseMap: map[int64]*BasicDatabaseInfo{
job.SchemaID: {
Name: "test",
Tables: map[int64]bool{
job.TableID: true,
},
},
},
tableMap: map[int64]*BasicTableInfo{
job.TableID: {
SchemaID: job.SchemaID,
Name: "t",
},
},
}
event := buildPersistedDDLEventForMultiSchemaChange(args)
expectedIndexIDs := getIndexIDs(job)
require.Len(t, expectedIndexIDs, 1)
require.Equal(t, expectedIndexIDs, event.IndexIDs)
require.Equal(t, "test", event.SchemaName)
require.Equal(t, "t", event.TableName)
}
func TestGetIndexIDsReturnsAllAddIndexIDsInOrder(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int primary key, c1 int)")
job := helper.DDL2Job("alter table t add index idx_c1(c1), add index (c1)")
tableInfo := helper.GetModelTableInfo(job)
require.NotNil(t, tableInfo)
var namedIndexID int64
var anonymousIndexID int64
for _, index := range tableInfo.Indices {
if index == nil {
continue
}
if index.Name.O == "idx_c1" {
namedIndexID = index.ID
continue
}
if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" {
anonymousIndexID = index.ID
}
}
require.NotZero(t, namedIndexID)
require.NotZero(t, anonymousIndexID)
require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job))
}
func TestGetIndexIDsReturnsAllAddIndexIDsInOrderForMultiSchemaChange(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int primary key, c1 int)")
job := helper.DDL2Job("alter table t add column c2 int, add index idx_c1(c1), add index (c1)")
require.Equal(t, model.ActionMultiSchemaChange, job.Type)
tableInfo := helper.GetModelTableInfo(job)
require.NotNil(t, tableInfo)
var namedIndexID int64
var anonymousIndexID int64
for _, index := range tableInfo.Indices {
if index == nil {
continue
}
if index.Name.O == "idx_c1" {
namedIndexID = index.ID
continue
}
if len(index.Columns) == 1 && index.Columns[0].Name.L == "c1" {
anonymousIndexID = index.ID
}
}
require.NotZero(t, namedIndexID)
require.NotZero(t, anonymousIndexID)
require.Equal(t, []int64{namedIndexID, anonymousIndexID}, getIndexIDs(job))
}
func TestGetIndexIDsIgnoresDropIndexSubJobsForMultiSchemaChange(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int primary key, a int, key idx_old(id))")
job := helper.DDL2Job("alter table t drop index idx_old, add index (a)")
require.Equal(t, model.ActionMultiSchemaChange, job.Type)
tableInfo := helper.GetModelTableInfo(job)
require.NotNil(t, tableInfo)
var anonymousIndexID int64
for _, index := range tableInfo.Indices {
if index == nil || index.Primary || len(index.Columns) != 1 {
continue
}
if index.Columns[0].Name.L == "a" {
anonymousIndexID = index.ID
break
}
}
require.NotZero(t, anonymousIndexID)
require.Equal(t, []int64{anonymousIndexID}, getIndexIDs(job))
}
func TestGetIndexIDsIgnoresAddPrimaryKeySubJobsForMultiSchemaChange(t *testing.T) {
helper := commonEvent.NewEventTestHelper(t)
defer helper.Close()
helper.Tk().MustExec("use test")
helper.DDL2Event("create table t (id int, a int)")
job := helper.DDL2Job("alter table t add primary key(id), add index (a)")
require.Equal(t, model.ActionMultiSchemaChange, job.Type)
tableInfo := helper.GetModelTableInfo(job)
require.NotNil(t, tableInfo)
var anonymousIndexID int64
for _, index := range tableInfo.Indices {
if index == nil || index.Primary || len(index.Columns) != 1 {
continue
}
if index.Columns[0].Name.L == "a" {
anonymousIndexID = index.ID
break
}
}
require.NotZero(t, anonymousIndexID)
require.Equal(t, []int64{anonymousIndexID}, getIndexIDs(job))
}
func TestExtractTableSchemas(t *testing.T) {
cases := []struct {
name string
query string
expected []string
}{
{
name: "unqualified table",
query: "SELECT * FROM `t`",
expected: []string{""},
},
{
name: "mixed qualified tables",
query: "SELECT * FROM `db1`.`t1` JOIN `t2` ON `db1`.`t1`.`id` = `t2`.`id`",
expected: []string{"db1", ""},
},
{
name: "subquery preserves visit order",
query: "SELECT * FROM `db1`.`t1` WHERE EXISTS (SELECT 1 FROM `db2`.`t2`)",
expected: []string{"db1", "db2"},
},
}
p := parser.New()
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
stmt, err := p.ParseOneStmt(tc.query, "", "")
require.NoError(t, err)
require.Equal(t, tc.expected, extractTableSchemas(stmt))
})
}
require.Nil(t, extractTableSchemas(nil))
}Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
…ot/ticdc-1 into cherry-pick-5027-to-release-8.5 # Conflicts: # logservice/schemastore/persist_storage_ddl_handlers.go # logservice/schemastore/persist_storage_test.go # logservice/schemastore/utils.go # logservice/schemastore/utils_test.go
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: lidezhu The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
This is an automated cherry-pick of #5027
What problem does this PR solve?
Issue Number: close #5026
What is changed and how it works?
This pull request addresses an issue where cross-schema CREATE VIEW statements could fail or be incorrectly processed due to unqualified source table references. By leveraging the normalized SELECT statement stored in the table metadata, the system can now correctly reconstruct the view definition with fully qualified table names. This ensures consistency and accuracy in downstream replication for views that span multiple schemas.
Highlights
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit
New Features
Tests