You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							382 lines
						
					
					
						
							12 KiB
						
					
					
				
			
		
		
	
	
							382 lines
						
					
					
						
							12 KiB
						
					
					
				import random
 | 
						|
 | 
						|
from . import testing
 | 
						|
from .. import config
 | 
						|
from .. import fixtures
 | 
						|
from .. import util
 | 
						|
from ..assertions import eq_
 | 
						|
from ..assertions import is_false
 | 
						|
from ..assertions import is_true
 | 
						|
from ..config import requirements
 | 
						|
from ..schema import Table
 | 
						|
from ... import CheckConstraint
 | 
						|
from ... import Column
 | 
						|
from ... import ForeignKeyConstraint
 | 
						|
from ... import Index
 | 
						|
from ... import inspect
 | 
						|
from ... import Integer
 | 
						|
from ... import schema
 | 
						|
from ... import String
 | 
						|
from ... import UniqueConstraint
 | 
						|
 | 
						|
 | 
						|
class TableDDLTest(fixtures.TestBase):
    """Backend tests for basic table and index DDL.

    Exercises creating and dropping a small table (optionally inside a
    schema), setting and dropping a table comment, and the
    ``if_not_exists`` / ``if_exists`` flags of the table and index DDL
    constructs.
    """

    __backend__ = True

    def _simple_fixture(self, schema=None):
        """Build ``test_table`` on ``self.metadata``.

        The table has a non-autoincrementing integer primary key ``id``
        and a ``String(50)`` column ``data``; ``schema`` is passed through
        to ``Table``.
        """
        columns = (
            Column("id", Integer, primary_key=True, autoincrement=False),
            Column("data", String(50)),
        )
        return Table("test_table", self.metadata, *columns, schema=schema)

    def _underscore_fixture(self):
        """Build ``_test_table`` on ``self.metadata``.

        Same shape as the simple fixture, but the table name and the data
        column (``_data``) both start with an underscore.
        """
        columns = (
            Column("id", Integer, primary_key=True, autoincrement=False),
            Column("_data", String(50)),
        )
        return Table("_test_table", self.metadata, *columns)

    def _table_index_fixture(self, schema=None):
        """Return ``(table, index)`` where the index ``test_index`` covers
        the ``data`` column of the simple fixture table."""
        tbl = self._simple_fixture(schema=schema)
        return tbl, Index("test_index", tbl.c.data)

    def _test_table_index_names(self, connection):
        """Return the names of the indexes reflected for ``test_table``."""
        reflected = inspect(connection).get_indexes("test_table")
        return [entry["name"] for entry in reflected]

    def _simple_roundtrip(self, table):
        """Insert one row into ``table`` and check it can be selected back."""
        expected_row = (1, "some data")
        with config.db.begin() as conn:
            conn.execute(table.insert().values(expected_row))
            first_row = conn.execute(table.select()).first()
            eq_(first_row, expected_row)

    @requirements.create_table
    @util.provide_metadata
    def test_create_table(self):
        """CREATE TABLE without an existence check, then round-trip a row."""
        tbl = self._simple_fixture()
        tbl.create(config.db, checkfirst=False)
        self._simple_roundtrip(tbl)

    @requirements.create_table
    @requirements.schemas
    @util.provide_metadata
    def test_create_table_schema(self):
        """Same as ``test_create_table`` but inside ``config.test_schema``."""
        tbl = self._simple_fixture(schema=config.test_schema)
        tbl.create(config.db, checkfirst=False)
        self._simple_roundtrip(tbl)

    @requirements.drop_table
    @util.provide_metadata
    def test_drop_table(self):
        """CREATE then DROP the table, both without an existence check."""
        tbl = self._simple_fixture()
        tbl.create(config.db, checkfirst=False)
        tbl.drop(config.db, checkfirst=False)

    @requirements.create_table
    @util.provide_metadata
    def test_underscore_names(self):
        """Create and round-trip a table whose names start with ``_``."""
        tbl = self._underscore_fixture()
        tbl.create(config.db, checkfirst=False)
        self._simple_roundtrip(tbl)

    @requirements.comment_reflection
    @util.provide_metadata
    def test_add_table_comment(self, connection):
        """Setting a table comment makes it visible through reflection."""
        tbl = self._simple_fixture()
        tbl.create(connection, checkfirst=False)
        tbl.comment = "a comment"
        connection.execute(schema.SetTableComment(tbl))

        reflected_comment = inspect(connection).get_table_comment("test_table")
        eq_(reflected_comment, {"text": "a comment"})

    @requirements.comment_reflection
    @util.provide_metadata
    def test_drop_table_comment(self, connection):
        """Dropping a previously set table comment reflects as ``None``."""
        tbl = self._simple_fixture()
        tbl.create(connection, checkfirst=False)
        tbl.comment = "a comment"
        connection.execute(schema.SetTableComment(tbl))
        connection.execute(schema.DropTableComment(tbl))

        reflected_comment = inspect(connection).get_table_comment("test_table")
        eq_(reflected_comment, {"text": None})

    @requirements.table_ddl_if_exists
    @util.provide_metadata
    def test_create_table_if_not_exists(self, connection):
        """``CreateTable(if_not_exists=True)`` can be executed twice."""
        tbl = self._simple_fixture()

        connection.execute(schema.CreateTable(tbl, if_not_exists=True))
        is_true(inspect(connection).has_table("test_table"))

        # second execution runs against the already-existing table
        connection.execute(schema.CreateTable(tbl, if_not_exists=True))

    @requirements.index_ddl_if_exists
    @util.provide_metadata
    def test_create_index_if_not_exists(self, connection):
        """``CreateIndex(if_not_exists=True)`` can be executed twice."""
        tbl, index = self._table_index_fixture()

        connection.execute(schema.CreateTable(tbl, if_not_exists=True))
        is_true(inspect(connection).has_table("test_table"))
        is_false("test_index" in self._test_table_index_names(connection))

        connection.execute(schema.CreateIndex(index, if_not_exists=True))
        is_true("test_index" in self._test_table_index_names(connection))

        # second execution runs against the already-existing index
        connection.execute(schema.CreateIndex(index, if_not_exists=True))

    @requirements.table_ddl_if_exists
    @util.provide_metadata
    def test_drop_table_if_exists(self, connection):
        """``DropTable(if_exists=True)`` can be executed twice."""
        tbl = self._simple_fixture()
        tbl.create(connection)
        is_true(inspect(connection).has_table("test_table"))

        connection.execute(schema.DropTable(tbl, if_exists=True))
        is_false(inspect(connection).has_table("test_table"))

        # second execution runs after the table is already gone
        connection.execute(schema.DropTable(tbl, if_exists=True))

    @requirements.index_ddl_if_exists
    @util.provide_metadata
    def test_drop_index_if_exists(self, connection):
        """``DropIndex(if_exists=True)`` can be executed twice."""
        tbl, index = self._table_index_fixture()
        tbl.create(connection)
        is_true("test_index" in self._test_table_index_names(connection))

        connection.execute(schema.DropIndex(index, if_exists=True))
        is_false("test_index" in self._test_table_index_names(connection))

        # second execution runs after the index is already gone
        connection.execute(schema.DropIndex(index, if_exists=True))
 | 
						|
 | 
						|
 | 
						|
class FutureTableDDLTest(fixtures.FutureEngineMixin, TableDDLTest):
    """Re-run every ``TableDDLTest`` test with ``fixtures.FutureEngineMixin``
    mixed in; no tests are added or overridden here."""
 | 
						|
 | 
						|
 | 
						|
class LongNameBlowoutTest(fixtures.TestBase):
    """Create DDL structures whose convention-generated names are very
    long and ensure label length limits pass on backends.

    Each of ``fk``, ``pk``, ``ix``, ``uq`` and ``ck`` installs a naming
    convention padded with random characters, creates the tables, and
    returns ``(actual_name, reflected_name)`` for the construct it built.
    """

    __backend__ = True

    @staticmethod
    def _random_suffix(segment_length):
        """Return ten random ``abcdef`` segments of ``segment_length``
        characters each, joined with underscores."""
        segments = []
        for _ in range(10):
            segments.append(
                "".join(
                    random.choice("abcdef") for _ in range(segment_length)
                )
            )
        return "_".join(segments)

    def fk(self, metadata, connection):
        """Build a long-named foreign key constraint.

        The reflected name is ``None`` when the
        ``foreign_key_constraint_name_reflection`` requirement is not
        enabled.
        """
        prefix = (
            "foreign_key_%(table_name)s_"
            "%(column_0_N_name)s_"
            "%(referred_table_name)s_"
        )
        metadata.naming_convention = {"fk": prefix + self._random_suffix(20)}

        Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            test_needs_fk=True,
        )

        fk_cons = ForeignKeyConstraint(
            ["aid"], ["a_things_with_stuff.id_long_column_name"]
        )
        Table(
            "b_related_things_of_value",
            metadata,
            Column("aid"),
            fk_cons,
            test_needs_fk=True,
        )
        # the name is read before the tables are emitted to the database
        actual_name = fk_cons.name

        metadata.create_all(connection)

        if not testing.requires.foreign_key_constraint_name_reflection.enabled:
            return actual_name, None

        reflected = inspect(connection).get_foreign_keys(
            "b_related_things_of_value"
        )
        return actual_name, reflected[0]["name"]

    def pk(self, metadata, connection):
        """Build a long-named two-column primary key constraint."""
        prefix = "primary_key_%(table_name)s_" "%(column_0_N_name)s"
        metadata.naming_convention = {"pk": prefix + self._random_suffix(30)}

        tbl = Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            Column("id_another_long_name", Integer, primary_key=True),
        )
        actual_name = tbl.primary_key.name

        metadata.create_all(connection)
        reflected = inspect(connection).get_pk_constraint(
            "a_things_with_stuff"
        )
        return actual_name, reflected["name"]

    def ix(self, metadata, connection):
        """Build a long-named two-column index."""
        prefix = "index_%(table_name)s_" "%(column_0_N_name)s"
        metadata.naming_convention = {"ix": prefix + self._random_suffix(30)}

        tbl = Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            Column("id_another_long_name", Integer),
        )
        # name=None: the index name comes from the naming convention
        index = Index(
            None, tbl.c.id_long_column_name, tbl.c.id_another_long_name
        )
        actual_name = index.name

        metadata.create_all(connection)
        reflected = inspect(connection).get_indexes("a_things_with_stuff")
        return actual_name, reflected[0]["name"]

    def uq(self, metadata, connection):
        """Build a long-named two-column unique constraint."""
        prefix = "unique_constraint_%(table_name)s_" "%(column_0_N_name)s"
        metadata.naming_convention = {"uq": prefix + self._random_suffix(30)}

        unique_cons = UniqueConstraint(
            "id_long_column_name", "id_another_long_name"
        )
        Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            Column("id_another_long_name", Integer),
            unique_cons,
        )
        actual_name = unique_cons.name

        metadata.create_all(connection)
        reflected = inspect(connection).get_unique_constraints(
            "a_things_with_stuff"
        )
        return actual_name, reflected[0]["name"]

    def ck(self, metadata, connection):
        """Build a long-named check constraint."""
        prefix = "check_constraint_%(table_name)s"
        metadata.naming_convention = {"ck": prefix + self._random_suffix(30)}

        check_cons = CheckConstraint("some_long_column_name > 5")
        Table(
            "a_things_with_stuff",
            metadata,
            Column("id_long_column_name", Integer, primary_key=True),
            Column("some_long_column_name", Integer),
            check_cons,
        )
        actual_name = check_cons.name

        metadata.create_all(connection)
        reflected = inspect(connection).get_check_constraints(
            "a_things_with_stuff"
        )
        return actual_name, reflected[0]["name"]

    @testing.combinations(
        ("fk",),
        ("pk",),
        ("ix",),
        ("ck", testing.requires.check_constraint_reflection.as_skips()),
        ("uq", testing.requires.unique_constraint_reflection.as_skips()),
        argnames="type_",
    )
    def test_long_convention_name(self, type_, metadata, connection):
        """Check the generated name is over 255 characters and agrees with
        the reflected name.

        ``type_`` selects which builder method (``fk``, ``pk``, ...) to
        run. When the reflected name is shorter than the generated one,
        the last five characters of the shared prefix are left out of the
        comparison.
        """
        builder = getattr(self, type_)
        actual_name, reflected_name = builder(metadata, connection)

        assert len(actual_name) > 255

        if reflected_name is None:
            return

        overlap = actual_name[: len(reflected_name)]
        if len(overlap) < len(actual_name):
            # NOTE(review): presumably backends shorten long names and may
            # alter the trailing characters, hence the 5-character slack;
            # confirm against the dialects.
            eq_(overlap[:-5], reflected_name[: len(overlap) - 5])
        else:
            eq_(overlap, reflected_name)
 | 
						|
 | 
						|
 | 
						|
# Names exported by ``from <this module> import *``: the three test classes.
__all__ = ("TableDDLTest", "FutureTableDDLTest", "LongNameBlowoutTest")
 |