Answer a question

I have a reactor that fetches messages from a RabbitMQ broker and triggers worker methods to process these messages in a process pool, something like this:

[Figure: Reactor]

This is implemented using python asyncio, loop.run_in_executor() and concurrent.futures.ProcessPoolExecutor.
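A minimal sketch of that dispatch pattern, for illustration only and not the actual reactor code. The names `process_message`, `dispatch`, and `run_demo` are hypothetical, and a ThreadPoolExecutor stands in so the snippet runs anywhere; a ProcessPoolExecutor can be passed to `dispatch` in the same way.

```python
import asyncio
from concurrent.futures import Executor, ThreadPoolExecutor


def process_message(body: str) -> str:
    """Synchronous worker code; a stand-in for the real message handler."""
    return body.upper()


async def dispatch(executor: Executor, bodies: list[str]) -> list[str]:
    """Hand each message to the pool and await the results without blocking the loop."""
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, process_message, b) for b in bodies]
    return await asyncio.gather(*futures)


def run_demo() -> list[str]:
    # Assumption: a thread pool keeps the demo portable; the setup described
    # above would create a ProcessPoolExecutor here instead.
    with ThreadPoolExecutor(max_workers=2) as pool:
        return asyncio.run(dispatch(pool, ["a", "b"]))
```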

Now I want to access the database in the worker methods using SQLAlchemy. Mostly the processing will be very straightforward and quick CRUD operations.

The reactor will process 10-50 messages per second in the beginning, so it is not acceptable to open a new database connection for every request. Rather I would like to maintain one persistent connection per process.

My questions are: How can I do this? Can I just store them in a global variable? Will the SQA connection pool handle this for me? How to clean up when the reactor stops?

[Update]

  • The database is MySQL with InnoDB.

Why choose this pattern with a process pool?

The current implementation uses a different pattern where each consumer runs in its own thread. Somehow this does not work very well. There are already about 200 consumers each running in their own thread, and the system is growing quickly. To scale better, the idea was to separate concerns and to consume messages in an I/O loop and delegate the processing to a pool. Of course, the performance of the whole system is mainly I/O bound. However, CPU is an issue when processing large result sets.

The other reason was "ease of use." While the connection handling and consumption of messages is implemented asynchronously, the code in the worker can be synchronous and simple.

Soon it became evident that accessing remote systems through persistent network connections from within the worker is an issue. This is what the CommunicationChannels are for: Inside the worker, I can issue requests to the message bus through these channels.

One of my current ideas is to handle DB access in a similar way: Pass statements through a queue to the event loop where they are sent to the DB. However, I have no idea how to do this with SQLAlchemy. Where would be the entry point? Objects need to be pickled when they are passed through a queue. How do I get such an object from an SQA query? The communication with the database has to work asynchronously in order not to block the event loop. Can I use e.g. aiomysql as a database driver for SQA?
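To make the idea concrete, here is a rough sketch of what such a request could look like on the wire. It does not use SQLAlchemy or a real driver: `fake_execute`, `serve_one`, and `round_trip` are hypothetical names, the request is a plain dict, and the rows are plain tuples, chosen only because both pickle cleanly.

```python
import asyncio
import pickle


async def fake_execute(sql: str, params: tuple) -> list[tuple]:
    """Hypothetical stand-in for an async driver call; returns plain tuples."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return [(1, "alice")]


async def serve_one(raw: bytes) -> bytes:
    """Unpickle one request, run it on the event loop, pickle the rows for the worker."""
    request = pickle.loads(raw)
    rows = await fake_execute(request["sql"], request["params"])
    return pickle.dumps(rows)


def round_trip() -> list[tuple]:
    # Worker side: the statement text and parameters travel as a picklable dict.
    request = {"sql": "SELECT id, name FROM users WHERE id = %s", "params": (1,)}
    raw = pickle.dumps(request)
    # Event-loop side: execute the request and send the pickled rows back.
    return pickle.loads(asyncio.run(serve_one(raw)))
```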

Answers

Your requirement of one database connection per process in the pool is easy to satisfy, provided you take some care in how you instantiate the session in the worker processes (assuming you are working with the ORM).

A simple solution would be to have a global session which you reuse across requests:

# db.py
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

engine = create_engine("connection_uri", pool_size=1, max_overflow=0)
DBSession = scoped_session(sessionmaker(bind=engine))

And on the worker task:

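# task.py
from db import DBSession

def task():
    DBSession.begin()  # each task will get its own transaction over the global connection
    try:
        ...
        DBSession.query(...)
        ...
        DBSession.commit()  # persist the task's changes
    except Exception:
        DBSession.rollback()  # undo partial work if the task fails
        raise
    finally:
        DBSession.close()  # cleanup on task end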

The pool_size and max_overflow arguments customize the default QueuePool used by create_engine. With pool_size=1 and max_overflow=0, each process in the process pool keeps at most one connection open.
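A small sketch of how that limit behaves. The in-memory SQLite URL and the short pool_timeout are assumptions made so the snippet runs without a MySQL server; in-memory SQLite does not default to QueuePool, so it is requested explicitly here.

```python
from sqlalchemy import create_engine, exc
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://",          # assumption: in-memory SQLite instead of the real MySQL URI
    poolclass=QueuePool,  # in-memory SQLite does not use QueuePool by default
    pool_size=1,
    max_overflow=0,
    pool_timeout=0.1,     # fail fast instead of waiting the default 30 seconds
)

first = engine.connect()
try:
    engine.connect()      # the pool is exhausted: one connection, no overflow
    second_checkout_failed = False
except exc.TimeoutError:
    second_checkout_failed = True
finally:
    first.close()         # return the connection to the pool

reused = engine.connect() # succeeds again once the first checkout was returned
reused.close()
engine.dispose()
```

As long as a task checks out only one connection at a time, as the scoped session does, the limit is never hit.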

If you want a fresh session, call DBSession.remove(): it closes the current session and removes it from the registry, so the next use of DBSession creates a new session that acquires a connection from the pool again. You can also use the recycle argument of Pool (pool_recycle in create_engine) to make the pool replace a connection once it is older than the specified number of seconds.
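A short sketch of both options. The in-memory SQLite URL and the one-hour recycle value are assumptions for illustration; pool_recycle is the create_engine keyword that is passed on to the pool.

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite://",          # assumption: in-memory SQLite instead of the real MySQL URI
    poolclass=QueuePool,
    pool_size=1,
    max_overflow=0,
    pool_recycle=3600,    # replace a connection older than one hour at its next checkout
)
DBSession = scoped_session(sessionmaker(bind=engine))

first = DBSession()       # the registry creates a session on first use
DBSession.remove()        # close it and drop it from the registry
second = DBSession()      # the next use creates a new session object
got_fresh_session = first is not second

DBSession.remove()
engine.dispose()
```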

During development/debugging you can use AssertionPool, which raises an exception if more than one connection is checked out from the pool; see "Switching Pool Implementations" in the SQLAlchemy documentation for how to do that.
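For example, a minimal sketch of that check, again assuming an in-memory SQLite URL so it runs without a MySQL server:

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import AssertionPool

# poolclass swaps the pool implementation used by the engine
engine = create_engine("sqlite://", poolclass=AssertionPool)

conn = engine.connect()
try:
    engine.connect()      # a second simultaneous checkout is not allowed
    raised = False
except AssertionError:
    raised = True
finally:
    conn.close()
engine.dispose()
```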
