FastAPI SQLAlchemy 연동하며 발생한 문제 정리
개요
기존과 다른 패러다임을 가진 비동기 프레임워크 FastAPI와 SQLAlchemy를 연동하는 과정에서 발생했던 문제들과 어떠한 형태로 해결했는지에 대해 다뤄본다.
사전 지식
SQLAlchemy는 scoped_session()
함수를 활용하여 Thread-local한 세션을 생성할 수 있다. 이 때 scopefunc
라는 인자에 특정한 함수를 콜백 형태로 넣어주게 된다면 세션 사용 시 아래와 같은 로직을 통해 세션을 관리하게 된다.
session.registry.registry
에 키/밸류 쌍의 딕셔너리를 생성한다.scopefunc
에 넣어준 함수를 통해 키를 결정하고(가져오고) 해당 값을 통해session.registry.registry
에 접근하여 사용할 세션을 가져와서 사용한다.- 추후
session.remove()
시에도scopefunc
에 넣어준 함수를 통해 키를 가져오고session.registry.registry
에서 삭제한다.
요약하자면, scopefunc
에 인자를 넣어준다면 앞으로 세션 사용 시 registry
를 통해 세션을 가져오게 되기때문에 사용자가 보다 직접적으로 세션관리에 관여하겠다는 뜻으로 생각할 수 있겠다.
def __init__(self, session_factory, scopefunc=None):
"""Construct a new :class:`.scoped_session`.
:param session_factory: a factory to create new :class:`.Session`
instances. This is usually, but not necessarily, an instance
of :class:`.sessionmaker`.
:param scopefunc: optional function which defines
the current scope. If not passed, the :class:`.scoped_session`
object assumes "thread-local" scope, and will use
a Python ``threading.local()`` in order to maintain the current
:class:`.Session`. If passed, the function should return
a hashable token; this token will be used as the key in a
dictionary in order to store and retrieve the current
:class:`.Session`.
"""
self.session_factory = session_factory
if scopefunc:
self.registry = ScopedRegistry(session_factory, scopefunc)
else:
self.registry = ThreadLocalRegistry(session_factory)
위 코드는 scoped_session()
의 __init__
부분이다. 위에 나와있듯이 scopefunc
에 인자가 들어온다면, 해당 함수의 리턴값을 딕셔너리(registry)의 키값으로 사용하여 현재 사용할 세션을 가져온다고 쓰여있다.
def __call__(self):
key = self.scopefunc()
try:
return self.registry[key]
except KeyError:
return self.registry.setdefault(key, self.createfunc())
위 코드는 ScopedRegistry
클래스의 __call__
부분이다. scopefunc
를 통해 key값을 정하고 해당 값을 통해 내부 registry 딕셔너리에 접근하여 세션을 꺼내온다.
문제 및 해결 방법
첫 번째 문제
def get_request_id():
raise NotImplementedError
engine = create_engine(config.DB_URL, pool_recycle=3600, echo=True)
session: Union[Session, scoped_session] = scoped_session(
sessionmaker(autocommit=True, autoflush=False, bind=engine),
scopefunc=get_session_id,
)
위와 같이 세션을 생성하고 scopefunc
에 더미 함수를 하나 넣어준다.
class SQLAlchemyMiddleware(BaseHTTPMiddleware):
def __init__(self, app):
super().__init__(app)
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint,
):
request_id = str(uuid4())
def get_request_id():
return request_id
session.registry.scopefunc = get_request_id
try:
response = await call_next(request)
except Exception as e:
session.rollback()
raise e
finally:
session.remove()
return response
FastAPI의 미들웨어를 거칠 때 dispatch()
함수 내부에서 각 request에 맞는 request_id를 생성하고 해당 값을 리턴하는 get_request_id()
함수를 생성한다. 그리고 해당 함수를 scopefunc
에 콜백으로 넣어준다.
@home_router.get("/")
async def home():
session.query(User).first()
print(f"{session.registry.registry} / {session.registry.scopefunc()}")
await asyncio.sleep(4)
return {"status": True}
테스트를 진행하기 위해 간단한 엔드포인트를 하나 생성한다. registry
에 담긴 세션들과 현재 scopefunc
를 통해 얻은 request_id를 출력하고 4초간 출력한다. 이를 연속으로 2번 요청하면 다음과 같다.
{'1b4cb18b-d40a-4a2b-8c0b-1aac8f0285b4': <sqlalchemy.orm.session.Session object at 0x107372820>} / 1b4cb18b-d40a-4a2b-8c0b-1aac8f0285b4
{'1b4cb18b-d40a-4a2b-8c0b-1aac8f0285b4': <sqlalchemy.orm.session.Session object at 0x107372820>, '429f8a64-4023-40bd-b631-36e41daaef58': <sqlalchemy.orm.session.Session object at 0x107387220>} / 429f8a64-4023-40bd-b631-36e41daaef58
각 리퀘스트에 해당하는 세션들이 별도로 담겼다. 여기까지는 큰 문제가 없어보인다. 한번 더 요청을 날려보자.
{'1b4cb18b-d40a-4a2b-8c0b-1aac8f0285b4': <sqlalchemy.orm.session.Session object at 0x107372820>, 'b20df67b-7c18-4759-ad98-8dc46a1b6bfd': <sqlalchemy.orm.session.Session object at 0x107372070>} / b20df67b-7c18-4759-ad98-8dc46a1b6bfd
가장 최초 요청 시 생성한 세션이 삭제되지 않았다. 왜 이런걸까? 코루틴의 기본이 되는 Concurrency의 특성에 대해 잘 알고 있다면 아마 쉽게 이해가 갈 것이다. 간단하게 요약하자면 아래와 같다.
- 첫 번째 요청이 들어올 때 미들웨어를 거치고 세션을 생성한다. 그 다음 4초간 sleep한다.
- 두 번째 요청이 들어올 때도 마찬가지로 미들웨어를 거치고 세션을 생성한다. 세션의 scopefunc를 새롭게 덮어쓴다.
- 첫 번째 요청의 sleep이 끝나고 최종적으로
session.remove()
를 통해 세션을 지우려고하지만 2번의 요청에서scopefunc
를 새롭게 덮어썼기 때문에 첫 번째 세션의 키값을 가져오지 못한다. 고로 삭제할 수 없다.
부록으로 session.remove()
의 소스를 확인해본다.
def remove(self):
if self.registry.has():
self.registry().close()
self.registry.clear()
def has(self):
return hasattr(self.registry, "value")
해결 방법
위 문제는 ContextVar
를 사용하여 해결할 수 있다.
session_context: ContextVar[str] = ContextVar("session_context")
class SQLAlchemyMiddleware(BaseHTTPMiddleware):
def __init__(self, app):
super().__init__(app)
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint,
):
request_id = str(uuid4())
session_context.set(request_id)
def get_request_id():
return session_context.get()
session.registry.scopefunc = get_request_id
try:
response = await call_next(request)
except Exception as e:
session.rollback()
raise e
finally:
session.remove()
return response
그리고 마찬가지로 4초 sleep하는 코드를 돌려보면 아래와 같다.
# 최초 2번 요청
{'cc316a7e-3617-477e-9936-672b9938e6b3': <sqlalchemy.orm.session.Session object at 0x1060817c0>} / cc316a7e-3617-477e-9936-672b9938e6b3
{'cc316a7e-3617-477e-9936-672b9938e6b3': <sqlalchemy.orm.session.Session object at 0x1060817c0>, '3ec8900d-6aeb-4d9d-b8f0-5f638fd16525': <sqlalchemy.orm.session.Session object at 0x106094100>} / 3ec8900d-6aeb-4d9d-b8f0-5f638fd16525
# 이후 요청
{'bf16c71e-3c07-43e5-b9ed-69f78de1e813': <sqlalchemy.orm.session.Session object at 0x106081160>} / bf16c71e-3c07-43e5-b9ed-69f78de1e813
모든 세션이 정상적으로 삭제되고 또 다른 새로운 세션이 생성되었음을 확인할 수 있다.
위처럼 각 요청에 해당하는 request_id를 ContextVar
에 담아주고 해당 값을 사용하는 형태로 수정한다면 Context-local을 보장할 수 있다. 코루틴은 하나의 스레드에서 Concurrency하게 돌아가기 때문에 작업들 간 서로 제어권을 주고받으며 동작한다. 이는 A라는 작업이 동작하다가 완료되지 않았음에도 불구하고 B라는 작업이 추가적으로 실행될 수 있음을 뜻한다. 이 때 ContextVar
를 사용하면 A작업에서 session_context
변수에 접근할때와 B작업에서 접근할 때 각 Context별로 다른 값을 가질 수 있음을 말한다. Thread-local에 대해 생각해보면 쉽게 이해할 수 있으리라 생각된다.
따라서 위처럼 ContextVar
에 각 요청에 해당하는 request_id를 담아주고 해당 값을 사용한다면 scopefunc가 덮어씌워져도 각 요청에 맞는 id를 가져올 수 있게된다.
두 번째 문제
사실 이 문제는 첫 번째 문제와 동일한데, FastAPI에서 제공하는 기능들과 연동하다가 발생한 문제라고 보는게 맞을 것 같다. FastAPI에는 클라이언트에게 응답을 뿌려준 후 특정 작업을 수행할 수 있도록 해주는 BackgroundTasks
라는 기능이 존재한다. 만약 BackgroundTasks
를 통해 수행하는 작업이 세션을 필요로 하는 작업이라면 어떻게 해줘야할까? 현재 구조는 미들웨어에서 세션에 관한 설정을 진행해주기 때문에 바로 사용이 불가능한 형태이다. 따라서 아래와 같은 데코레이터를 만들어줬다.
def create_session(func):
async def wrapper(*args, **kwargs):
is_rollback = False
request_id = str(uuid4())
def get_request_id():
return request_id
session.registry.scopefunc = get_request_id
try:
await func(*args, **kwargs)
추가적으로 다른 코드가 하나 있다. 첫 번째 문제에서는 scopefunc에 콜백 함수를 미들웨어의 dispatch()
함수에서 설정해줬지만, 이번에는 __init__
함수에서 설정해줬다. 여기서 문제가 발생했다.
- 미들웨어의 __init__ 은 최초 서버가 뜰때만 실행된다. 요청이 들어올 때 마다 타는 로직은 dispatch() 에 설정해줘야한다.
- 그런데 현재 미들웨어에서는 __init__ 에서 설정해주고 있다. 그 이후 create_session 데코레이터에서 다시 한번 scopefunc에 콜백 함수를 교체한다. 따라서 미들웨어에서 설정한 get_request_id()함수가 실행되는게 아닌 create_session 데코레이터의 get_request_id() 가 실행된다.
따라서 첫 번째 문제에서 명시했던 상황과 정확히 동일한 상황이 발생됐다.
해결 방법
첫 번째 문제와 동일하게 ContextVar
를 사용하면 된다. 또한 미들웨어의 __init__
에서 scopefunc
를 교체하지말고 dispatch()
에서 교체한다.
세 번째 문제
현재 세션을 초기화하고 설정해주는 코드를 보면 알겠지만, 미들웨어를 거치지 않고서는 사용할 수 없다. 세션은 미들웨어를 통해 설정되기 때문이다. 따라서 테스트 코드를 작성하거나 Background task처럼 미들웨어를 통하지 않는 경우에는 세션을 사용할 수 없다.
해결 방법
def create_session(func):
async def _create_session(*args, **kwargs):
session_id = str(uuid4())
context = set_session_context(session_id=session_id)
try:
await func(*args, **kwargs)
except Exception as e:
session.rollback()
raise e
finally:
session.remove()
reset_session_context(context=context)
return _create_session
따라서 독립적으로 세션을 설정해주는 추가적인 작업이 필요했고 위처럼 데코레이터를 생성하여 사용하는것으로 해결했다.