Skip to content

Commit 141ae60

Browse files
authored
feat(storage): add fuse_tag table function to list snapshot tags (#19664)
* feat: support desc table tags * fix * fix
1 parent 9153105 commit 141ae60

File tree

7 files changed

+154
-4
lines changed

7 files changed

+154
-4
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,6 @@ futures = { workspace = true }
123123
futures-util = { workspace = true }
124124
geo = { workspace = true }
125125
geo-index = { workspace = true }
126-
geozero = { workspace = true }
127126
headers = { workspace = true }
128127
hex = { workspace = true }
129128
http = { workspace = true }

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ use super::others::UdfEchoTable;
4848
use crate::storages::fuse::table_functions::ClusteringInformationFunc;
4949
use crate::storages::fuse::table_functions::FuseSegmentFunc;
5050
use crate::storages::fuse::table_functions::FuseSnapshotFunc;
51+
use crate::storages::fuse::table_functions::FuseTagFunc;
5152
use crate::table_functions::TableFunction;
5253
use crate::table_functions::async_crash_me::AsyncCrashMeTable;
5354
use crate::table_functions::cloud::TaskDependentsEnableTable;
@@ -141,6 +142,14 @@ impl TableFunctionFactory {
141142
),
142143
);
143144

145+
creators.insert(
146+
"fuse_tag".to_string(),
147+
(
148+
next_id(),
149+
Arc::new(TableFunctionTemplate::<FuseTagFunc>::create),
150+
),
151+
);
152+
144153
creators.insert(
145154
"fuse_dump_snapshots".to_string(),
146155
(
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::sync::Arc;
16+
17+
use databend_common_catalog::plan::DataSourcePlan;
18+
use databend_common_catalog::table_args::TableArgs;
19+
use databend_common_exception::ErrorCode;
20+
use databend_common_exception::Result;
21+
use databend_common_expression::DataBlock;
22+
use databend_common_expression::FromData;
23+
use databend_common_expression::TableDataType;
24+
use databend_common_expression::TableField;
25+
use databend_common_expression::TableSchema;
26+
use databend_common_expression::TableSchemaRefExt;
27+
use databend_common_expression::types::StringType;
28+
use databend_common_expression::types::TimestampType;
29+
use databend_common_meta_app::schema::ListTableTagsReq;
30+
31+
use crate::operations::check_table_ref_access;
32+
use crate::sessions::TableContext;
33+
use crate::table_functions::SimpleTableFunc;
34+
use crate::table_functions::parse_db_tb_args;
35+
use crate::table_functions::string_literal;
36+
37+
struct FuseTagArgs {
38+
database_name: String,
39+
table_name: String,
40+
}
41+
42+
impl From<&FuseTagArgs> for TableArgs {
43+
fn from(args: &FuseTagArgs) -> Self {
44+
TableArgs::new_positioned(vec![
45+
string_literal(args.database_name.as_str()),
46+
string_literal(args.table_name.as_str()),
47+
])
48+
}
49+
}
50+
51+
pub struct FuseTagFunc {
52+
args: FuseTagArgs,
53+
}
54+
55+
#[async_trait::async_trait]
56+
impl SimpleTableFunc for FuseTagFunc {
57+
fn table_args(&self) -> Option<TableArgs> {
58+
Some((&self.args).into())
59+
}
60+
61+
fn schema(&self) -> Arc<TableSchema> {
62+
TableSchemaRefExt::create(vec![
63+
TableField::new("name", TableDataType::String),
64+
TableField::new("snapshot_location", TableDataType::String),
65+
TableField::new("expire_at", TableDataType::Timestamp.wrap_nullable()),
66+
])
67+
}
68+
69+
async fn apply(
70+
&self,
71+
ctx: &Arc<dyn TableContext>,
72+
_plan: &DataSourcePlan,
73+
) -> Result<Option<DataBlock>> {
74+
check_table_ref_access(ctx.as_ref())?;
75+
76+
let catalog = ctx.get_current_catalog();
77+
let tenant = ctx.get_tenant();
78+
let tbl = ctx
79+
.get_catalog(&catalog)
80+
.await?
81+
.get_table(&tenant, &self.args.database_name, &self.args.table_name)
82+
.await?;
83+
if tbl.engine() != "FUSE" {
84+
return Err(ErrorCode::TableEngineNotSupported(
85+
"Invalid table engine, only FUSE table supports fuse_tag",
86+
));
87+
}
88+
let table_id = tbl.get_id();
89+
90+
let tags = ctx
91+
.get_catalog(&catalog)
92+
.await?
93+
.list_table_tags(ListTableTagsReq {
94+
table_id,
95+
include_expired: true,
96+
})
97+
.await?;
98+
99+
if tags.is_empty() {
100+
return Ok(Some(DataBlock::empty_with_schema(&self.schema().into())));
101+
}
102+
103+
let mut names: Vec<String> = Vec::with_capacity(tags.len());
104+
let mut snapshot_locations: Vec<String> = Vec::with_capacity(tags.len());
105+
let mut expire_ats: Vec<Option<i64>> = Vec::with_capacity(tags.len());
106+
107+
for (tag_name, seq_tag) in &tags {
108+
names.push(tag_name.clone());
109+
snapshot_locations.push(seq_tag.data.snapshot_loc.clone());
110+
expire_ats.push(seq_tag.data.expire_at.map(|t| t.timestamp_micros()));
111+
}
112+
113+
Ok(Some(DataBlock::new_from_columns(vec![
114+
StringType::from_data(names),
115+
StringType::from_data(snapshot_locations),
116+
TimestampType::from_opt_data(expire_ats),
117+
])))
118+
}
119+
120+
fn create(func_name: &str, table_args: TableArgs) -> Result<Self>
121+
where Self: Sized {
122+
let (arg_database_name, arg_table_name) = parse_db_tb_args(&table_args, func_name)?;
123+
Ok(Self {
124+
args: FuseTagArgs {
125+
database_name: arg_database_name,
126+
table_name: arg_table_name,
127+
},
128+
})
129+
}
130+
}

src/query/storages/fuse/src/table_functions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ mod fuse_page;
2424
mod fuse_segment;
2525
mod fuse_snapshot;
2626
mod fuse_statistic;
27+
mod fuse_tag;
2728
mod fuse_time_travel_size;
2829
mod fuse_vacuum_drop_aggregating_index;
2930
mod fuse_vacuum_drop_inverted_index;
@@ -47,6 +48,7 @@ pub use fuse_page::FusePageFunc;
4748
pub use fuse_segment::FuseSegmentFunc;
4849
pub use fuse_snapshot::FuseSnapshotFunc;
4950
pub use fuse_statistic::FuseStatisticsFunc;
51+
pub use fuse_tag::FuseTagFunc;
5052
pub use fuse_time_travel_size::FuseTimeTravelSize;
5153
pub use fuse_time_travel_size::FuseTimeTravelSizeFunc;
5254
pub use fuse_vacuum_drop_aggregating_index::FuseVacuumDropAggregatingIndex;

tests/sqllogictests/suites/base/06_show/06_0014_show_table_functions.test

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ fuse_page
1717
fuse_segment
1818
fuse_snapshot
1919
fuse_statistic
20+
fuse_tag
2021
fuse_time_travel_size
2122
fuse_vacuum2
2223
fuse_vacuum_drop_aggregating_index

tests/sqllogictests/suites/ee/08_table_ref/08_0000_tag.test

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,15 +102,25 @@ SELECT SLEEP(2)
102102
statement error 2748
103103
SELECT * FROM t1 at (tag => "temp_tag")
104104

105+
query TB
106+
select name, expire_at is not null from fuse_tag('test_tag','t1') order by name
107+
----
108+
temp_tag 1
109+
v1_0 0
110+
v1_0_copy 0
111+
105112
statement ok
106113
optimize table t1 compact
107114

108115
statement ok
109116
select * from fuse_vacuum2('test_tag', 't1') ignore_result;
110117

111118
## temp_tag was purged.
112-
statement error 2745
113-
SELECT * FROM t1 at (tag => "temp_tag")
119+
query TB
120+
select name, expire_at is not null from fuse_tag('test_tag','t1') order by name
121+
----
122+
v1_0 0
123+
v1_0_copy 0
114124

115125
query IT
116126
SELECT * FROM t1 at (tag => "v1_0") ORDER BY a

0 commit comments

Comments
 (0)