8강. suspending function
suspend 함수는 suspend가 붙은 다른 함수를 호출할 수 있다. 그리고 suspend 함수는 코루틴이 중지되었다가 재개될 수 있는 지점을 의미한다. 중요한건 중지가 될수도 있고 안될수도 있다는 것이다.
fun main(): Unit = runBlocking {
launch {
a()
b()
}
launch {
c()
}
}
suspend fun a() {
printWithThread("A")
}
suspend fun b() {
printWithThread("B")
}
suspend fun c() {
printWithThread("C")
}
// 결과
Thread: main, A
Thread: main, B
Thread: main, C
suspend 함수를 중단 지점이라고만 이해하면 마치 위 코드는 a가 실행됐다가 c가 실행됐다가 b가 실행되는 것 처럼 중단이 일어날 것 같지만 실제 코드를 실행해보면 중단없이 첫 번째 코루틴에 있는 a, b가 실행되고 두 번째 코루틴에 있는 c가 실행되는 모습을 확인할 수 있다. 따라서 suspend 함수는 중지되었다가 재개 될 수 있는 지점이라고 이해하는게 좋다.
fun main(): Unit = runBlocking {
val result1 = async {
call1()
}
val result2 = async {
call2(result1.await())
}
printWithThreads(result2.await())
}
fun call1(): Int {
Thread.sleep(1_000L)
return 100
}
fun call2(num: Int): Int {
Thread.sleep(1_000L)
return num * 2
}
// 결과
Thread: main, 200
main 함수는 runBlocking을 통해 코루틴으로 이루어져있고 내부에는 async로 인한 코루틴 두개가 존재한다. call1(), call2()는 외부 IO호출을 한다고 가정해보자. (원래는 Thread.sleep 자체가 Non-blocking 상황에서 스레드 자체를 막기 때문에 사용하면 안되지만 1초의 대기를 위해 넣었다) 그리고 해당 함수들은 suspend fun이 아니라 일반 fun으로 작성되어있다.
코드를 보면 첫 번째 async를 사용하여 call1()의 결과를 얻어와 call2()를 호출하고 그 결과를 출력했다. 하지만 여기에는 한가지 아쉬운점이 존재하는데 바로 Deferred에 의존하고 있다는 것이다.
async를 사용하면 Job인터페이스 하위 타입인 Deferred가 나온다. 만약 코루틴 async를 쓰는게 아니라 자바에 있는 CompletableFuture를 쓴다거나 Reactor와 같은 다른 비동기 라이브러리 코드로 갈아끼워야 한다면 그 라이브러리 변경에 대한 여파가 메인 함수에까지 미치게 된다. 그래서 이럴 때 suspend fun을 활용해볼 수 있다.
fun main(): Unit = runBlocking {
val result1: Int = call1()
val result2: Int = call2(result1)
printWithThreads(result2)
}
suspend fun call1(): Int {
return CoroutineScope(Dispatchers.Default).async {
Thread.sleep(1_000L)
100
}.await()
}
suspend fun call2(num: Int): Int {
return CompletableFuture.supplyAsync {
Thread.sleep(1_000L)
100
}.await()
}
먼저 메인함수에서 async를 사용해서 호출했던 부분을 직접 호출로 변경한다. 그리고 call1(), call2()를 suspend fun으로 변경했다. call1() 같은 경우 CoroutineScope를 통해 새로운 영역을 만들고 1초 대기 후 100을 반환하도록 수정한다.
call2()는 CompletableFuture를 사용하여 마찬가지로 1초 대기 후 100을 반환하도록 수정했다. 그리고 이를 await()하는 모습을 볼 수 있는데 이는 코루틴이 만들어준 것이다.
public suspend fun <T> CompletionStage<T>.await(): T {
코루틴이 CompletableFuture의 상위 타입인 CompletionStage의 확장함수로써 만들어둔 것이고 이 코루틴은 Future, Reactor등 다양한 비동기 라이브러리에 대한 일종의 어댑터를 지원하고 있다.
결과적으로 이렇게 되면 메인함수 입장에서는 내부 구현은 모르고 단순히 suspend function을 통해 중단될수도 있는 지점을 호출하게 되고 의존은 순수한 코틀린 타입에만 하게 된다.
그렇게 되면 call1()을 구현한 측에서는 코루틴이 async를 사용하건 자바의 CompletableFuture를 사용하건 아니면 또다른 비동기 라이브러리 코드를 사용하건 상관이 없게 된다.
추가적인 suspend 함수들
coroutineScope
launch/async와 같은 경우 만들어진 코드가 바로 실행되지 않았지만 coroutineScope는 추가적인 코루틴을 만들고 주어진 함수 블록이 바로 실행된다. 만들어진 코루틴이 모두 완료되면 다음 코드로 넘어간다.
fun main(): Unit = runBlocking {
printWithThreads("START")
printWithThreads(calculateResult())
printWithThreads("END")
}
suspend fun calculateResult(): Int = coroutineScope {
val num1 = async {
delay(1_000L)
10
}
val num2 = async {
delay(1_000L)
20
}
num1.await() + num2.await()
}
// 결과
Thread: main, START
Thread: main, 30
Thread: main, END
coroutineScope의 특이한점으로는 메인 함수에 launch나 async를 쓰는 경우 START, END가 출력되고 그 이후 launch/async 코드가 실행됐었다. 하지만 코루틴 스코프는 사용된 즉시 코루틴 스코프의 코드가 실행되기 때문에 위 코드는 다른 결과가 출력된 것이다. 또한 코루틴이 모두 완료된 이후 다음 코드로 넘어가기에 START -> 30 -> END 순으로 출력되었다.
coroutineScope는 위처럼 또다른 코루틴들을 동시에 여러개 사용하고 싶은데 그러한 기능을 또 다른 함수로 분리하고 싶을 때 사용된다.
withContext
coroutineScope와 기본적으로 유사한데, context에 변화를 주는 기능이 추가적으로 존재한다.
suspend fun calculateResult(): Int = withContext(Dispatchers.Default) {
val num1 = async {
delay(1_000L)
10
}
val num2 = async {
delay(1_000L)
20
}
num1.await() + num2.await()
}
// 결과
Thread: main, START
Thread: main, 30
Thread: main, END
예를 들어 기존 코드에서 coroutineScope를 withContext로 변경하여 실행할수도 있다. 이는 컨텍스트에 대해 추가적인 원소를 덮어쓰고 싶을 때 사용하는 기능이다.
withTimeout / withTimeoutOrNull
coroutineScope와 기본적으로 유사하다. 주어진 시간안에 새로 생긴 코루틴이 완료되어야 한다. 만약 주어진 시간안에 코루틴이 완료되지 못하면 withTimeout은 예외를 던지고 withTimeoutOrNull은 null을 반환하게 된다.
fun main(): Unit = runBlocking {
val result = withTimeout(1_000L) {
delay(1_500L)
10 + 20
}
printWithThreads(result)
}
// 결과
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms
예를 들어 위처럼 1초의 타임아웃을 두고 1.5초 대기하는 코루틴을 실행하면 TimeoutCancellationException 예외를 던지고 종료된다.
fun main(): Unit = runBlocking {
val result: Int? = withTimeoutOrNull(1_000L) {
delay(1_500L)
10 + 20
}
printWithThreads(result)
}
// 결과
Thread: main, null
withTimeoutOrNull은 예외가 발생하지않고 null을 리턴한다.
9강. 코루틴과 Continuation
class UserService {
private val userProfileRepository = UserProfileRepository()
private val userImageRepository = UserImageRepository()
suspend fun findUser(userId: Long): UserDto {
// 0단계 - 초기 시작
println("프로필 조회")
val profile = userProfileRepository.findProfile(userId)
// 1단계 - 1차 중단 후 재시작
println("이미지 조회")
val image = userImageRepository.findImage(profile)
// 2단계 - 2차 중단 후 재시작
return UserDto(profile, image)
}
}
class UserProfileRepository {
suspend fun findProfile(userId: Long): Profile {
delay(100L)
return Profile()
}
}
class UserImageRepository {
suspend fun findImage(profile: Profile): Image {
delay(100L)
return Image()
}
}
class Profile
class Image
data class UserDto(val profile: Profile, val image: Image)
위 코드를 대상으로 suspend 함수인 findUser가 어떻게 변화하는지 살펴보자. 이제 위에 표시한 각 단계를 라벨이라는 것으로 표시할 것이다.
interface Continuation {
}
그리고 라벨을 보관하기 위한 Continuation이란 인터페이스를 생성한다.
suspend fun findUser(userId: Long): UserDto {
val sm = object : Continuation {
var label = 0
}
다음으로 Continuation을 구현한 익명 클래스를 하나 생성하고 라벨을 0으로 설정한다.
suspend fun findUser(userId: Long): UserDto {
val sm = object : Continuation {
var label = 0
var profile: Profile? = null
var image: Image? = null
}
when (sm.label) {
0 -> { // 0단계 - 초기 시작
println("프로필 조회")
val profile = userProfileRepository.findProfile(userId)
}
1 -> { // 1단계 - 1차 중단 후 재시작
println("이미지 조회")
val image = userImageRepository.findImage(sm.profile!!)
}
2 -> { // 2단계 - 2차 중단 후 재시작
return UserDto(sm.profile!!, sm.image!!)
}
}
}
그리고 바로 위에서 생성한 sm을 사용하여 라벨의 값에 따라 실행되는 코드를 다르게 설정한다. 현재 상태로는 0번만 호출이 된다. 이제 0번 호출 후 1번이, 1번 이후에는 2번이 호출되도록 바꿔보자.
suspend fun findUser(userId: Long): UserDto {
val sm = object : Continuation {
var label = 0
var profile: Profile? = null
var image: Image? = null
}
when (sm.label) {
0 -> { // 0단계 - 초기 시작
println("프로필 조회")
sm.label = 1
val profile = userProfileRepository.findProfile(userId)
sm.profile = profile
}
1 -> { // 1단계 - 1차 중단 후 재시작
println("이미지 조회")
sm.label = 2
val image = userImageRepository.findImage(sm.profile!!)
sm.image = image
}
2 -> { // 2단계 - 2차 중단 후 재시작
return UserDto(sm.profile!!, sm.image!!)
}
}
}
간단하다. 각 단계별로 label값을 위처럼 바꿔주면 된다. 하지만 위 코드를 실행해도 0단계 -> 1단계 -> 2단계 -> 3단계가 자연스럽게 호출되지는 않는다. 왜냐하면 0단계가 종료되면 누군가는 다시 findUser를 불러줘야하는데 현재 그러고 있지 않기 때문이다.
여기서 추가적인 작업이 필요한데, 핵심은 Continuation을 지속적으로 전달하는 것이다. 예를 들어,
- findUser에 최초 진입하면 label = 0 Continuation 객체를 생성한다.
- findUser가 또다른 중단 함수(findProfile(), findImage())를 부르면 이 때 Continuation 마치 콜백처럼 같이 넘겨준다.
- findProfile은 자신의 코드가 끝나면 Continuation의 resumeWith() 함수를 실행해 resumeWith()가 다시 findUser를 부르게 한다.
- 라벨이 0에서 1로 바뀌고 Profile 정보가 세팅되면서 findUser로 돌아온다.
- 라벨 1번의 코드가 실행되며 Continuation의 resumeWith() 함수를 실행해 resumeWith()가 다시 findUser를 부르게 한다.
- 라벨이 1에서 2로 바뀌고 최종적으로 UserDto를 리턴한다.
핵심은 또다른 suspend 함수를 부를 때 Continuation을 같이 넘겨주고 해당 suspend 함수가 전달받은 Continuation을 이용해 다시 Caller 함수를 부를 수 있다는 것이다. 이 과정을 코드로 바꿔보면 다음과 같다.
suspend fun findUser(userId: Long, continuation: Continuation): UserDto {
val sm = object : Continuation {
suspend fun findProfile(userId: Long, continuation: Continuation): Profile {
delay(100L)
return Profile()
}
suspend fun findImage(profile: Profile, continuation: Continuation): Image {
delay(100L)
return Image()
}
먼저 각 suspend 함수에 Continuation을 인자로 받을 수 있도록 한다.
interface Continuation {
suspend fun resumeWith(data: Any?)
}
Continuation 인터페이스에 resumeWith() 메소드를 추가한다.
suspend fun findUser(userId: Long, continuation: Continuation): UserDto {
val sm = object : Continuation {
var label = 0
var profile: Profile? = null
var image: Image? = null
override suspend fun resumeWith(data: Any?) {
findUser(userId, this)
}
}
findUser 내부 Continuation을 생성하는 익명 객체를 오버라이드한다. 그리고 일종의 재귀함수처럼 다시 findUser를 부르도록 한다.
val profile = userProfileRepository.findProfile(userId, sm)
val image = userImageRepository.findImage(sm.profile!!, sm)
그리고 각 코드에서 Continuation 객체인 sm을 같이 넘겨주도록 수정한다.
suspend fun findProfile(userId: Long, continuation: Continuation) {
delay(100L)
continuation.resumeWith(Profile())
}
suspend fun findImage(profile: Profile, continuation: Continuation) {
delay(100L)
continuation.resumeWith(Image())
}
다음으로 각 메소드들의 리턴값을 직접 반환하는게 아니라 continuation의 resumeWith()를 통해 넘겨주도록 수정한다.
override suspend fun resumeWith(data: Any?) {
when (label) {
0 -> {
profile = data as Profile
label = 1
}
1 -> {
image = data as Image
label = 2
}
}
findUser(userId, this)
}
이제 오버라이드한 resumeWith에서 각 라벨에 해당하는 코드를 작성한다. 0번을 예시로 보면 findProfile 메소드를 호출했을 때 Continuation을 통해 Profile을 넘겨주기 때문에 넘어온 데이터를 Profile로 변환하고 라벨을 1로 올려주는 모습을 확인할 수 있다. 그리고 최종적으로 재귀 호출인 findUser를 다시 호출한다.
when (sm.label) {
0 -> { // 0단계 - 초기 시작
println("프로필 조회")
userProfileRepository.findProfile(userId, sm)
}
1 -> { // 1단계 - 1차 중단 후 재시작
println("이미지 조회")
userImageRepository.findImage(sm.profile!!, sm)
}
2 -> { // 2단계 - 2차 중단 후 재시작
return UserDto(sm.profile!!, sm.image!!)
}
}
resumeWith()에 있는 Continuation을 통해 라벨을 올려주고 데이터를 넣어주기 때문에 기존 라벨에 대한 분기 코드는 위처럼 라벨 수정과 데이터를 넣어주는 코드를 제거하고 단순 메소드 호출하는 코드로 변경되었다.
suspend fun findUser(userId: Long, continuation: Continuation?): UserDto {
val sm = continuation ?: object : Continuation {
그리고 기존 코드는 findUser를 재귀적으로 호출하는 코드에서 무한루프에 걸리기 때문에 continuation을 Nullable로 변경해주고 null이 아닌 경우에만 수행되도록 변경한다.
class UserService {
private val userProfileRepository = UserProfileRepository()
private val userImageRepository = UserImageRepository()
private abstract class FindUserContinuation() : Continuation {
var label = 0
var profile: Profile? = null
var image: Image? = null
}
suspend fun findUser(userId: Long, continuation: Continuation?): UserDto {
val sm = continuation as? FindUserContinuation ?: object : FindUserContinuation() {
override suspend fun resumeWith(data: Any?) {
좀 더 나아가서 UserService안에 FindUserContinuation 추상 클래스를 생성하고 해당 클래스에서 라벨과 데이터를 관리하게 한다면 findUser에서는 직접 Continuation을 구현할 필요도 없어진다.
when (sm.label) {
0 -> { // 0단계 - 초기 시작
println("프로필 조회")
userProfileRepository.findProfile(userId, sm)
}
1 -> { // 1단계 - 1차 중단 후 재시작
println("이미지 조회")
userImageRepository.findImage(sm.profile!!, sm)
}
}
return UserDto(sm.profile!!, sm.image!!)
그리고 라벨 분기문에서도 라벨이 2라면 분기문이 종료되는것이기에 제거하고 최종적으로 UserDto를 리턴하도록 변경해준다. 핵심은 Continuation 객체를 여기저기 전달하며 콜백으로써 활용하는 것이다.
suspend fun main() {
val service = UserService()
println(service.findUser(1L, null))
}
interface Continuation {
suspend fun resumeWith(data: Any?)
}
class UserService {
private val userProfileRepository = UserProfileRepository()
private val userImageRepository = UserImageRepository()
private abstract class FindUserContinuation() : Continuation {
var label = 0
var profile: Profile? = null
var image: Image? = null
}
suspend fun findUser(userId: Long, continuation: Continuation?): UserDto {
val sm = continuation as? FindUserContinuation ?: object : FindUserContinuation() {
override suspend fun resumeWith(data: Any?) {
when (label) {
0 -> {
profile = data as Profile
label = 1
}
1 -> {
image = data as Image
label = 2
}
}
findUser(userId, this)
}
}
when (sm.label) {
0 -> { // 0단계 - 초기 시작
println("프로필 조회")
userProfileRepository.findProfile(userId, sm)
}
1 -> { // 1단계 - 1차 중단 후 재시작
println("이미지 조회")
userImageRepository.findImage(sm.profile!!, sm)
}
}
return UserDto(sm.profile!!, sm.image!!)
}
}
class UserProfileRepository {
suspend fun findProfile(userId: Long, continuation: Continuation) {
delay(100L)
continuation.resumeWith(Profile())
}
}
class UserImageRepository {
suspend fun findImage(profile: Profile, continuation: Continuation) {
delay(100L)
continuation.resumeWith(Image())
}
}
class Profile
class Image
data class UserDto(val profile: Profile, val image: Image)
// 결과
프로필 조회
이미지 조회
UserDto(profile=Profile@171a80a0, image=Image@2ea38e22)
최종적으로 작성한 코드는 위와 같다.
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
실제 Continuation 인터페이스의 모습이다. 코루틴의 내부 정보를 확인할 수 있도록 CoroutineContext 가지고 있고 결과의 성공/실패에 따라 다음 단계로 넘어갈 수 있는 콜백 함수인 resumeWith()가 존재한다. 다시 한번, 핵심은 코루틴은 Continuation을 매개변수마다 추가해서 콜백처럼 사용하게끔 변경한다. 이다
Reference
'Coding' 카테고리의 다른 글
인프런 2시간만에 끝내는 코루틴 1강 ~ 7강 정리 (0) | 2024.06.12 |
---|---|
200억건의 데이터를 MySQL로 마이그레이션 할 때 고려했던 개념과 튜닝 방법 정리 (0) | 2024.06.09 |
DB B Tree Index 정리 (0) | 2023.08.05 |
Block IO/Non-Block IO 개념 정리 (2) | 2022.12.13 |
Redlock 알고리즘 (0) | 2022.07.02 |