python: sqlalchemy - how do I ensure connection not stale using new event system

codingknob picture codingknob · Apr 2, 2013 · Viewed 8.9k times · Source

I am using the sqlalchemy package in python. I have an operation that takes some time to execute after I perform an autoload on an existing table. This causes the following error when I attempt to use the connection:

sqlalchemy.exc.OperationalError: (OperationalError) (2006, 'MySQL server has gone away')

I have a simple utility function that performs an insert many:

def insert_data(data_2_insert, table_name):
    engine = create_engine('mysql://blah:blah123@localhost/dbname')
    # Metadata is a Table catalog. 
    metadata = MetaData()
    table = Table(table_name, metadata, autoload=True, autoload_with=engine)
    for c in mytable.c:
        print c
    column_names = tuple(c.name for c in mytable.c)
    final_data = [dict(zip(column_names, x)) for x in data_2_insert]
    ins = mytable.insert()
    conn = engine.connect()
    conn.execute(ins, final_data)
    conn.close()

It is the following line that times long time to execute since 'data_2_insert' has 677,161 rows.

final_data = [dict(zip(column_names, x)) for x in data_2_insert]

I came across this question which refers to a similar problem. However I am not sure how to implement the connection management suggested by the accepted answer because robots.jpg pointed this out in a comment:

Note for SQLAlchemy 0.7 - PoolListener is deprecated, but the same solution can be implemented using the new event system.

If someone can please show me a couple of pointers on how I could go about integrating the suggestions into the way I use sqlalchemy I would be very appreciative. Thank you.

Answer

Palasaty picture Palasaty · Apr 2, 2013

I think you are looking for something like this:

from sqlalchemy import exc, event
from sqlalchemy.pool import Pool

@event.listens_for(Pool, "checkout")
def check_connection(dbapi_con, con_record, con_proxy):
    '''Listener for Pool checkout events that pings every connection before using.
    Implements pessimistic disconnect handling strategy. See also:
    http://docs.sqlalchemy.org/en/rel_0_8/core/pooling.html#disconnect-handling-pessimistic'''

    cursor = dbapi_con.cursor()
    try:
        cursor.execute("SELECT 1")  # could also be dbapi_con.ping(),
                                    # not sure what is better
    except exc.OperationalError, ex:
        if ex.args[0] in (2006,   # MySQL server has gone away
                          2013,   # Lost connection to MySQL server during query
                          2055):  # Lost connection to MySQL server at '%s', system error: %d
            # caught by pool, which will retry with a new connection
            raise exc.DisconnectionError()
        else:
            raise

If you wish to trigger this strategy conditionally, you should avoid use of decorator here and instead register listener using listen() function:

# somewhere during app initialization
if config.check_connection_on_checkout:
    event.listen(Pool, "checkout", check_connection)

More info: