Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions alembic/operations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,9 @@ def batch_alter_table(
:ref:`batch_migrations`

"""
if naming_convention is None:
naming_convention = self.schema_obj.metadata().naming_convention

impl = batch.BatchOperationsImpl(
self,
table_name,
Expand Down
14 changes: 12 additions & 2 deletions alembic/operations/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ def _adjust_self_columns_for_partial_reordering(self) -> None:
def _transfer_elements_to_new_table(self) -> None:
assert self.new_table is None, "Can only create new table once"

m = MetaData()
m = MetaData(naming_convention=self.table.metadata.naming_convention)
schema = self.table.schema

if self.partial_reordering or self.add_col_ordering:
Expand Down Expand Up @@ -537,6 +537,11 @@ def alter_column(

existing.type = type_

if isinstance(existing.type, SchemaEventTarget):
existing.type._create_events = ( # type:ignore[attr-defined]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems like a good place for a sqla_compat function. there's not a fixed way to remove the events from a SchemaType and it might change.

existing.type.create_constraint # type:ignore[attr-defined] # noqa
) = False

# we *dont* however set events for the new type, because
# alter_column is invoked from
# Operations.implementation_for(alter_column) which already
Expand Down Expand Up @@ -618,7 +623,12 @@ def add_column(
)
# we copy the column because operations.add_column()
# gives us a Column that is part of a Table already.
self.columns[column.name] = _copy(column, schema=self.table.schema)
new_column = _copy(column, schema=self.table.schema)
if isinstance(new_column.type, SchemaEventTarget):
new_column.type._create_events = ( # type:ignore[attr-defined]
new_column.type.create_constraint # type:ignore[attr-defined] # noqa
) = False
self.columns[new_column.name] = new_column
self.column_transfers[column.name] = {}

def drop_column(
Expand Down
34 changes: 34 additions & 0 deletions tests/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,40 @@ def test_rename_col_boolean(self):
1,
)

def test_add_column_boolean_convention(self):
m = MetaData(
naming_convention={"ck": "ck_%(table_name)s_%(constraint_name)s"}
)
t = Table(
"tname",
m,
Column("id", Integer, primary_key=True),
)
impl = ApplyBatchImpl(self.impl, t, (), {}, False)
impl.add_column(
"tname",
Column("flag", Boolean(create_constraint=True, name="ck1")),
)
ck = CheckConstraint("flag IN (0, 1)", name="ck1")
t.append_constraint(ck)
impl.add_constraint(ck)
new_table = self._assert_impl(
impl,
ddl_contains="CONSTRAINT ck_tname_ck1 CHECK (flag IN (0, 1))",
colnames=["id", "flag"],
dialect="sqlite",
)
eq_(
len(
[
const
for const in new_table.constraints
if isinstance(const, CheckConstraint)
]
),
1,
)

def test_change_type_schematype_to_non(self):
impl = self._boolean_fixture()
impl.alter_column("tname", "flag", type_=Integer)
Expand Down
Loading