리액티브 프로그래밍
업데이트:
리액티브 매니패스토
- 반응성 : 빠른 속도와 일정하고 예상할 수 있는 반응 시간을 제공한다.
- 회복성 : 장애가 발생해도 시스템은 반응해야 한다.
- 탄력성 : 작업 부하 발생시 자동으로 관련 컴포넌트에 할당된 자원 수를 늘린다.
- 메시지 주도 : 시스템을 구성하는 컴포넌트의 경계를 명확하게 해야 한다. 비동기 메시지를 전달함으로써 회복성과 탄력성을 얻을 수 있다.
리액티브 스트림과 플로 API
- 리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다.
- 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술이다.
Flow 클래스
자바 9에서 추가된 클래스로 정적 컴포넌트 하나를 포함하고 있으며 인스턴스화 할 수 없다.프로그래밍 발행-구독 모델을 지원할 수 있도로고 중첩된 인터페이스 네 개를 포함한다.
- Publisher
- 메시지의 발행자로 하나의 흐름에 실려 나가고 같은 순서로 나가는 것을 보장함
- Subscriber
- 메시지의 구독자로 메시지를정상적으로 수신하면 onNext 함수로 하나하나 볼수 있다
- publisher가 메시지를 정상 발행하지 못한 경우 onError가 호출된다.
- Subscription
- publisher와 subscriber를 잇는 메시지 컨트롤을 한다.
- request 메소드를 사용하면 갯수만큼 메시지를 가져온다.
- Processor
- publisher, Subscriber의 역활을 둘다 할 수 있는 인터페이스로 기본적으로 버퍼를 가지고 있어 pub-proc-sub의 구조로 두면 메시지를 가공하거나 잠시 유지할 수 있다.
RXJava
자바 Flow 클래스에 정의 된 인터페이스는 직접 구현하도록 의도하지 않아 구현체가 없다.
RkJava 라이브러리로 이 인터페이스를 구현할 수 있다.
Observable, Flowable
Publisher를 구현하므로 팩토리 메서드는 리액티브 스트림을 만든다.
//한개 이상의 요소를 이용해 방출함
//onNext("first"), onNext("first"), onComplete() 순서로 메시지를 받음
Observable<String> strings = Observable.jsut("first", secound);
//실시간 상호 작용
//1초 간격으로 Long 값을 반환, 이때 값은 계속 증가
Observable<String> onPerSec = Observable.interval(1, TimeUnit.SEVOUNDS);
Observer
Flow의 Subscriber의 인터페이스 역활을 한다.
public interface Observer<T> {
void onSubscribe(Disposable d);
void onNext(T t);
void onError(Throwable t);
void onComplete();
}
RxJava Api는 횔씬 유연하여 오버로드된 기능이 많아 Observer를 만들떄 onNext 메서드에서 쓸 람다만 전달해도 메시지를 수신할 수 있다.
Observable<String> onPerSec = Observable.interval(1, TimeUnit.SEVOUNDS);
onperSec.subscrible(i-> System.out.println(TempInfofetch("New York")));
observable.subscirber(observer) 형태로 람다만 전달 받아 onNext만 정의된 상태다.
Emitter
- 구독은 못하는 Obsrevalbe로 Observer에게 직접 TempInfo를 전달한다.
- onSubscribe가 없다.
- observer에서 직접 받아서 쓴다.
public static Observable<TempInfo> getTemperature(String town) {
return Observable.create(emitter ->
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(i -> {
if (!emitter.isDisposed()) {
if (i >= 5) {
emitter.onComplete();
}
else {
try {
emitter.onNext(TempInfo.fetch(town));
}
catch (Exception e) {
emitter.onError(e);
}
}
}
}}));
}
public class TempObserver implements Observer<TempInfo> {
@Override
public void onComplete() {
System.out.println("Done!");
}
@Override
public void onError(Throwable throwable) {
System.out.println("Got problem: " + throwable.getMessage());
}
@Override
public void onSubscribe(Disposable disposable) {}
@Override
public void onNext(TempInfo tempInfo) {
System.out.println(tempInfo);
}
}
Observale<TempInfo> observable = getTemperature("New York");
observable.blockingSubscribe(new TempObserver());
Observable 합치기
- 서로 다른 Observable이 하나의 Observable인 것처럼 합친다.
public static Observable<TempInfo> getCelsiusTemperatures(String... towns) {
return Observable.merge(Arrays.stream(towns)
.map(TempObservable::getCelsiusTemperature)
.collect(toList()));
}
어느 정도 알겠는데 완전히 알았다고 말은 못하겠다 하나의 예시를 들어 직접 구현을 해봐야 완전히 내것이 될 것 같다.
댓글남기기