Coding/설계 | 경험

FastAPI SQLAlchemy 연동하며 발생한 문제 정리

Hide­ 2021. 5. 24. 15:45
반응형

개요

기존과 다른 패러다임을 가진 비동기 프레임워크 FastAPI와 SQLAlchemy를 연동하는 과정에서 발생했던 문제들과 어떠한 형태로 해결했는지에 대해 다뤄본다.

사전 지식

SQLAlchemy는 scoped_session() 함수를 활용하여 Thread-local한 세션을 생성할 수 있다. 이 때 scopefunc 라는 인자에 특정한 함수를 콜백 형태로 넣어주게 된다면 세션 사용 시 아래와 같은 로직을 통해 세션을 관리하게 된다.

  1. session.registry.registry에 키/밸류 쌍의 딕셔너리를 생성한다.
  2. scopefunc에 넣어준 함수를 통해 키를 결정하고(가져오고) 해당 값을 통해 session.registry.registry에 접근하여 사용할 세션을 가져와서 사용한다.
  3. 추후 session.remove()시에도 scopefunc에 넣어준 함수를 통해 키를 가져오고 session.registry.registry에서 삭제한다.

요약하자면, scopefunc에 인자를 넣어준다면 앞으로 세션 사용 시 registry를 통해 세션을 가져오게 되기때문에 사용자가 보다 직접적으로 세션관리에 관여하겠다는 뜻으로 생각할 수 있겠다.

    def __init__(self, session_factory, scopefunc=None):
        """Construct a new :class:`.scoped_session`.

        :param session_factory: a factory to create new :class:`.Session`
         instances. This is usually, but not necessarily, an instance
         of :class:`.sessionmaker`.
        :param scopefunc: optional function which defines
         the current scope.   If not passed, the :class:`.scoped_session`
         object assumes "thread-local" scope, and will use
         a Python ``threading.local()`` in order to maintain the current
         :class:`.Session`.  If passed, the function should return
         a hashable token; this token will be used as the key in a
         dictionary in order to store and retrieve the current
         :class:`.Session`.

        """
        self.session_factory = session_factory

        if scopefunc:
            self.registry = ScopedRegistry(session_factory, scopefunc)
        else:
            self.registry = ThreadLocalRegistry(session_factory)

위 코드는 scoped_session()__init__ 부분이다. 위에 나와있듯이 scopefunc에 인자가 들어온다면, 해당 함수의 리턴값을 딕셔너리(registry)의 키값으로 사용하여 현재 사용할 세션을 가져온다고 쓰여있다.

    def __call__(self):
        key = self.scopefunc()
        try:
            return self.registry[key]
        except KeyError:
            return self.registry.setdefault(key, self.createfunc())

위 코드는 ScopedRegistry 클래스의 __call__ 부분이다. scopefunc를 통해 key값을 정하고 해당 값을 통해 내부 registry 딕셔너리에 접근하여 세션을 꺼내온다.

문제 및 해결 방법

첫 번째 문제

def get_request_id():
    raise NotImplementedError


engine = create_engine(config.DB_URL, pool_recycle=3600, echo=True)
session: Union[Session, scoped_session] = scoped_session(
    sessionmaker(autocommit=True, autoflush=False, bind=engine),
    scopefunc=get_session_id,
)

위와 같이 세션을 생성하고 scopefunc에 더미 함수를 하나 넣어준다.

class SQLAlchemyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app):
        super().__init__(app)

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint,
    ):
        request_id = str(uuid4())
        def get_request_id():
            return request_id
        session.registry.scopefunc = get_request_id

        try:
            response = await call_next(request)
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.remove()

        return response

FastAPI의 미들웨어를 거칠 때 dispatch() 함수 내부에서 각 request에 맞는 request_id를 생성하고 해당 값을 리턴하는 get_request_id() 함수를 생성한다. 그리고 해당 함수를 scopefunc에 콜백으로 넣어준다.

@home_router.get("/")
async def home():
    session.query(User).first()
    print(f"{session.registry.registry} / {session.registry.scopefunc()}")
    await asyncio.sleep(4)
    return {"status": True}

테스트를 진행하기 위해 간단한 엔드포인트를 하나 생성한다. registry에 담긴 세션들과 현재 scopefunc를 통해 얻은 request_id를 출력하고 4초간 출력한다. 이를 연속으로 2번 요청하면 다음과 같다.

{'1b4cb18b-d40a-4a2b-8c0b-1aac8f0285b4': <sqlalchemy.orm.session.Session object at 0x107372820>} / 1b4cb18b-d40a-4a2b-8c0b-1aac8f0285b4
{'1b4cb18b-d40a-4a2b-8c0b-1aac8f0285b4': <sqlalchemy.orm.session.Session object at 0x107372820>, '429f8a64-4023-40bd-b631-36e41daaef58': <sqlalchemy.orm.session.Session object at 0x107387220>} / 429f8a64-4023-40bd-b631-36e41daaef58

각 리퀘스트에 해당하는 세션들이 별도로 담겼다. 여기까지는 큰 문제가 없어보인다. 한번 더 요청을 날려보자.

{'1b4cb18b-d40a-4a2b-8c0b-1aac8f0285b4': <sqlalchemy.orm.session.Session object at 0x107372820>, 'b20df67b-7c18-4759-ad98-8dc46a1b6bfd': <sqlalchemy.orm.session.Session object at 0x107372070>} / b20df67b-7c18-4759-ad98-8dc46a1b6bfd

가장 최초 요청 시 생성한 세션이 삭제되지 않았다. 왜 이런걸까? 코루틴의 기본이 되는 Concurrency의 특성에 대해 잘 알고 있다면 아마 쉽게 이해가 갈 것이다. 간단하게 요약하자면 아래와 같다.

  1. 첫 번째 요청이 들어올 때 미들웨어를 거치고 세션을 생성한다. 그 다음 4초간 sleep한다.
  2. 두 번째 요청이 들어올 때도 마찬가지로 미들웨어를 거치고 세션을 생성한다. 세션의 scopefunc를 새롭게 덮어쓴다.
  3. 첫 번째 요청의 sleep이 끝나고 최종적으로 session.remove() 를 통해 세션을 지우려고하지만 2번의 요청에서 scopefunc를 새롭게 덮어썼기 때문에 첫 번째 세션의 키값을 가져오지 못한다. 고로 삭제할 수 없다.

부록으로 session.remove()의 소스를 확인해본다.

    def remove(self):
        if self.registry.has():
            self.registry().close()
        self.registry.clear()

    def has(self):
      return hasattr(self.registry, "value")

해결 방법

위 문제는 ContextVar를 사용하여 해결할 수 있다.

session_context: ContextVar[str] = ContextVar("session_context")

class SQLAlchemyMiddleware(BaseHTTPMiddleware):
    def __init__(self, app):
        super().__init__(app)

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint,
    ):
        request_id = str(uuid4())
        session_context.set(request_id)
        def get_request_id():
            return session_context.get()

        session.registry.scopefunc = get_request_id

        try:
            response = await call_next(request)
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.remove()

        return response

그리고 마찬가지로 4초 sleep하는 코드를 돌려보면 아래와 같다.

# 최초 2번 요청
{'cc316a7e-3617-477e-9936-672b9938e6b3': <sqlalchemy.orm.session.Session object at 0x1060817c0>} / cc316a7e-3617-477e-9936-672b9938e6b3
{'cc316a7e-3617-477e-9936-672b9938e6b3': <sqlalchemy.orm.session.Session object at 0x1060817c0>, '3ec8900d-6aeb-4d9d-b8f0-5f638fd16525': <sqlalchemy.orm.session.Session object at 0x106094100>} / 3ec8900d-6aeb-4d9d-b8f0-5f638fd16525

# 이후 요청
{'bf16c71e-3c07-43e5-b9ed-69f78de1e813': <sqlalchemy.orm.session.Session object at 0x106081160>} / bf16c71e-3c07-43e5-b9ed-69f78de1e813

모든 세션이 정상적으로 삭제되고 또 다른 새로운 세션이 생성되었음을 확인할 수 있다.

위처럼 각 요청에 해당하는 request_id를 ContextVar에 담아주고 해당 값을 사용하는 형태로 수정한다면 Context-local을 보장할 수 있다. 코루틴은 하나의 스레드에서 Concurrency하게 돌아가기 때문에 작업들 간 서로 제어권을 주고받으며 동작한다. 이는 A라는 작업이 동작하다가 완료되지 않았음에도 불구하고 B라는 작업이 추가적으로 실행될 수 있음을 뜻한다. 이 때 ContextVar를 사용하면 A작업에서 session_context변수에 접근할때와 B작업에서 접근할 때 각 Context별로 다른 값을 가질 수 있음을 말한다. Thread-local에 대해 생각해보면 쉽게 이해할 수 있으리라 생각된다.

따라서 위처럼 ContextVar에 각 요청에 해당하는 request_id를 담아주고 해당 값을 사용한다면 scopefunc가 덮어씌워져도 각 요청에 맞는 id를 가져올 수 있게된다.

두 번째 문제

사실 이 문제는 첫 번째 문제와 동일한데, FastAPI에서 제공하는 기능들과 연동하다가 발생한 문제라고 보는게 맞을 것 같다. FastAPI에는 클라이언트에게 응답을 뿌려준 후 특정 작업을 수행할 수 있도록 해주는 BackgroundTasks라는 기능이 존재한다. 만약 BackgroundTasks를 통해 수행하는 작업이 세션을 필요로 하는 작업이라면 어떻게 해줘야할까? 현재 구조는 미들웨어에서 세션에 관한 설정을 진행해주기 때문에 바로 사용이 불가능한 형태이다. 따라서 아래와 같은 데코레이터를 만들어줬다.

def create_session(func):
    async def wrapper(*args, **kwargs):
        is_rollback = False
        request_id = str(uuid4())

        def get_request_id():
            return request_id

        session.registry.scopefunc = get_request_id

        try:
            await func(*args, **kwargs)

추가적으로 다른 코드가 하나 있다. 첫 번째 문제에서는 scopefunc에 콜백 함수를 미들웨어의 dispatch() 함수에서 설정해줬지만, 이번에는 __init__함수에서 설정해줬다. 여기서 문제가 발생했다.

  1. 미들웨어의 __init__ 은 최초 서버가 뜰때만 실행된다. 요청이 들어올 때 마다 타는 로직은 dispatch() 에 설정해줘야한다.
  2. 그런데 현재 미들웨어에서는 __init__ 에서 설정해주고 있다. 그 이후 create_session 데코레이터에서 다시 한번 scopefunc에 콜백 함수를 교체한다. 따라서 미들웨어에서 설정한 get_request_id()함수가 실행되는게 아닌 create_session 데코레이터의 get_request_id() 가 실행된다.

따라서 첫 번째 문제에서 명시했던 상황과 정확히 동일한 상황이 발생됐다.

해결 방법

첫 번째 문제와 동일하게 ContextVar를 사용하면 된다. 또한 미들웨어의 __init__에서 scopefunc를 교체하지말고 dispatch() 에서 교체한다.

세 번째 문제

현재 세션을 초기화하고 설정해주는 코드를 보면 알겠지만, 미들웨어를 거치지 않고서는 사용할 수 없다. 세션은 미들웨어를 통해 설정되기 때문이다. 따라서 테스트 코드를 작성하거나 Background task처럼 미들웨어를 통하지 않는 경우에는 세션을 사용할 수 없다. 

해결 방법

def create_session(func):
    async def _create_session(*args, **kwargs):
        session_id = str(uuid4())
        context = set_session_context(session_id=session_id)

        try:
            await func(*args, **kwargs)
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.remove()
            reset_session_context(context=context)

    return _create_session

따라서 독립적으로 세션을 설정해주는 추가적인 작업이 필요했고 위처럼 데코레이터를 생성하여 사용하는것으로 해결했다.

전체 소스 : https://github.com/teamhide/fastapi-boilerplate