psycopg3: a first report

2020-03-26

In the most unusual conditions I started hacking on psycopg3 roughly following the plan sketched in Thinking psycopg3.

The first step was to create a package, psycopg3.pq, allowing low-level access to the libpq from Python. The package is a façade to several implementations: the one I started implementing is ctypes-based, so pure Python. Others can be implemented using CFFI or pure C, but for the moment we can already access the libpq without external dependencies (useful when you hack from a van without an internet connection and you only have offline Python documentation available).

The package is pure mechanism, no policy, so it is entirely based on bytes instead of unicode strings. One of its responsibilities is to free the libpq C structures on __del__(), so that resources such as connections and results are tied to the refcount of Python objects, in RAII style. It is not complete yet, but it already exposes everything needed for the psycopg3 implementation so far, and wrapping further functions is straightforward.

>>> from psycopg3 import pq
>>> pqconn = pq.PGconn.connect(b"dbname=psycopg3_test")
>>> result = pqconn.exec_(b"select 'hello' || ' world!'")
>>> result.get_value(0, 0)
b'hello world!'
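
The RAII idea described above can be sketched without the libpq at all. In this minimal illustration `Handle` and `fake_free()` are hypothetical stand-ins (they are not psycopg3 names) for a wrapped C pointer and the matching libpq cleanup call; the sketch relies on CPython running `__del__()` as soon as the last reference goes away.

```python
freed = []  # records which raw handles have been released


def fake_free(raw):
    """Hypothetical stand-in for a libpq cleanup call."""
    freed.append(raw)


class Handle:
    """Own a low-level resource and release it when the object dies."""

    def __init__(self, raw):
        self._raw = raw

    def finish(self):
        # Idempotent: safe to call explicitly and again from __del__().
        raw, self._raw = self._raw, None
        if raw is not None:
            fake_free(raw)

    def __del__(self):
        self.finish()


h = Handle("conn-1")
del h  # last reference gone: CPython calls __del__() right away
```

Making the cleanup idempotent lets the same method serve both an explicit close and the implicit one triggered by garbage collection.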

Among the libpq functions exposed there are the non-blocking ones for connecting and for querying. The blocking ones are exposed too, but the idea is to avoid using them altogether in "real code", leaving to Python the responsibility to coordinate and wait. This would allow writing the whole adapter without juggling with the GIL and without losing control during long operations. In other words, Ctrl-C would just work, no issue #333.

As explained in the previous article, both a synchronous and an asynchronous interface should be exposed: the sync one blocking as per DBAPI specs, the async one based on asyncio and coroutines. What I don't want, though, is to have to write everything twice.

About the async interface, there is no guidance from the DBAPI about what it should present, but the most natural way (which is what AIOPG exposes, in my understanding) is to replicate exactly the blocking interface, awaiting whenever it would block. So what it may look like is:

>>> cnn = await psycopg3.connect_async("")
>>> cur = cnn.cursor()
>>> await cur.execute("select 'hello' || ' mum!'")
# >>> await cur.fetchone()  # TODO: not implemented yet
# but the result is on the client already
>>> cur._result.get_value(0, 0)
b'hello mum!'

The async interaction with the libpq is a delicate matter and surely not something I would like to repeat with different flavours in several parts of the code. So the guiding forces of the design are:

  • there are different types of connections and cursors: a blocking sync one you can use as cur.execute(query) and an async def one you must call as await cur.execute(query), so they must actually be two different methods;
  • the intricate async machinery of the libpq, and the state keeping it needs between calls, should be implemented only once;
  • how to wait should be selectable (no Unix pun intended) and injected around the libpq according to the different Python environment.

So, hum... how to do a thing, ask someone else to wait, then go back to work?

Coroutines!

More precisely, because in modern Python "coroutine" has acquired a different and more specific meaning, what I am referring to are the plain old generators, i.e. the functions with yield, not the functions with async def. So the exclamation should be:

Generators!

This is the idea, shown on a simplified connection procedure (the query procedure is more complicated but it follows a similar collaboration pattern):

# simplified `psycopg3.connection.BaseConnection._connect_gen()`
def connect_gen(conninfo):
    """
    Generator to create a database connection without blocking.

    Yield pairs (fileno, `Wait`) whenever an operation would block. The
    generator can be restarted sending the appropriate `Ready` state when
    the file descriptor is ready.
    """
    conn = pq.PGconn.connect_start(conninfo)
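    while True:
        status = conn.connect_poll()
        if status == pq.PollingStatus.OK:
            break
        elif status == pq.PollingStatus.READING:
            yield conn.socket, Wait.R
        elif status == pq.PollingStatus.WRITING:
            yield conn.socket, Wait.W
        else:
            # Any other status (e.g. a failed connection attempt) is an
            # error: without this branch the loop would spin forever.
            # RuntimeError is a placeholder for the real exception type.
            raise RuntimeError(f"connection failed: {status!r}")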

    return conn
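
The collaboration protocol can be tried by hand, with no database and no waiting involved. The sketch below assumes that `Wait` and `Ready` are simple enums (the article uses them without showing their definition, so the ones here are a guess), and `toy_gen()` with its fake file descriptor is purely illustrative. It shows the three moves a consumer makes: `next()` to start, `send()` to resume with a ready state, and `StopIteration.value` to collect what the generator returned.

```python
from enum import IntEnum

# Assumed definitions: the article uses Wait and Ready without showing them.
Wait = IntEnum("Wait", "R W RW")
Ready = IntEnum("Ready", "R W")


def toy_gen():
    """Ask to wait on a (fake) descriptor twice, then return a result."""
    fd = 42  # hypothetical file descriptor, never actually used
    ready = yield fd, Wait.W
    assert ready == Ready.W
    ready = yield fd, Wait.R
    assert ready == Ready.R
    return "connected"


gen = toy_gen()
requests = [next(gen)]               # run up to the first yield
requests.append(gen.send(Ready.W))   # resume, run up to the second yield
try:
    gen.send(Ready.R)                # resume; the generator returns
except StopIteration as exc:
    result = exc.value               # the value passed to `return`
```

The generator never decides how to wait: it only says what it is waiting for, which is what makes the waiting strategy replaceable.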

The connect_gen() generator yields as soon as it would block, asking the consumer to wake it up as soon as the connection is ready to read or write. It can be consumed in a blocking way using a procedure such as:

from select import select

# simplified `psycopg3.waiting.wait_select()`
def wait_select(gen):
    """
    Wait on the behalf of a generator using select().

    *gen* is expected to generate tuples (fd, status). Consume it and block
    according to the status until fd is ready. Send back the ready state
    to the generator.

    Return what the generator eventually returned.
    """
    try:
        fd, s = next(gen)
        while 1:
            if s == Wait.R:
                rf, wf, xf = select([fd], [], [])
                ready = Ready.R
            elif s == Wait.W:
                rf, wf, xf = select([], [fd], [])
                ready = Ready.W
            elif s == Wait.RW:
                rf, wf, xf = select([fd], [fd], [])
                ready = Ready.W if wf else Ready.R

            fd, s = gen.send(ready)

    except StopIteration as exc:
        return exc.value

With these two functions happily collaborating the blocking call psycopg3.connect() can be implemented with something like:

def connect(dsn):
    gen = connect_gen(dsn)
    pgconn = wait_select(gen)   # the low-level libpq connection
    return Connection(pgconn)   # the high-level DBAPI connection
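
The blocking pattern can be exercised end to end without PostgreSQL by replacing the libpq connection with a local socket pair. This is only a sketch under assumptions: `read_gen()` is a hypothetical toy generator, the `Wait` and `Ready` enums are guessed definitions, and `wait_select()` is re-declared here so that the block is self-contained.

```python
import socket
from enum import IntEnum
from select import select

# Assumed definitions: the article uses Wait and Ready without showing them.
Wait = IntEnum("Wait", "R W RW")
Ready = IntEnum("Ready", "R W")


def read_gen(sock):
    """Toy generator: wait until `sock` is readable, then return its data."""
    ready = yield sock.fileno(), Wait.R
    assert ready == Ready.R
    return sock.recv(1024)


def wait_select(gen):
    """Block with select() on behalf of `gen`; return what it returns."""
    try:
        fd, s = next(gen)
        while True:
            if s == Wait.R:
                select([fd], [], [])
                ready = Ready.R
            elif s == Wait.W:
                select([], [fd], [])
                ready = Ready.W
            else:  # Wait.RW: report writable if possible, else readable
                rf, wf, xf = select([fd], [fd], [])
                ready = Ready.W if wf else Ready.R
            fd, s = gen.send(ready)
    except StopIteration as exc:
        return exc.value


a, b = socket.socketpair()
try:
    b.sendall(b"hello")              # make `a` readable
    data = wait_select(read_gen(a))  # blocks until the data is available
finally:
    a.close()
    b.close()
```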

What about the async connection? I'm relatively new to asyncio, so I'm not sure if this is the most idiomatic way to do it, but at the moment it looks like the following block. add_reader()/add_writer() are the mechanism to wait collaboratively for a file descriptor to become ready, but they only take a callback: they don't suspend the coroutine. So we make it wait on an Event, and use the callback to release it.

from asyncio import get_event_loop, Event

# simplified `psycopg3.waiting.wait_async()`
async def wait_async(gen):
    ev = Event()
    loop = get_event_loop()
    ready = None

    def wakeup(state):
        nonlocal ready
        ready = state
        ev.set()

    try:
        fd, s = next(gen)
        while 1:
            ev.clear()
            if s == Wait.R:
                loop.add_reader(fd, wakeup, Ready.R)
                await ev.wait()
                loop.remove_reader(fd)
            elif s == Wait.W:
                loop.add_writer(fd, wakeup, Ready.W)
                await ev.wait()
                loop.remove_writer(fd)
            elif s == Wait.RW:
                loop.add_reader(fd, wakeup, Ready.R)
                loop.add_writer(fd, wakeup, Ready.W)
                await ev.wait()
                loop.remove_reader(fd)
                loop.remove_writer(fd)

            fd, s = gen.send(ready)

    except StopIteration as exc:
        return exc.value

which makes it possible to implement an asyncio-friendly connection function with:

async def connect_async(dsn):
    gen = connect_gen(dsn)
    pgconn = await wait_async(gen)  # the low-level libpq connection
    return AsyncConnection(pgconn)  # an AIOPG-style high-level connection
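
The asyncio variant can be tried the same way, with a socket pair standing in for the libpq connection. As before this is a sketch under assumptions: `read_gen()` is a hypothetical toy generator and the enums are guessed definitions. It also differs from the listing above in two details that are my own choices: the callbacks are removed in a `finally` block, so they are unregistered even if the task is cancelled, and `asyncio.run()`/`get_running_loop()` require Python 3.7+. `add_reader()` needs a selector-based event loop, which is the default on Unix.

```python
import asyncio
import socket
from enum import IntEnum

# Assumed definitions: the article uses Wait and Ready without showing them.
Wait = IntEnum("Wait", "R W RW")
Ready = IntEnum("Ready", "R W")


def read_gen(sock):
    """Toy generator: wait until `sock` is readable, then return its data."""
    ready = yield sock.fileno(), Wait.R
    return ready, sock.recv(1024)


async def wait_async(gen):
    """Drive `gen`, suspending the coroutine while the fd is not ready."""
    loop = asyncio.get_running_loop()
    ev = asyncio.Event()
    ready = None

    def wakeup(state):
        nonlocal ready
        ready = state
        ev.set()

    try:
        fd, s = next(gen)
        while True:
            ev.clear()
            if s in (Wait.R, Wait.RW):
                loop.add_reader(fd, wakeup, Ready.R)
            if s in (Wait.W, Wait.RW):
                loop.add_writer(fd, wakeup, Ready.W)
            try:
                await ev.wait()
            finally:
                # Removing a callback that was never added is harmless.
                loop.remove_reader(fd)
                loop.remove_writer(fd)
            fd, s = gen.send(ready)
    except StopIteration as exc:
        return exc.value


async def main():
    a, b = socket.socketpair()
    a.setblocking(False)
    # Send the data a little later, so the coroutine really has to wait.
    asyncio.get_running_loop().call_later(0.05, b.sendall, b"hello")
    try:
        return await wait_async(read_gen(a))
    finally:
        a.close()
        b.close()


result = asyncio.run(main())
```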

The same wait_*() functions are used to wrap a send-query-fetch-result generator underlying cursor.execute() and friends:

class Cursor(BaseCursor):
    def execute(self, query):   # ignoring values for now
        with self.conn.lock:
            gen = self._execute_send(query)
            results = wait_select(gen)
            self._execute_results(results)  # process the result to python

class AsyncCursor(BaseCursor):
    async def execute(self, query):
        async with self.conn.lock:
            gen = self._execute_send(query)
            results = await wait_async(gen)
            self._execute_results(results)

Minimal code duplication, different high level interface. Symmetry. I like it!

Furthermore, unpacking the query mechanism allowed the library to gain a new feature: it is now possible to return more than one result with a single roundtrip, implementing the nextset() method, which isn't available in psycopg2:

>>> cur.execute("select 10; select 20")
>>> cur._result.get_value(0, 0)
b'10'
>>> cur.nextset()
True
>>> cur._result.get_value(0, 0)
b'20'
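
One way nextset() could work, assuming the execute step keeps every result of the roundtrip in a list, is sketched below. `ToyCursor` and its attributes are hypothetical and only mirror the names used in the session above; plain bytes stand in for libpq result objects. The return values follow the DBAPI: a true value when another set is available, None otherwise.

```python
class ToyCursor:
    """Hypothetical cursor keeping every result of the last roundtrip."""

    def __init__(self):
        self._results = []
        self._iresult = 0

    def _execute_results(self, results):
        # Store all the results; the first one becomes the current one.
        self._results = list(results)
        self._iresult = 0

    @property
    def _result(self):
        return self._results[self._iresult]

    def nextset(self):
        # Move to the next result if there is one.
        if self._iresult + 1 < len(self._results):
            self._iresult += 1
            return True
        return None


cur = ToyCursor()
cur._execute_results([b"10", b"20"])  # stand-ins for two libpq results
first = cur._result
moved = cur.nextset()
second = cur._result
done = cur.nextset()
```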

What's left? Well, a lot! Now that the basic machinery is in place, and Python can send and retrieve bytes to and from Postgres, it's time to attack the adaptation layer.

Feedback is welcome: as someone suggested, it would be useful to be friendly with other async frameworks such as trio; maybe my async code is not the best around, and AIOPG or asyncpg hackers have something to suggest. Please let me know: the code so far is online already, with 141 tests in a grid testing Python from 3.6 to 3.8 and PostgreSQL from 9.5 to 12.

I would be extremely grateful if you would like to support the development of the project! Please make me feel your warmth in this antipodean month of lockdown with your development sponsorship, thank you! 💜