Streaming SQL results from SQLAlchemy via a FastAPI endpoint


I was asked to create an endpoint that gets an SQL query and replies with a JSON list of the results. The prototype was ready in 10 minutes:

```python
@app.post("/sql")
async def sql(
    query: str = Form(),
    user: OpenID = Depends(get_logged_user),
):
    async with session_maker() as session:
        async with session.begin():
            try:
                result = await session.execute(text(query))
            except Exception as e:
                raise HTTPException(status_code=400, detail=str(e))

            keys = result.keys()
            rows = result.fetchall()  # loads the entire result set into memory
            return [
                dict(zip(keys, row))
                for row in rows
            ]
```

It worked nicely, and then promptly crashed the process when we sent a query with a ginormous result set and Python tried to fetch all of the rows into memory. One can argue about the validity of large result sets and mention paging and whatnot, but these were one-time “research” queries, which merited large result sets and resisted paging. I wanted to move to streaming the results (“server-side cursors” is the applicable SQLism, I think), but ran into problems where the transaction ended before the streaming response started.
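The core idea of streaming is to never hold the whole result set at once. Below is a minimal stdlib-only sketch of that idea, using `sqlite3` as a stand-in for the real database (an assumption; the endpoint itself goes through SQLAlchemy). Rows are pulled in batches with `fetchmany()` and emitted as pieces of a single JSON array, so the rows held in Python at any moment are bounded by the batch size rather than the result size. `stream_json_rows` and `batch_size` are hypothetical names. In SQLAlchemy's asyncio extension, the analogous entry point is `AsyncSession.stream()`, which returns an `AsyncResult` that is consumed incrementally.

```python
import json
import sqlite3
from typing import Iterator


def stream_json_rows(conn: sqlite3.Connection, query: str,
                     batch_size: int = 1000) -> Iterator[str]:
    """Yield a JSON array of row objects piece by piece (hypothetical helper).

    Rows are fetched with fetchmany(), so at most batch_size rows are held
    in Python memory at once instead of the whole result set.
    """
    cursor = conn.execute(query)
    try:
        # description has one entry per column; entry[0] is the column name
        keys = [column[0] for column in cursor.description]
        yield "["
        first = True
        while batch := cursor.fetchmany(batch_size):
            for row in batch:
                if not first:
                    yield ","  # separator before every row except the first
                first = False
                # default=str stringifies values json can't encode, e.g. datetimes
                yield json.dumps(dict(zip(keys, row)), default=str)
        yield "]"
    finally:
        cursor.close()


# Usage: 5000 generated rows, streamed 100 at a time
conn = sqlite3.connect(":memory:")
query = (
    "WITH RECURSIVE n(x) AS (SELECT 1 UNION ALL SELECT x + 1 FROM n WHERE x < 5000) "
    "SELECT x, x * x AS square FROM n"
)
rows = json.loads("".join(stream_json_rows(conn, query, batch_size=100)))
```

Here the pieces are joined only to check the output; a streaming response would send each piece as it is produced.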

This did not work:

```python
@app.post("/sql")
async def sql(
    query: str = Form(),
    user: OpenID = Depends(get_logged_user),
):
    async with session_maker() as session:
        async with session.begin():
            try:
                result = await session.stream(text(query))
            except Exception as e:
                raise HTTPException(status_code=400, detail=str(e))

            keys = result.keys()
            async def streamy():
                first = True
                yield '['
                async for row in result:
                    if first:
                        first = False
                    else:
                        yield ','
                    yield json.dumps(dict(zip(keys, row)))
                yield ']'
            # Returning exits both async with blocks, so the transaction and the
            # session are already closed by the time streamy() is iterated
            return StreamingResponse(streamy(), media_type="application/json")
```

Since the body of streamy() only runs once the response starts streaming, after the endpoint has returned and both with blocks have exited, SQLAlchemy would complain.
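The failure comes down to when a generator's body runs: calling an async generator function only creates the generator object, and its code starts executing on the first iteration. A stdlib-only sketch of the same lifecycle, with a hypothetical `FakeSession` and `open_session()` standing in for the SQLAlchemy session and its context manager:

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical stand-in for a database session that can be closed."""

    def __init__(self) -> None:
        self.closed = False

    def rows(self):
        if self.closed:
            raise RuntimeError("session is closed")
        return [(1, "a"), (2, "b")]


@asynccontextmanager
async def open_session():
    session = FakeSession()
    try:
        yield session
    finally:
        session.closed = True  # runs as soon as the async with block exits


async def handler():
    async with open_session() as session:
        async def streamy():
            # Not executed by streamy() below, only when the generator is iterated
            for row in session.rows():
                yield row
        return streamy()  # leaving the block closes the session first


async def main():
    gen = await handler()
    try:
        return [row async for row in gen]
    except RuntimeError as e:
        return str(e)


result = asyncio.run(main())
```

`result` ends up as the error message: by the time anything iterates the generator, the session it closes over has already been closed.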

The next step was moving the session into the streamed function:

```python
async def streamy(query):
    # The session now lives inside the generator, so it stays open while rows
    # are streamed, but the query itself only runs once streaming has started
    async with session_maker() as session:
        async with session.begin():
            result = await session.stream(text(query))
            keys = result.keys()
            first = True
            yield '['
            async for row in result:
                if first:
                    first = False
                else:
                    yield ','
                yield json.dumps(dict(zip(keys, row)))
            yield ']'

@app.post("/sql")
async def sql(
    query: str = Form(),
    user: OpenID = Depends(get_logged_user),
):
    return StreamingResponse(streamy(query), media_type="application/json")
```

The above worked, but only as long as the SQL query succeeded. The query now ran inside the generator, so it only executed after streaming had started, and once streaming has started the response is going to be a 200 even if an error is thrown. There was no way to report an error back to the user, aside from putting it inside the JSON response, which would surprise unsuspecting clients.
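Why the status can no longer change: a streaming response sends its status line and headers before it asks the body for its first chunk. The sketch below is a simplified model loosely following the ASGI message shapes (`http.response.start`, `http.response.body`); it is not Starlette's actual `StreamingResponse` code, and `send_streaming_response` and `failing_body` are hypothetical names.

```python
import asyncio


async def send_streaming_response(body, send):
    """Simplified model of a streaming response, not Starlette's actual code.

    The status goes out in the first message, before the body generator has
    produced anything, so a later failure in the body cannot change it.
    """
    await send({"type": "http.response.start", "status": 200})
    async for chunk in body:
        await send({"type": "http.response.body", "body": chunk, "more_body": True})
    await send({"type": "http.response.body", "body": b"", "more_body": False})


async def failing_body():
    yield b"["
    # Simulates the query failing after the first chunk went out
    raise ValueError("query failed halfway through")


async def main():
    sent = []

    async def send(message):
        sent.append(message)

    try:
        await send_streaming_response(failing_body(), send)
    except ValueError:
        pass  # too late: the 200 and part of the body are already out
    return sent


sent = asyncio.run(main())
```

The client sees a 200 followed by a truncated body (`[`), with nothing in the status to tell it something went wrong.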

The version I was happy with gave up on the with blocks, and instead explicitly opened and closed the SQLAlchemy session. On success, the session is only closed inside the streaming function, which keeps it available for the duration of the stream. Executing the query before returning a StreamingResponse still allows me to return a 400 if the query runs into trouble.

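```python
import json

from fastapi import Depends, Form, HTTPException
from fastapi.responses import StreamingResponse
from sqlalchemy import text

# app, session_maker, get_logged_user and OpenID are defined elsewhere in the project

@app.post("/sql")
async def sql(
    query: str = Form(),
    user: OpenID = Depends(get_logged_user),
):
    # No async with block: nothing closes the session when this function returns
    session = session_maker()
    await session.begin()
    try:
        result = await session.stream(text(query))
    except Exception as e:
        # Nothing will be streamed, so release the session before replying 400
        await session.close()
        raise HTTPException(status_code=400, detail=str(e))

    keys = result.keys()
    async def streamy():
        try:
            yield '['
            first = True
            async for row in result:
                if first:
                    first = False
                else:
                    yield ','
                yield json.dumps(dict(zip(keys, row)))
            yield ']'
        finally:
            # The generator owns the session from here on and closes it
            # once the stream finishes or fails
            await session.close()

    return StreamingResponse(streamy(), media_type="application/json")
```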
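The same ownership hand-off can be sketched without FastAPI or SQLAlchemy. In this stdlib-only version (an assumption, with `sqlite3` standing in for the database), `start_query` runs the query eagerly so errors surface before any streaming, raising a hypothetical `QueryError` where the endpoint would return a 400; on success it returns a generator whose `finally` block closes the connection.

```python
import json
import sqlite3
from typing import Iterator


class QueryError(Exception):
    """Hypothetical error type; in the endpoint this becomes a 400."""


def start_query(db_path: str, query: str) -> Iterator[str]:
    """Run the query eagerly, then return a generator that owns cleanup."""
    conn = sqlite3.connect(db_path)
    try:
        cursor = conn.execute(query)  # errors surface here, before streaming
    except sqlite3.Error as e:
        conn.close()  # nothing will stream, so release the connection now
        raise QueryError(str(e)) from e
    # description is None for statements that return no columns
    keys = [column[0] for column in cursor.description or ()]

    def streamy() -> Iterator[str]:
        try:
            yield "["
            for i, row in enumerate(cursor):
                if i:
                    yield ","
                yield json.dumps(dict(zip(keys, row)), default=str)
            yield "]"
        finally:
            # Runs on completion, on error, or when a started generator is closed
            conn.close()

    return streamy()
```

One caveat applies to this sketch and to the endpoint alike: a `finally` block inside a generator only runs if the generator was started, so a generator that is discarded before producing its first chunk leaves its cleanup undone.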

Happy days