반응형
구성 요소
- Publisher: 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
- Subscriber: 구독한 Publisher로부터 통지된 데이터를 전달받아서 처리하는 역할을 한다.
- Subscription: Publisher에 요청할 데이터의 개수를 지정하고 데이터의 구독을 취소하는 역할을 한다.
- Processor: Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고 Publisher로서 다른 Subscriber가 구독할 수 있다.
동작 과정
- Subscriber는 Publisher를 구독한다. (subscribe)
- Publisher는 Subscriber에게 데이터를 통지할 준비가 되었음을 알린다. (onSubscribe)
- Subscriber는 Publisher에게 전달받기를 원하는 데이터의 개수를 요청한다. (Subscription.request)
- Publisher는 3번에서 요청받은 만큼의 데이터를 생성하고 통지한다. (onNext)
- 이렇게 Publisher와 Subscriber간에 데이터 통지, 데이터 수신, 데이터 요청의 과정을 반복하다가 Publisher가 모든 데이터를 통지하게 되면 마지막으로 데이터 전송이 완료되었음을 Subscriber에게 알린다. (onComplete)
- 만약 Publisher가 데이터를 처리하는 과정에서 에러가 발생하면 에러가 발생했음을 Subscriber에게 알린다. (onError)
위 과정 3번을 보면 Subscriber는 Publisher에게 데이터의 요청 개수를 지정하는데 이는 실제로 Publisher와 Subscriber는 각각 다른 스레드에서 비동기적으로 상호작용하는 경우가 대부분이기 때문이다. 이러한 경우 Publisher가 통지하는 속도가 Subscriber의 처리 속도보다 더 빠르다면 처리를 대기하는 데이터는 쌓이게 되고 결과적으로 시스템 부하가 커지게 된다. 따라서 이러한 문제를 방지하기 위해 Subscription.request를 통해 데이터 개수를 제어하는 것이다.
Publisher
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Publisher는 subscribe 메소드 하나만 구현하면 되는 수준으로 매우 단순하다. 해당 메소드는 파라미터로 전달받은 Subscriber를 등록하는 역할을 한다.
Subscriber
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
- onSubscribe: 구독 시작 시점에 Publisher에게 요청할 데이터의 개수를 정하거나 구독을 해지하는 역할을 한다.
- onNext: Publisher가 통지한 데이터를 처리하는 역할을 한다.
- onError: Publisher가 데이터를 통지하는 과정에서 에러가 발생했을 때 에러를 처리하는 역할을 한다.
- onComplete: Publisher가 데이터 통지를 완료했음을 알릴 때 호출되는 메소드이다. 데이터 통지 완료 후 특정한 후처리를 하고싶다면 onComplete 메소드에 코드를 작성하면 된다.
Subscription
public interface Subscription {
public void request(long n);
public void cancel();
}
Subscription 인터페이스는 Subscriber가 구독한 데이터의 개수를 요청하거나 또는 데이터 요청의 취소, 즉 구독을 해지하는 역할을 한다.
위에서 설명한 동작 과정을 코드 관점에서 다시 설명하자면 다음과 같다.
- Publisher가 Subscriber 인터페이스 구현 객체를 subscribe 메소드의 파라미터로 전달한다.
- Publisher 내부에서는 전달받은 Subscriber 인터페이스 구현 객체의 onSubscribe 메소드를 호출하여 Subscriber의 구독을 의미하는 Subscription 인터페이스 구현 객체를 Subscriber에게 전달한다.
- 호출된 Subscriber 인터페이스 구현 객체의 onSubscriber 메소드에서 전달받은 Subscription 객체를 통해 전달받을 데이터의 개수를 Publisher에게 요청한다.
- Publisher는 Subscriber로부터 전달받은 요청 개수 만큼의 데이터를 onNext 메소드를 호출하여 Subscriber에게 전달한다.
- Publisher는 통지할 데이터가 없을 경우 onComplete 메소드를 호출하여 Subscriber에게 데이터 처리 종료를 알린다.
Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {}
Processor의 경우 별도로 구현해야 하는 메소드가 없다. 다른 인터페이스와 다른 점은 Subscriber 인터페이스와 Publisher 인터페이스를 상속한다는 것이다. 이는 Processor가 Publisher와 Subscriber의 기능을 모두 가지고 있기 때문이다.
리액티브 스트림즈의 구현 규칙
- Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
- Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다.
- Publisher가 데이터 처리가 실패하면 onError signal을 보내야 한다.
- Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다.
- Publisher가 Subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다.
- 일단 종료 상태 signal(onError, onComplete)을 받으면 더이상 signal이 발생되지 않아야 한다.
- 구독이 취소되면 Subscriber는 결국 signal 받는 것을 중지해야 한다.
Reference
스프링으로 시작하는 리액티브 프로그래밍
'Coding > Java Spring' 카테고리의 다른 글
스프링 MVC 1 정리 (0) | 2023.09.12 |
---|---|
스프링 통합 테스트를 위한 AbstractTestExecutionListener (0) | 2023.08.30 |
스프링 테스트 코드를 위한 어노테이션 (0) | 2023.08.22 |
스프링 프로젝트 시작을 위한 컨벤션 설정 (0) | 2023.08.22 |
DDD 이벤트 스토밍 - 크림 클론 프로젝트 (0) | 2023.08.18 |