본문 바로가기
Coding/Python

SQLAlchemy AsyncSession으로 비동기 적용하기

by Hide­ 2022. 3. 18.
반응형

개요

현재 서버 프레임워크로 FastAPI를 사용하고 있었고 SQLAlchemy는 1.3대 버전을 사용중이었다. 따라서 DB쪽은 여전히 동기형태로 작업이 진행되기 때문에 비동기 프레임워크의 장점을 제대로 살리지 못했다. 물론 database, peewee등 비동기 ORM이 존재하지만 팀 내 구성원간의 러닝커브등을 고려하여 기존에 사용하던 SQLAlchemy를 그대로 사용중이었다. 하지만 SQLAlchemy 1.4버전부터 비동기 처리를 지원하기 시작했고 그에 따라 마이그레이션을 준비하며 테스트하는 과정을 간단하게 담아본다.

create_async_engine

from sqlalchemy.ext.asyncio import create_async_engine


engine = create_async_engine(
    "mysql+aiomysql://fastapi:fastapi@localhost:3306/fastapi",
    pool_recycle=3600,
)

기존 동기의 경우 create_engine() 메소드를 사용했다. 하지만 비동기의 경우 ext.asyncio에 있는 create_async_engine을 사용한다. 또한 데이터베이스 접속 라이브러리로 보통 pymysql을 사용할텐데, 이제는 비동기를 지원하는 aiomysql을 사용해야한다.

sessionmaker

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
)
from sqlalchemy.orm import sessionmaker


engine = create_async_engine(config.DB_URL, pool_recycle=3600)
async_session_factory = sessionmaker(bind=engine, class_=AsyncSession)

다음으로 sessionmaker를 통해 세션을 생성해준다. 차이점은 class_ 인자로 AsyncSession을 넣어줘야 한다는 점이다.

async_scoped_session

from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    async_scoped_session,
)
from sqlalchemy.orm import sessionmaker


async_session_factory = sessionmaker(bind=engine, class_=AsyncSession)
session = async_scoped_session(
    session_factory=async_session_factory,
    scopefunc=current_task,
)

기존에는 scoped_session() 메소드를 통해 Thread-local 세션을 만들어줬다. 하지만 비동기의 경우 코루틴을 사용하기 때문에 Thread-local이 아닌 Context-local을 보장해줘야 한다. (Context-local에 대한 자세한 내용은 https://hides.tistory.com/1081 첫 번째 문제의 해결방법을 참고한다) 웹 서버의 경우 일반적으로 Request per session으로 동작하도록 해야하는데 그 이유는, 예외 발생등의 이슈가 발생했을 때 해당 예외가 타 Request 및 session에 전파되지 말아야 하기 때문이다. 

async_scoped_session()의 scopefunc인자를 보면 current_task를 넣어주고 있다. scopefunc는 인자로 함수를 받고 추후 사용할 세션을 결정할 때 해당 인자로 들어온 함수를 통해 내부적으로 구현된 딕셔너리에서 세션을 가져오게 된다. 다음으로 current_task() 메소드는 asyncio 라이브러리에서 제공하는 메소드이며 현재 실행중인 Task 인스턴스를 반환해주는 메소드이다. 정확하게 말하자면 위 설명은 일반적인 scoped_session() 메소드의 경우인데 async_scoped_session()의 경우도 별반 다르지 않다. 공식문서에 따르면 async_scoped_session()은 동일한 세션을 사용하지만 프록시 형태로 구현되어있어 현재의 Context별로 세션을 사용할 수 있다고 나와있다.

한마디로 scopefunc에 current_task() 메소드를 넣어주게 된다면, 세션이 사용되는 시점에 scopefunc에 들어온 메소드를 사용하여 현재 사용해야할 세션의 Context를 결정하게 되고 위에서 설명했던 것 처럼 current_task()를 통해 현재 실행되는 컨텍스트의 Task 인스턴스를 키값으로하여 사용할 Context를 가져온다. 이는 즉, 각 컨텍스트별로 Context-local을 보장함을 뜻한다.

전체 코드

from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_scoped_session,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session


engine = create_async_engine(
    "mysql+aiomysql://fastapi:fastapi@localhost:3306/fastapi",
    pool_recycle=3600,
)
async_session_factory = sessionmaker(bind=engine, class_=AsyncSession)
session = async_scoped_session(
    session_factory=async_session_factory,
    scopefunc=current_task,
)
Base = declarative_base()

위에서 설명한 설정 관련 전체 코드를 한번에 작성하자면 위와 같다. 참고로 Base의 경우 동기/비동기 모두 동일한 방법으로 생성한다.

리스트 조회

Async SQLAlchemy의 경우 기존과 다른 문법을 가진다. 2.0 스타일의 문법이라고 하는데 간단하게 몇가지 살펴보자.

from sqlalchemy import select

query = select(User).where(User.id < prev).limit(limit)
result = await session.execute(query)
return result.scalars().all()

일반적으로 where조건절을 작성할 때 filter() 메소드를 사용했다. 하지만 이제는 where() 를 사용해야한다. 내부 인자는 기존과 같으며 limit()등도 동일하게 사용한다. 다른점은 session.execute() 메소드를 통해 작성한 쿼리를 실행해주는 부분이다. (await를 통해 실행해야한다는 점을 기억하자) 또한 결과를 가져오려면 .scalars()를 통해 가져와야 한다. 모든 데이터를 가져오거나 하나만 가져오는 부분은 기존과 동일하게 .all() 또는 .first()를 사용한다.

데이터 추가

데이터 추가의 경우 기존과 동일하게 모델 객체를 생성하고 session.add() 메소드를 통해 추가해주면 된다. 물론 session.commit()의 경우 await를 붙여줘야 동작한다.

session.remove()

세션을 모두 사용했다면 remove() 메소드를 통해 현재 Context에서 사용한 세션을 삭제해줘야 한다. 그렇지 않다면 사용한 세션 객체가 삭제되지 않고 지속적으로 남아있게 된다. 이를 직접적으로 확인하기 위해서는 session.registry.registry를 확인해보면 된다. 

self.session_factory = session_factory
self.registry = ScopedRegistry(session_factory, scopefunc)

async_scoped_session의 init코드를 확인해보면 위와 같은 코드를 발견할 수 있다. ScopedRegistry 객체를 따라들어가보면, 

def __init__(self, createfunc, scopefunc):
    """Construct a new :class:`.ScopedRegistry`.

    :param createfunc:  A creation function that will generate
      a new value for the current scope, if none is present.

    :param scopefunc:  A function that returns a hashable
      token representing the current scope (such as, current
      thread identifier).

    """
    self.createfunc = createfunc
    self.scopefunc = scopefunc
    self.registry = {}

def __call__(self):
    key = self.scopefunc()
    try:
        return self.registry[key]
    except KeyError:
        return self.registry.setdefault(key, self.createfunc())

위의 코드가 나오게 되는데 내부적으로 registry라는 딕셔너리를 가지고 있고 scopefunc에 넣어준 메소드를 통해 해당 딕셔너리의 키값을 결정하여 사용할 세션을 가져오는 모습을 확인할 수 있다. 

async def remove(self):
    """Dispose of the current :class:`.AsyncSession`, if present.

    Different from scoped_session's remove method, this method would use
    await to wait for the close method of AsyncSession.

    """

    if self.registry.has():
        await self.registry().close()
    self.registry.clear()

remove() 메소드의 경우 코드를 확인해보면 위와 같다. 설명했던 것 처럼 registry 딕셔너리에서 키값이 존재한다면 삭제해주는 역할을 진행한다. 따라서 세션을 사용한 이후에는 꼭 session.remove()를 통해 제거해줘야 한다.

주의 사항

모든 소스는 https://github.com/teamhide/fastapi-boilerplate 레포지토리에 올라가있다. 참고로 위 문서를 통해서도 Async SQLAlchemy를 구현할 수 있지만 몇가지 문제점이 존재한다. 관련 사항은 https://hides.tistory.com/1102 에서 확인할 수 있다.