SQLAlchemy와 AsyncIO를 사용할 때 발생한 문제점
개요
SQLAlchemy 1.4 비동기 세션을 통해 작업을 진행하던 중 아래와 같은 에러가 발생하였다.
if self.session.twophase and self._parent is None:
AttributeError: 'NoneType' object has no attribute 'twophase'
이에 대해 조사한 내용을 간단하게 정리한다.
환경
PostgreSQL
SQLAlchemy 1.4.42
FastAPI 0.87.0
asyncpg 0.26.0
사전 지식
본 코드에서는 아래와 같은 형태로 비동기 세션을 사용하기 위한 환경을 구성하였다.
SQLAlchemyMiddleware
import uuid4
class SQLAlchemyMiddleware(BaseHTTPMiddleware):
def __init__(self, app: ASGIApp):
super().__init__(app)
async def dispatch(
self,
request: Request,
call_next: RequestResponseEndpoint,
) -> Response:
request_id = str(uuid4())
context = set_session_context(request_id)
try:
return await call_next(request)
except Exception as e:
raise e
finally:
await session.remove()
reset_session_context(context)
미들웨어를 통해 Request가 들어올 때 unique한 값을 생성하여 컨텍스트 변수에 설정한다.
AsyncSession
from contextvars import ContextVar, Token
session_context: ContextVar[str] = ContextVar("session_context", default="")
def get_session_context() -> str:
return session_context.get()
async_session_factory = sessionmaker(
class_=AsyncSession,
sync_session_class=RoutingSession,
)
session: async_scoped_session = async_scoped_session(
session_factory=async_session_factory,
scopefunc=get_session_context,
)
scopefunc에 contextvar에서 컨텍스트 변수를 가져오게 함으로써 Request per session을 구성해줬다. 이에 관한 자세한 내용은 본 포스팅의 주제가 아니므로 스킵한다. 참고: https://www.hides.kr/1103
@Transactional
class Transactional:
def __call__(
self,
func: Callable[P, Awaitable[T]],
) -> Callable[P, Coroutine[Any, Any, T]]:
@wraps(func)
async def _transactional(*args: P.args, **kwargs: P.kwargs) -> T:
try:
result = await func(*args, **kwargs)
await session.commit()
except Exception as e:
await session.rollback()
raise e
return result
return _transactional
특정 메소드에서 사용하는 여러개의 DB작업을 하나의 트랜잭션으로 묶어주기위해 사용하는 데코레이터이다.
문제 상황 재현
@Transactional()
async def test():
stmt = select(User)
(await session.execute(stmt)).scalars().first()
async def main():
await asyncio.gather(
test(),
test(),
test(),
)
asyncio.gather() 메소드를 사용하면 여러개의 작업을 concurrency하게 실행시킬 수 있다. 위 코드를 실행하면 개요에서 설명한 AttributeError: 'NoneType' object has no attribute 'twophase' 오류를 만나볼 수 있다.
사전 지식 섹션을 통해 알겠지만 현재 구성은 Request per session이다. 하지만 하나의 세션에서 모두 동일한 DB커넥션을 사용하는것이 아니라 각 커넥션이 block될 때 마다 새로운 커넥션을 풀에서 꺼내와서(또는 새로 생성해서) 사용하게 된다. 위 코드를 실행한 이후 데이터베이스에서 커넥션 리스트를 확인해보면,
사진처럼 총 3개의 커넥션을 맺고 있는 모습을 살펴볼 수 있다.
문제 원인
원인은 나름 간단했다. https://docs.sqlalchemy.org/en/20/changelog/whatsnew_20.html 의 Session raises proactively when illegal concurrent or reentrant access is detected¶ 섹션을 살펴보면 아래와 같은 문장이 나온다.
One error that’s been known to occur when a Session is used in multiple threads simultaneously is AttributeError: 'NoneType' object has no attribute 'twophase', which is completely cryptic. This error occurs when a thread calls Session.commit() which internally invokes the SessionTransaction.close() method to end the transactional context, at the same time that another thread is in progress running a query as from Session.execute(). Within Session.execute(), the internal method that acquires a database connection for the current transaction first begins by asserting that the session is “active”, but after this assertion passes, the concurrent call to Session.close() interferes with this state which leads to the undefined condition above.
한마디로 요약하면 session.commit()을 수행하면 내부적으로 현재 컨텍스트의 트랜잭션을 종료하기 위해 SessionTransaction.close() 메소드를 실행하는데, 그와 동시에 session.execute()가 실행될 때 문제가 발생한다고 한다. gather()를 통해 실행시키면 새로운 Task가 생성되게되고, 해당 Task별로 동일 세션이지만 다른 커넥션을 가져간다고 생각했는데 어떠한 문제점이 있던걸까?
검증
import random
async def test1(name):
stmt = select(User)
(await session.execute(stmt)).scalars().first()
conn = await session.connection()
print(f"{name} | conn:: {conn}")
await asyncio.sleep(random.randrange(1, 3))
stmt = select(User)
(await session.execute(stmt)).scalars().first()
conn = await session.connection()
print(f"{name} | conn:: {conn}")
async def main():
await asyncio.gather(
test1("first"),
test1("second"),
test1("third"),
)
위 코드를 보면 조회 쿼리를 하나 실행하고 현재 커넥션의 정보를 출력한 후 랜덤한 초동안 sleep한다. 이는 일부러 제어권을 이벤트 루프에 반납해주기 위함이다. 그리고 다시 동일하게 조회 쿼리를 실행시키고 사용한 커넥션의 정보를 출력한다. 이렇게 된다면 첫 번째 사용한 커넥션의 값과 두 번째 사용한 커넥션의 값이 동일한지, 동일하지 않은지 확인할 수 있다. 실행결과는 아래와 같다.
second | conn:: <sqlalchemy.ext.asyncio.engine.AsyncConnection object at 0x111afef70>
first | conn:: <sqlalchemy.ext.asyncio.engine.AsyncConnection object at 0x111aff510>
third | conn:: <sqlalchemy.ext.asyncio.engine.AsyncConnection object at 0x111affab0>
first | conn:: <sqlalchemy.ext.asyncio.engine.AsyncConnection object at 0x111affab0>
second | conn:: <sqlalchemy.ext.asyncio.engine.AsyncConnection object at 0x111affab0>
third | conn:: <sqlalchemy.ext.asyncio.engine.AsyncConnection object at 0x111affab0>
"""
0x111afef70 - second
0x111aff510 - first
0x111affab0 - first, second, third
"""
결과를 통해 알 수 있겠지만 각 메소드는 처음 사용한 커넥션과 이벤트 루프에서 제어권을 다시 넘겨받은 후 사용한 커넥션이 다르다. 문제 원인 섹션에서 설명한 것 처럼, 커넥션을 공유하기에 특정 커넥션에서는 이미 트랜잭션을 닫았지만 다른 작업에서 동일한 커넥션을 사용하여 execute() 등의 메소드를 통해 쿼리를 실행하는 문제가 발생한다.
정리
비동기는 상당히 다루기 까다롭다. 특히 파이썬의 경우 비동기에 대한 지원이 예전보다 많아지고 있지만 아직도 미흡한 부분이 많고 관련 문서들도 없다. 따라서 개발자 입장에서도 이해하기가 어렵다.
위 코드에서는 자세하게 쓰지 않았지만 내부적으로 reader/writer분기를 위해 Session의 get_bind() 메소드를 오버라이드하여 구현한 RoutingSession을 사용하고 있다. 만약 특정 메소드에서 reader로 붙는 코드와 writer로 붙는 코드가 동시에 존재하는 경우 이를 하나의 트랜잭션으로 묶어주는 건 논리적으로 이상하다. 그리고 여러개의 커넥션을 코드 상 트랜잭션으로 묶는 행위 또한 올바르지 않아보이기도 한다.
하지만 트랜잭션은 중요하다. ACID특성은 굉장히 중요한 부분이기에 쉽게 넘어갈 수 없는 부분이다. 현재로써는 특별한 해결방법이 보이지는 않는다. gather()의 사용을 지양하면 해당 문제는 발생하지 않기도 한다. 하지만 성능적인 이점을 가져갈 수 있는 gather()를 사용하지 않는것도 조금 곤란하다. 조금 더 연구를 해보고, 해결방법이 도출된다면 본 포스팅에 내용을 추가할 예정이다.