Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pg14 = ["pgrx/pg14", "pgrx-tests/pg14" ]
pg15 = ["pgrx/pg15", "pgrx-tests/pg15" ]
pg16 = ["pgrx/pg16", "pgrx-tests/pg16" ]
pg17 = ["pgrx/pg17", "pgrx-tests/pg17" ]
pg_test = ["tempfile"]
pg_test = ["tempfile", "sqllogictest"]

[dependencies]
pgrx = "=0.14.3"
Expand All @@ -34,10 +34,12 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
chrono = "0.4"
base64 = "0.22"
sqllogictest = { version = "=0.29.0", optional = true }

[dev-dependencies]
pgrx-tests = "=0.14.3"
tempfile = "3.8"
sqllogictest = "=0.29.0"

[dependencies.tempfile]
version = "3.8"
Expand Down
119 changes: 1 addition & 118 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,124 +48,7 @@ fn lance_import(
}

#[cfg(any(test, feature = "pg_test"))]
#[pg_schema]
mod tests {
use arrow::array::{
Array, BooleanArray, Float32Array, Int32Array, ListBuilder, StringArray, StructArray,
};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use lance_rs::Dataset;
use pgrx::prelude::*;
use std::sync::Arc;
use tempfile::TempDir;

struct LanceTestDataGenerator {
temp_dir: TempDir,
}

impl LanceTestDataGenerator {
fn new() -> Result<Self, Box<dyn std::error::Error>> {
Ok(Self {
temp_dir: TempDir::new()?,
})
}

fn create_table_with_struct_and_list(
&self,
) -> Result<std::path::PathBuf, Box<dyn std::error::Error>> {
let table_path = self.temp_dir.path().join("fdw_table");

let id_array = Int32Array::from(vec![1, 2, 3]);
let name_array = StringArray::from(vec!["Alice", "Bob", "Charlie"]);
let active_array = BooleanArray::from(vec![true, false, true]);

let mut emb_builder = ListBuilder::new(arrow::array::Float32Builder::new());
for embedding in [
vec![0.1, 0.2, 0.3],
vec![0.4, 0.5, 0.6],
vec![0.7, 0.8, 0.9],
] {
for v in embedding {
emb_builder.values().append_value(v);
}
emb_builder.append(true);
}
let emb_array = emb_builder.finish();

let meta_score = Float32Array::from(vec![1.0, 2.0, 3.0]);
let meta_tag = StringArray::from(vec!["a", "b", "c"]);
let meta_struct = StructArray::from(vec![
(
Arc::new(Field::new("score", DataType::Float32, false)),
Arc::new(meta_score) as _,
),
(
Arc::new(Field::new("tag", DataType::Utf8, false)),
Arc::new(meta_tag) as _,
),
]);

let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, false),
Field::new("active", DataType::Boolean, false),
Field::new(
"embedding",
DataType::List(Arc::new(Field::new("item", DataType::Float32, true))),
false,
),
Field::new("meta", meta_struct.data_type().clone(), false),
]));

let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(id_array),
Arc::new(name_array),
Arc::new(active_array),
Arc::new(emb_array),
Arc::new(meta_struct),
],
)?;

let reader = arrow::record_batch::RecordBatchIterator::new(vec![Ok(batch)], schema);
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
Dataset::write(reader, table_path.to_str().unwrap(), None).await
})?;

Ok(table_path)
}
}

#[pg_test]
fn test_fdw_import_and_scan() {
let gen = LanceTestDataGenerator::new().expect("generator");
let path = gen
.create_table_with_struct_and_list()
.expect("create table");
let uri = path.to_str().unwrap();

Spi::run("CREATE SERVER lance_srv FOREIGN DATA WRAPPER lance_fdw").expect("create server");

let import_sql = format!(
"SELECT lance_import('lance_srv', 'public', 't_fdw', '{}', NULL)",
uri.replace('\'', "''")
);
Spi::run(&import_sql).expect("lance_import");

let cnt = Spi::get_one::<i64>("SELECT count(*) FROM public.t_fdw")
.expect("count")
.expect("count value");
assert_eq!(cnt, 3);

let v = Spi::get_one::<String>("SELECT name FROM public.t_fdw WHERE id = 2")
.expect("select")
.expect("value");
assert_eq!(v, "Bob");
}
}
mod tests;

#[cfg(test)]
pub mod pg_test {
Expand Down
Loading
Loading