You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					125 lines
				
				4.2 KiB
			
		
		
			
		
	
	
					125 lines
				
				4.2 KiB
			| 
								 
											3 years ago
										 
									 | 
							
								import time
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								from ... import exc
							 | 
						||
| 
								 | 
							
								from ... import inspect
							 | 
						||
| 
								 | 
							
								from ... import text
							 | 
						||
| 
								 | 
							
								from ...testing import warn_test_suite
							 | 
						||
| 
								 | 
							
								from ...testing.provision import create_db
							 | 
						||
| 
								 | 
							
								from ...testing.provision import drop_all_schema_objects_post_tables
							 | 
						||
| 
								 | 
							
								from ...testing.provision import drop_all_schema_objects_pre_tables
							 | 
						||
| 
								 | 
							
								from ...testing.provision import drop_db
							 | 
						||
| 
								 | 
							
								from ...testing.provision import log
							 | 
						||
| 
								 | 
							
								from ...testing.provision import prepare_for_drop_tables
							 | 
						||
| 
								 | 
							
								from ...testing.provision import set_default_schema_on_connection
							 | 
						||
| 
								 | 
							
								from ...testing.provision import temp_table_keyword_args
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@create_db.for_db("postgresql")
							 | 
						||
| 
								 | 
							
								def _pg_create_db(cfg, eng, ident):
							 | 
						||
| 
								 | 
							
								    template_db = cfg.options.postgresql_templatedb
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    with eng.execution_options(isolation_level="AUTOCOMMIT").begin() as conn:
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        if not template_db:
							 | 
						||
| 
								 | 
							
								            template_db = conn.exec_driver_sql(
							 | 
						||
| 
								 | 
							
								                "select current_database()"
							 | 
						||
| 
								 | 
							
								            ).scalar()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								        attempt = 0
							 | 
						||
| 
								 | 
							
								        while True:
							 | 
						||
| 
								 | 
							
								            try:
							 | 
						||
| 
								 | 
							
								                conn.exec_driver_sql(
							 | 
						||
| 
								 | 
							
								                    "CREATE DATABASE %s TEMPLATE %s" % (ident, template_db)
							 | 
						||
| 
								 | 
							
								                )
							 | 
						||
| 
								 | 
							
								            except exc.OperationalError as err:
							 | 
						||
| 
								 | 
							
								                attempt += 1
							 | 
						||
| 
								 | 
							
								                if attempt >= 3:
							 | 
						||
| 
								 | 
							
								                    raise
							 | 
						||
| 
								 | 
							
								                if "accessed by other users" in str(err):
							 | 
						||
| 
								 | 
							
								                    log.info(
							 | 
						||
| 
								 | 
							
								                        "Waiting to create %s, URI %r, "
							 | 
						||
| 
								 | 
							
								                        "template DB %s is in use sleeping for .5",
							 | 
						||
| 
								 | 
							
								                        ident,
							 | 
						||
| 
								 | 
							
								                        eng.url,
							 | 
						||
| 
								 | 
							
								                        template_db,
							 | 
						||
| 
								 | 
							
								                    )
							 | 
						||
| 
								 | 
							
								                    time.sleep(0.5)
							 | 
						||
| 
								 | 
							
								            except:
							 | 
						||
| 
								 | 
							
								                raise
							 | 
						||
| 
								 | 
							
								            else:
							 | 
						||
| 
								 | 
							
								                break
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@drop_db.for_db("postgresql")
							 | 
						||
| 
								 | 
							
								def _pg_drop_db(cfg, eng, ident):
							 | 
						||
| 
								 | 
							
								    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
							 | 
						||
| 
								 | 
							
								        with conn.begin():
							 | 
						||
| 
								 | 
							
								            conn.execute(
							 | 
						||
| 
								 | 
							
								                text(
							 | 
						||
| 
								 | 
							
								                    "select pg_terminate_backend(pid) from pg_stat_activity "
							 | 
						||
| 
								 | 
							
								                    "where usename=current_user and pid != pg_backend_pid() "
							 | 
						||
| 
								 | 
							
								                    "and datname=:dname"
							 | 
						||
| 
								 | 
							
								                ),
							 | 
						||
| 
								 | 
							
								                dict(dname=ident),
							 | 
						||
| 
								 | 
							
								            )
							 | 
						||
| 
								 | 
							
								            conn.exec_driver_sql("DROP DATABASE %s" % ident)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@temp_table_keyword_args.for_db("postgresql")
							 | 
						||
| 
								 | 
							
								def _postgresql_temp_table_keyword_args(cfg, eng):
							 | 
						||
| 
								 | 
							
								    return {"prefixes": ["TEMPORARY"]}
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@set_default_schema_on_connection.for_db("postgresql")
							 | 
						||
| 
								 | 
							
								def _postgresql_set_default_schema_on_connection(
							 | 
						||
| 
								 | 
							
								    cfg, dbapi_connection, schema_name
							 | 
						||
| 
								 | 
							
								):
							 | 
						||
| 
								 | 
							
								    existing_autocommit = dbapi_connection.autocommit
							 | 
						||
| 
								 | 
							
								    dbapi_connection.autocommit = True
							 | 
						||
| 
								 | 
							
								    cursor = dbapi_connection.cursor()
							 | 
						||
| 
								 | 
							
								    cursor.execute("SET SESSION search_path='%s'" % schema_name)
							 | 
						||
| 
								 | 
							
								    cursor.close()
							 | 
						||
| 
								 | 
							
								    dbapi_connection.autocommit = existing_autocommit
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@drop_all_schema_objects_pre_tables.for_db("postgresql")
							 | 
						||
| 
								 | 
							
								def drop_all_schema_objects_pre_tables(cfg, eng):
							 | 
						||
| 
								 | 
							
								    with eng.connect().execution_options(isolation_level="AUTOCOMMIT") as conn:
							 | 
						||
| 
								 | 
							
								        for xid in conn.execute("select gid from pg_prepared_xacts").scalars():
							 | 
						||
| 
								 | 
							
								            conn.execute("ROLLBACK PREPARED '%s'" % xid)
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@drop_all_schema_objects_post_tables.for_db("postgresql")
							 | 
						||
| 
								 | 
							
								def drop_all_schema_objects_post_tables(cfg, eng):
							 | 
						||
| 
								 | 
							
								    from sqlalchemy.dialects import postgresql
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    inspector = inspect(eng)
							 | 
						||
| 
								 | 
							
								    with eng.begin() as conn:
							 | 
						||
| 
								 | 
							
								        for enum in inspector.get_enums("*"):
							 | 
						||
| 
								 | 
							
								            conn.execute(
							 | 
						||
| 
								 | 
							
								                postgresql.DropEnumType(
							 | 
						||
| 
								 | 
							
								                    postgresql.ENUM(name=enum["name"], schema=enum["schema"])
							 | 
						||
| 
								 | 
							
								                )
							 | 
						||
| 
								 | 
							
								            )
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								@prepare_for_drop_tables.for_db("postgresql")
							 | 
						||
| 
								 | 
							
								def prepare_for_drop_tables(config, connection):
							 | 
						||
| 
								 | 
							
								    """Ensure there are no locks on the current username/database."""
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								    result = connection.exec_driver_sql(
							 | 
						||
| 
								 | 
							
								        "select pid, state, wait_event_type, query "
							 | 
						||
| 
								 | 
							
								        # "select pg_terminate_backend(pid), state, wait_event_type "
							 | 
						||
| 
								 | 
							
								        "from pg_stat_activity where "
							 | 
						||
| 
								 | 
							
								        "usename=current_user "
							 | 
						||
| 
								 | 
							
								        "and datname=current_database() and state='idle in transaction' "
							 | 
						||
| 
								 | 
							
								        "and pid != pg_backend_pid()"
							 | 
						||
| 
								 | 
							
								    )
							 | 
						||
| 
								 | 
							
								    rows = result.all()  # noqa
							 | 
						||
| 
								 | 
							
								    if rows:
							 | 
						||
| 
								 | 
							
								        warn_test_suite(
							 | 
						||
| 
								 | 
							
								            "PostgreSQL may not be able to DROP tables due to "
							 | 
						||
| 
								 | 
							
								            "idle in transaction: %s"
							 | 
						||
| 
								 | 
							
								            % ("; ".join(row._mapping["query"] for row in rows))
							 | 
						||
| 
								 | 
							
								        )
							 |