리액티브 프로그래밍

업데이트:

리액티브 매니패스토

  • 반응성 : 빠른 속도와 일정하고 예상할 수 있는 반응 시간을 제공한다.
  • 회복성 : 장애가 발생해도 시스템은 반응해야 한다.
  • 탄력성 : 작업 부하 발생시 자동으로 관련 컴포넌트에 할당된 자원 수를 늘린다.
  • 메시지 주도 : 시스템을 구성하는 컴포넌트의 경계를 명확하게 해야 한다. 비동기 메시지를 전달함으로써 회복성과 탄력성을 얻을 수 있다.

리액티브 스트림과 플로 API

  • 리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍이다.
  • 잠재적으로 무한의 비동기 데이터를 순서대로 그리고 블록하지 않는 역압력을 전제해 처리하는 표준 기술이다.

Flow 클래스

자바 9에서 추가된 클래스로 정적 컴포넌트 하나를 포함하고 있으며 인스턴스화 할 수 없다.프로그래밍 발행-구독 모델을 지원할 수 있도로고 중첩된 인터페이스 네 개를 포함한다.

  • Publisher
    • 메시지의 발행자로 하나의 흐름에 실려 나가고 같은 순서로 나가는 것을 보장함
  • Subscriber
    • 메시지의 구독자로 메시지를정상적으로 수신하면 onNext 함수로 하나하나 볼수 있다
    • publisher가 메시지를 정상 발행하지 못한 경우 onError가 호출된다.
  • Subscription
    • publisher와 subscriber를 잇는 메시지 컨트롤을 한다.
    • request 메소드를 사용하면 갯수만큼 메시지를 가져온다.
  • Processor
    • publisher, Subscriber의 역활을 둘다 할 수 있는 인터페이스로 기본적으로 버퍼를 가지고 있어 pub-proc-sub의 구조로 두면 메시지를 가공하거나 잠시 유지할 수 있다.

RXJava

자바 Flow 클래스에 정의 된 인터페이스는 직접 구현하도록 의도하지 않아 구현체가 없다.
RkJava 라이브러리로 이 인터페이스를 구현할 수 있다.

Observable, Flowable

Publisher를 구현하므로 팩토리 메서드는 리액티브 스트림을 만든다.

//한개 이상의 요소를 이용해 방출함
//onNext("first"), onNext("first"), onComplete() 순서로 메시지를 받음
Observable<String> strings =  Observable.jsut("first", secound); 

//실시간 상호 작용
//1초 간격으로 Long 값을 반환, 이때 값은 계속 증가
Observable<String> onPerSec =  Observable.interval(1, TimeUnit.SEVOUNDS); 

Observer

Flow의 Subscriber의 인터페이스 역활을 한다.

public interface Observer<T> {
	void onSubscribe(Disposable d);
	void onNext(T t);
	void onError(Throwable t);
	void onComplete();
}

RxJava Api는 횔씬 유연하여 오버로드된 기능이 많아 Observer를 만들떄 onNext 메서드에서 쓸 람다만 전달해도 메시지를 수신할 수 있다.

Observable<String> onPerSec =  Observable.interval(1, TimeUnit.SEVOUNDS); 

onperSec.subscrible(i-> System.out.println(TempInfofetch("New York")));

observable.subscirber(observer) 형태로 람다만 전달 받아 onNext만 정의된 상태다.

Emitter

  • 구독은 못하는 Obsrevalbe로 Observer에게 직접 TempInfo를 전달한다.
  • onSubscribe가 없다.
  • observer에서 직접 받아서 쓴다.
public static Observable<TempInfo> getTemperature(String town) {
    return Observable.create(emitter -> 
    	Observable.interval(1, TimeUnit.SECONDS)
    	.subscribe(i -> {
      	if (!emitter.isDisposed()) {
      	  if (i >= 5) {
      	    emitter.onComplete();
      	  }
      	  else {
      	    try {
      	      emitter.onNext(TempInfo.fetch(town));
      	    }
      	    catch (Exception e) {
      	      emitter.onError(e);
      	    }
      	  }
      	}
    	}}));
  	}
public class TempObserver implements Observer<TempInfo> {

  @Override
  public void onComplete() {
    System.out.println("Done!");
  }

  @Override
  public void onError(Throwable throwable) {
    System.out.println("Got problem: " + throwable.getMessage());
  }

  @Override
  public void onSubscribe(Disposable disposable) {}

  @Override
  public void onNext(TempInfo tempInfo) {
    System.out.println(tempInfo);
  }

}
Observale<TempInfo> observable = getTemperature("New York");
observable.blockingSubscribe(new TempObserver());

Observable 합치기

  • 서로 다른 Observable이 하나의 Observable인 것처럼 합친다.
public static Observable<TempInfo> getCelsiusTemperatures(String... towns) {
    return Observable.merge(Arrays.stream(towns)
        .map(TempObservable::getCelsiusTemperature)
        .collect(toList()));
}

어느 정도 알겠는데 완전히 알았다고 말은 못하겠다 하나의 예시를 들어 직접 구현을 해봐야 완전히 내것이 될 것 같다.

댓글남기기