본문으로 바로가기
반응형

개요

이전 포스팅(https://hides.tistory.com/1101)에서 SQLAlchemy 1.4부터 지원하는 AsyncSession을 사용하여 비동기 세션을 적용하는 방법에 대해 다뤄봤었다. 관련하여 테스트를 진행하던 도중 문제점을 하나 발견했고 나름 크리티컬하다고 볼 수있다고 생각하여 문제점에 대해 다뤄보고 해결방법에 대해 기술한다.

asyncio.current_task와 문제점

SQLAlchemy 공식문서에서 async_scoped_session() 메소드 관련 문서를 찾아보면 위와 같은 글을 찾을 수 있다. 주목할점은 scopefunc인자에 asyncio의 current_task를 넣어준다는 점이다. scoped_session()의 scopefunc인자에 값을 대입하는 경우 registry라는 딕셔너리를 통해 세션을 관리하게 된다. 이 때 scopefunc에 인자로 넣어준 함수의 리턴값을 registry 딕셔너리의 키값으로 대입하여 사용할 세션을 결정한다.

위 문서를 보면 scopefunc에 asyncio에서 제공하는 current_task를 넣으라고 나와있다. current_task는 현재 실행중인 Task인스턴스를 반환하는 메소드이다. 따라서 세션이 사용될 때 각 컨텍스트에 맞는 태스크값을 통해 세션을 가져옴으로써 Context-local을 보장한다. current_task를 단순 출력해보면 아래와 같은 내용을 확인할 수 있다.

<Task pending name='Task-3' coro=<RequestResponseCycle.run_asgi() running at /Users/hide/.local/share/virtualenvs/fastapi-boilerplate-toX-v08U/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py:372> cb=[set.discard()]>

코루틴은 싱글 스레드에서 여러개의 Context 별로 코드가 실행되는 형태이기에 위처럼 현재 태스크에 맞게 세션을 사용하도록 설정해준다면 별다른 문제가 없을 것이라 생각했었다. 그런데 여기서 asyncio.gather()를 통해 한번에 여러개의 코드를 실행한다면 어떻게 될까?

class UserService:
    async def test(self):
        import asyncio
        print(asyncio.current_task())

먼저 test()라는 메소드를 하나 생성하고 현재 태스크를 찍도록 만들었다.

async def home():
    import asyncio
    print(asyncio.current_task())
    from app.services.user import UserService
    await asyncio.gather(
        UserService().test(),
        UserService().test(),
    )

그리고 라우터단에서 현재 태스크를 한번 찍어주고 gather()를 통해 위에서 생성한 test() 메소드 2개를 동시에 실행시켜줬다.

<Task pending name='Task-3' coro=<RequestResponseCycle.run_asgi() running at /Users/hide/.local/share/virtualenvs/fastapi-boilerplate-toX-v08U/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py:372> cb=[set.discard()]>
<Task pending name='Task-4' coro=<UserService.test() running at /Users/hide/fastapi-boilerplate/app/services/user.py:66> cb=[gather.<locals>._done_callback() at /Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py:769]>
<Task pending name='Task-5' coro=<UserService.test() running at /Users/hide/fastapi-boilerplate/app/services/user.py:66> cb=[gather.<locals>._done_callback() at /Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py:769]>

첫 번째 라인이 라우터에서 실행한 태스크이고 나머지 라인이 test() 메소드에서 출력한 현재의 태스크이다. 출력물을 보면 알겠지만 동일한 태스크가 아닌 새로운 태스크가 생성되어 돌아가게 된다. 

여기서 문제가 하나 발생한다. 현재 작업을 하나의 트랜잭션으로 묶기 위해 @Transactional() 이라는 데코레이터를 하나 만들어서 사용하고 있다.

class Transactional:
    def __init__(self, propagation: Propagation = Propagation.REQUIRED):
        self.propagation = propagation

    def __call__(self, function):
        @wraps(function)
        async def decorator(*args, **kwargs):
            try:
                result = await function(*args, **kwargs)
                await session.commit()
            except Exception as e:
                await session.rollback()
                raise e
            finally:
                await session.remove()

            return result

        return decorator

데코레이터는 위와 같이 생겼으며 코드를 보면 알겠지만 하나의 메소드를 데코레이터로 감싼 후 메소드의 실행이 종료된 이후에 commit()과 같은 Database persist 작업을 진행해주는 역할을 한다. 

@Transactional()
async def exception_add(self):
    import asyncio

    print(asyncio.current_task())
    user = User(email="email", password="password1", nickname="nickname")
    session.add(user)
    await asyncio.sleep(3)
    raise DuplicateEmailOrNicknameException

@Transactional()
async def add(self):
    import asyncio

    print(asyncio.current_task())
    user = User(email="email", password="password1", nickname="nickname")
    session.add(user)

먼저 위와 같은 코드를 작성해준다. exception_add()는 세션에 모델을 하나 추가하고 3초 후 예외를 터뜨리는 역할을 한다. add()는 단순히 세션에 모델만 추가한다.

async def home():
    import asyncio
    print(asyncio.current_task())
    from app.services.user import UserService
    await asyncio.gather(
        UserService().exception_add(),
        UserService().add(),
    )
    return {"status": True}

이렇게 만든 두개의 메소드를 gather()를 통해 실행시켜주면 결과는 다음과 같다.

<Task pending name='Task-3' coro=<RequestResponseCycle.run_asgi() running at /Users/hide/.local/share/virtualenvs/fastapi-boilerplate-toX-v08U/lib/python3.8/site-packages/uvicorn/protocols/http/httptools_impl.py:372> cb=[set.discard()]>
<Task pending name='Task-4' coro=<UserService.exception_add() running at /Users/hide/fastapi-boilerplate/core/db/transactional.py:20> cb=[gather.<locals>._done_callback() at /Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py:769]>
<Task pending name='Task-5' coro=<UserService.add() running at /Users/hide/fastapi-boilerplate/core/db/transactional.py:20> cb=[gather.<locals>._done_callback() at /Applications/Xcode.app/Contents/Developer/Library/Frameworks/Python3.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py:769]>
INFO:     127.0.0.1:59173 - "GET / HTTP/1.1" 400 Bad Request

위에서 설명한 것 처럼 모두 각기 다른 태스크를 통해 실행된다. 여기서 데이터베이스를 한번 확인해보면,

mysql> select * from users;
+-----+-----------+-------+----------+----------+---------------------+---------------------+
| id  | password  | email | nickname | is_admin | created_at          | updated_at          |
+-----+-----------+-------+----------+----------+---------------------+---------------------+
| 123 | password1 | email | nickname |        0 | 2022-03-21 16:12:22 | 2022-03-21 16:12:22 |
+-----+-----------+-------+----------+----------+---------------------+---------------------+
1 row in set (0.00 sec)

하나의 로우가 추가되어있다. Task-4(exception_add())에서 예외가 발생하여 전체적으로 서버의 응답은 400 Bad Request로 나갔지만 Task-5(add())는 정상적으로 실행이 되었기 때문이다. @Transactional() 데코레이터는 세션을 통한 DB persist 작업을 하나의 트랜잭션으로 묶기 위한 것인데 이렇게 된다면 해당 데코레이터가 의도적으로 동작할 수가 없다. 처음에는 gather()를 통해 동시에 작업을 실행시켜도 exception propagation으로 인해 문제가 없을 것이라 생각했다. 하지만 그렇게 동작하지 않았고 공식 문서를 통해 살펴보니 아래와 같은 내용을 찾을 수 있었다.

지금까지 구현한 내용을 통해 생각했고 바라던 내용을 그림으로 나타내자면 아래와 같다.

하나의 Request가 들어온 후 내부 비즈니스 로직을 거치고 기타 작업들을 진행하는 모든 과정을 하나의 Context로 담고 싶었고 그렇게 동작한다고 생각했다.

하지만 결과적으로 위와 같이 동작했다. gather()를 통해 여러개의 작업을 한번에 실행하면 내부적으로 새로운 태스크를 생성하여 실행되기 때문에 Task-3, Task-4, Task-5 각기 다른 Context를 가진다. 따라서 asyncio.current_task()를 scopefunc에 넣어준다면 Context-local을 보장할 수 없다.

async def home():
    import asyncio
    print(asyncio.current_task())
    from app.services.user import UserService
    task1 = asyncio.create_task(UserService().exception_add())
    task2 = asyncio.create_task(UserService().add())
    await task1
    await task2
    return {"status": True}

참고로 말하자면 위처럼 create_task를 통해 작업해도 동일하게 동작한다.

해결 방법

결과만 말하자면, 1.4 비동기 지원 전에 하던 방법으로 세션을 다루기로 했다. 하나의 Request가 들어올 때 uuid4 난수값을 생성하고 미들웨어에서 ContextVar에 해당 값을 세팅해준다. 그리고 scopefunc에 ContextVar에서 값을 꺼내어 세션을 결정하게 만드는 형태이다. 이 방법을 사용하면 하나의 Request가 들어올 때 새로운 세션을 생성하고 해당 Request에서는 모두 동일한 세션을 사용한다. 관련 코드는 아래 소스를 참고한다.

https://github.com/teamhide/fastapi-boilerplate/blob/master/core/db/session.py

https://github.com/teamhide/fastapi-boilerplate/blob/master/core/fastapi/middlewares/sqlalchemy.py

생각해볼 점

이번 문제는 위에서 설명했던 것 처럼 @Transactional 데코레이터를 통해 하나의 트랜잭션으로 묶는 형태로 구성하여 사용하고 있기 때문에 발생하는 문제이다. 글을 작성하고 나니 하나의 Request를 하나의 Context로 여기는 작업이 과연 필요할까라는 생각이 들었다. 각각 @Transactional 데코레이터가 붙은 메소드를 동시에 실행할 때 하나가 실패하면 다른 하나도 실패로 여기는게 과연 맞을까? 그리고 그런 작업이 과연 많을까 라는 생각 또한 든다. 내부적으로 Domain Model Pattern이 아닌 Transaction Script Pattern을 사용하고 있기에 이런 의문점이 드는 것 같기도 한다. 이 부분은 추가적인 고찰이 필요할 것으로 보인다.

반응형