Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions sqlite_utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -1905,18 +1905,44 @@ def transform(
column_order=column_order,
keep_table=keep_table,
)
views = self.db.execute(
"select name, sql from sqlite_master where type = 'view' "
"and sql is not null order by rowid"
).fetchall()
view_triggers = []
if views:
view_triggers = self.db.execute(
"select name, sql from sqlite_master where type = 'trigger' "
"and sql is not null and tbl_name in "
"(select name from sqlite_master where type = 'view') order by rowid"
).fetchall()
pragma_foreign_keys_was_on = self.db.execute("PRAGMA foreign_keys").fetchone()[
0
]
try:
if pragma_foreign_keys_was_on:
self.db.execute("PRAGMA foreign_keys=0;")
with self.db.conn:
if views and not self.db.conn.in_transaction:
self.db.execute("BEGIN;")
for view_name, _ in views:
self.db.execute("DROP VIEW {}".format(quote_identifier(view_name)))
for sql in sqls:
self.db.execute(sql)
# Run the foreign_key_check before we commit
if pragma_foreign_keys_was_on:
self.db.execute("PRAGMA foreign_key_check;")
for view_name, view_sql in views:
try:
self.db.execute(view_sql)
except sqlite3.OperationalError as e:
raise TransformError(
f"View '{view_name}' could not be recreated after transforming table '{self.name}'. "
"It may reference columns that were dropped or renamed. "
"You must recreate this view manually."
) from e
for _, trigger_sql in view_triggers:
self.db.execute(trigger_sql)
finally:
if pragma_foreign_keys_was_on:
self.db.execute("PRAGMA foreign_keys=1;")
Expand Down
48 changes: 48 additions & 0 deletions tests/test_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,3 +659,51 @@ def test_transform_with_unique_constraint_implicit_index(fresh_db):
"You must manually drop this index prior to running this transformation and manually recreate the new index after running this transformation."
in str(excinfo.value)
)


def test_transform_recreates_views(fresh_db):
dogs = fresh_db["dogs"]
dogs.insert({"id": 1, "name": "Cleo", "age": 5}, pk="id")
fresh_db.create_view("dog_names", "select id, name from dogs")

dogs.transform(drop={"age"})

assert list(fresh_db.query("select * from dog_names")) == [
{"id": 1, "name": "Cleo"}
]


def test_transform_recreates_triggers_on_views(fresh_db):
dogs = fresh_db["dogs"]
dogs.insert({"id": 1, "name": "Cleo", "age": 5}, pk="id")
fresh_db.execute("create table dog_log (id integer primary key, name text)")
fresh_db.create_view("dog_names", "select id, name from dogs")
fresh_db.execute("""
create trigger dog_names_insert
instead of insert on dog_names
begin
insert into dog_log (id, name) values (new.id, new.name);
end;
""")

dogs.transform(drop={"age"})

fresh_db.execute(
"insert into dog_names (id, name) values (?, ?)",
(2, "Mabel"),
)

assert list(fresh_db["dog_log"].rows) == [{"id": 2, "name": "Mabel"}]


def test_transform_does_not_execute_unrelated_views(fresh_db):
dogs = fresh_db["dogs"]
dogs.insert({"id": 1, "name": "Cleo", "age": 5}, pk="id")
fresh_db["other"].insert({"x": 1})
fresh_db.execute(
"create view other_values as select app_func(x) as value from other"
)

dogs.transform(drop={"age"})

assert "other_values" in fresh_db.view_names()
Loading