Skip to content

Commit 6db264c

Browse files
committed
test(spanner): add integration test for inline-begin error handling
Adds an integration test for error handling for inline-begin-transaction. This test uses a gRPC proxy to intercept calls from the client to Spanner to be able to deterministically emulate specific concurrency issues. This test shows how a query that failed during the first attempt, and thereby also failed to start the transaction, could succeed during a retry after the transaction has been started with an explicit BeginTransaction RPC.
1 parent a0cfa91 commit 6db264c

10 files changed

Lines changed: 463 additions & 387 deletions

File tree

Cargo.lock

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deny.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ wrappers = [
115115
# Use in tests is fine.
116116
"grpc-server",
117117
"integration-tests-o11y",
118+
"integration-tests-spanner",
118119
"pubsub-grpc-mock",
119120
"spanner-grpc-mock",
120121
"storage-grpc-mock",

src/spanner/grpc-mock/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ pub mod google {
6464
include!("generated/protos/google.rpc.rs");
6565
}
6666
pub mod spanner {
67+
#[allow(rustdoc::broken_intra_doc_links, rustdoc::bare_urls)]
6768
pub mod v1 {
6869
include!("generated/protos/google.spanner.v1.rs");
6970
}

src/spanner/src/read_only_transaction.rs

Lines changed: 0 additions & 341 deletions
Original file line numberDiff line numberDiff line change
@@ -1425,345 +1425,4 @@ pub(crate) mod tests {
14251425

14261426
Ok(())
14271427
}
1428-
1429-
#[tokio::test]
1430-
async fn inline_begin_failure_retry_success() -> anyhow::Result<()> {
1431-
use crate::value::Value;
1432-
use gaxi::grpc::tonic::Response;
1433-
use gaxi::grpc::tonic::Status;
1434-
1435-
let mut mock = create_session_mock();
1436-
let mut seq = mockall::Sequence::new();
1437-
1438-
// 1. Initial query fails
1439-
mock.expect_execute_streaming_sql()
1440-
.times(1)
1441-
.in_sequence(&mut seq)
1442-
.returning(|_| Err(Status::internal("Internal error")));
1443-
1444-
// 2. Explicit begin transaction succeeds
1445-
mock.expect_begin_transaction()
1446-
.times(1)
1447-
.in_sequence(&mut seq)
1448-
.returning(|req| {
1449-
let req = req.into_inner();
1450-
assert_eq!(
1451-
req.session,
1452-
"projects/p/instances/i/databases/d/sessions/123"
1453-
);
1454-
// Return a transaction with ID
1455-
Ok(Response::new(mock_v1::Transaction {
1456-
id: vec![7, 8, 9],
1457-
read_timestamp: Some(prost_types::Timestamp {
1458-
seconds: 123456789,
1459-
nanos: 0,
1460-
}),
1461-
..Default::default()
1462-
}))
1463-
});
1464-
1465-
// 3. Retry of the query succeeds
1466-
mock.expect_execute_streaming_sql()
1467-
.times(1)
1468-
.in_sequence(&mut seq)
1469-
.returning(|req| {
1470-
let req = req.into_inner();
1471-
// Ensure it uses the new transaction ID
1472-
match req.transaction.unwrap().selector.unwrap() {
1473-
mock_v1::transaction_selector::Selector::Id(id) => {
1474-
assert_eq!(id, vec![7, 8, 9]);
1475-
}
1476-
_ => panic!("Expected Selector::Id"),
1477-
}
1478-
Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok(
1479-
setup_select1(),
1480-
)]))))
1481-
});
1482-
1483-
let (db_client, _server) = setup_db_client(mock).await;
1484-
let tx = db_client
1485-
.read_only_transaction()
1486-
.with_explicit_begin_transaction(false)
1487-
.build()
1488-
.await?;
1489-
1490-
let mut rs = tx
1491-
.execute_query(Statement::builder("SELECT 1").build())
1492-
.await?;
1493-
1494-
let row = rs
1495-
.next()
1496-
.await
1497-
.ok_or_else(|| anyhow::anyhow!("Expected a row but stream cleanly exhausted"))??;
1498-
assert_eq!(
1499-
row.raw_values(),
1500-
[Value(string_val("1"))],
1501-
"The parsed row value safely matched the underlying stream chunk"
1502-
);
1503-
1504-
Ok(())
1505-
}
1506-
1507-
#[tokio::test]
1508-
async fn inline_begin_failure_retry_failure() -> anyhow::Result<()> {
1509-
use gaxi::grpc::tonic::Response;
1510-
use gaxi::grpc::tonic::Status;
1511-
1512-
let mut mock = create_session_mock();
1513-
let mut seq = mockall::Sequence::new();
1514-
1515-
// 1. Initial query fails
1516-
mock.expect_execute_streaming_sql()
1517-
.times(1)
1518-
.in_sequence(&mut seq)
1519-
.returning(|_| Err(Status::internal("Internal error first")));
1520-
1521-
// 2. Explicit begin transaction succeeds
1522-
mock.expect_begin_transaction()
1523-
.times(1)
1524-
.in_sequence(&mut seq)
1525-
.returning(|_| {
1526-
Ok(Response::new(mock_v1::Transaction {
1527-
id: vec![7, 8, 9],
1528-
read_timestamp: Some(prost_types::Timestamp {
1529-
seconds: 123456789,
1530-
nanos: 0,
1531-
}),
1532-
..Default::default()
1533-
}))
1534-
});
1535-
1536-
// 3. Retry of the query fails again
1537-
mock.expect_execute_streaming_sql()
1538-
.times(1)
1539-
.in_sequence(&mut seq)
1540-
.returning(|_| Err(Status::internal("Internal error second")));
1541-
1542-
let (db_client, _server) = setup_db_client(mock).await;
1543-
let tx = db_client
1544-
.read_only_transaction()
1545-
.with_explicit_begin_transaction(false)
1546-
.build()
1547-
.await?;
1548-
1549-
let rs_result = tx
1550-
.execute_query(Statement::builder("SELECT 1").build())
1551-
.await;
1552-
1553-
assert!(
1554-
rs_result.is_err(),
1555-
"The failed execution bubbled upwards securely"
1556-
);
1557-
let err_str = rs_result.unwrap_err().to_string();
1558-
assert!(
1559-
err_str.contains("Internal error second"),
1560-
"Secondary error message accurately propagates: {}",
1561-
err_str
1562-
);
1563-
1564-
Ok(())
1565-
}
1566-
1567-
#[tokio::test]
1568-
async fn inline_begin_failure_fallback_rpc_fails() -> anyhow::Result<()> {
1569-
use gaxi::grpc::tonic::Status;
1570-
1571-
let mut mock = create_session_mock();
1572-
let mut seq = mockall::Sequence::new();
1573-
1574-
// 1. Initial query fails
1575-
mock.expect_execute_streaming_sql()
1576-
.times(1)
1577-
.in_sequence(&mut seq)
1578-
.returning(|_| Err(Status::internal("Internal error query")));
1579-
1580-
// 2. Explicit begin transaction fails
1581-
mock.expect_begin_transaction()
1582-
.times(1)
1583-
.in_sequence(&mut seq)
1584-
.returning(|_| Err(Status::internal("Internal error begin tx")));
1585-
1586-
let (db_client, _server) = setup_db_client(mock).await;
1587-
let tx = db_client
1588-
.read_only_transaction()
1589-
.with_explicit_begin_transaction(false)
1590-
.build()
1591-
.await?;
1592-
1593-
let rs_result = tx
1594-
.execute_query(Statement::builder("SELECT 1").build())
1595-
.await;
1596-
1597-
assert!(
1598-
rs_result.is_err(),
1599-
"The explicitly errored fallback boot securely propagated outwards"
1600-
);
1601-
let err_str = rs_result.unwrap_err().to_string();
1602-
assert!(
1603-
err_str.contains("Internal error begin tx"),
1604-
"Natively propagated specific BeginTx bounds: {}",
1605-
err_str
1606-
);
1607-
1608-
Ok(())
1609-
}
1610-
1611-
#[tokio::test]
1612-
async fn inline_begin_read_failure_retry_success() -> anyhow::Result<()> {
1613-
use crate::client::{KeySet, ReadRequest};
1614-
use crate::value::Value;
1615-
use gaxi::grpc::tonic::Response;
1616-
use gaxi::grpc::tonic::Status;
1617-
1618-
let mut mock = create_session_mock();
1619-
let mut seq = mockall::Sequence::new();
1620-
1621-
// 1. Initial read fails
1622-
mock.expect_streaming_read()
1623-
.times(1)
1624-
.in_sequence(&mut seq)
1625-
.returning(|_| Err(Status::internal("Internal error")));
1626-
1627-
// 2. Explicit begin transaction succeeds
1628-
mock.expect_begin_transaction()
1629-
.times(1)
1630-
.in_sequence(&mut seq)
1631-
.returning(|_| {
1632-
Ok(Response::new(mock_v1::Transaction {
1633-
id: vec![7, 8, 9],
1634-
read_timestamp: None,
1635-
..Default::default()
1636-
}))
1637-
});
1638-
1639-
// 3. Retry of the read succeeds
1640-
mock.expect_streaming_read()
1641-
.times(1)
1642-
.in_sequence(&mut seq)
1643-
.returning(|req| {
1644-
let req = req.into_inner();
1645-
// Ensure it uses the new transaction ID
1646-
match req.transaction.unwrap().selector.unwrap() {
1647-
mock_v1::transaction_selector::Selector::Id(id) => {
1648-
assert_eq!(id, vec![7, 8, 9]);
1649-
}
1650-
_ => panic!("Expected Selector::Id"),
1651-
}
1652-
Ok(Response::new(Box::pin(tokio_stream::iter(vec![Ok(
1653-
setup_select1(),
1654-
)]))))
1655-
});
1656-
1657-
let (db_client, _server) = setup_db_client(mock).await;
1658-
let tx = db_client
1659-
.read_only_transaction()
1660-
.with_explicit_begin_transaction(false)
1661-
.build()
1662-
.await?;
1663-
1664-
let read = ReadRequest::builder("Users", vec!["Id", "Name"])
1665-
.with_keys(KeySet::all())
1666-
.build();
1667-
let mut rs = tx.execute_read(read).await?;
1668-
1669-
let row = rs
1670-
.next()
1671-
.await
1672-
.ok_or_else(|| anyhow::anyhow!("Expected a row uniquely returned"))??;
1673-
assert_eq!(
1674-
row.raw_values(),
1675-
[Value(string_val("1"))],
1676-
"The macro correctly unpacked read arrays seamlessly"
1677-
);
1678-
1679-
Ok(())
1680-
}
1681-
1682-
#[tokio::test]
1683-
async fn single_use_query_send_error_returns_immediately() -> anyhow::Result<()> {
1684-
use crate::client::Statement;
1685-
use gaxi::grpc::tonic::Status;
1686-
1687-
let mut mock = create_session_mock();
1688-
1689-
mock.expect_execute_streaming_sql()
1690-
.times(1)
1691-
.returning(|_| Err(Status::internal("Internal error single use query")));
1692-
1693-
mock.expect_begin_transaction().never();
1694-
1695-
let (db_client, _server) = setup_db_client(mock).await;
1696-
// single_use creates a Fixed selector
1697-
let tx = db_client.single_use().build();
1698-
1699-
let rs_result = tx
1700-
.execute_query(Statement::builder("SELECT 1").build())
1701-
.await;
1702-
1703-
assert!(rs_result.is_err());
1704-
let err_str = rs_result.unwrap_err().to_string();
1705-
assert!(err_str.contains("Internal error single use query"));
1706-
1707-
Ok(())
1708-
}
1709-
1710-
#[tokio::test]
1711-
async fn inline_begin_already_started_query_send_error_returns_immediately()
1712-
-> anyhow::Result<()> {
1713-
use crate::client::Statement;
1714-
use gaxi::grpc::tonic::Status;
1715-
use spanner_grpc_mock::google::spanner::v1 as mock_v1;
1716-
1717-
let mut mock = create_session_mock();
1718-
let mut seq = mockall::Sequence::new();
1719-
1720-
mock.expect_begin_transaction().never();
1721-
1722-
// 1. First query executes successfully and implicitly starts the transaction.
1723-
mock.expect_execute_streaming_sql()
1724-
.times(1)
1725-
.in_sequence(&mut seq)
1726-
.returning(move |_req| {
1727-
let mut rs = setup_select1();
1728-
rs.metadata.as_mut().unwrap().transaction = Some(mock_v1::Transaction {
1729-
id: vec![4, 5, 6],
1730-
read_timestamp: None,
1731-
..Default::default()
1732-
});
1733-
Ok(gaxi::grpc::tonic::Response::new(Box::pin(
1734-
tokio_stream::iter(vec![Ok(rs)]),
1735-
)))
1736-
});
1737-
1738-
// 2. Second query fails immediately upon send()
1739-
mock.expect_execute_streaming_sql()
1740-
.times(1)
1741-
.in_sequence(&mut seq)
1742-
.returning(|_| Err(Status::internal("Internal error second query")));
1743-
1744-
let (db_client, _server) = setup_db_client(mock).await;
1745-
1746-
let tx = db_client
1747-
.read_only_transaction()
1748-
.with_explicit_begin_transaction(false)
1749-
.build()
1750-
.await?;
1751-
1752-
// Run first query (starts tx)
1753-
let mut rs = tx
1754-
.execute_query(Statement::builder("SELECT 1").build())
1755-
.await?;
1756-
let _ = rs.next().await.expect("has row")?;
1757-
1758-
// Run second query (fails)
1759-
let rs_result = tx
1760-
.execute_query(Statement::builder("SELECT 2").build())
1761-
.await;
1762-
1763-
assert!(rs_result.is_err());
1764-
let err_str = rs_result.unwrap_err().to_string();
1765-
assert!(err_str.contains("Internal error second query"));
1766-
1767-
Ok(())
1768-
}
17691428
}

tests/spanner/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@ google-cloud-test-utils = { workspace = true }
3636
prost-types.workspace = true
3737
reqwest = { workspace = true, features = ["json"] }
3838
serde_json = { workspace = true }
39+
spanner-grpc-mock = { path = "../../src/spanner/grpc-mock" }
3940
tokio = { workspace = true, features = ["sync"] }
41+
tokio-stream = { workspace = true }
42+
tonic = { workspace = true }
4043
tracing.workspace = true
4144

4245
[lints]

0 commit comments

Comments
 (0)