@@ -8,42 +8,65 @@ import io.reactivex.schedulers.Schedulers
88import org.reactivestreams.Subscriber
99import org.reactivestreams.Subscription
1010
11- fun main () {
11+ private fun main () {
1212 val flowable = Flowable .create(
1313 object : FlowableOnSubscribe <String > {
14+
15+ // FlowableEmitter 가 Subscriber 에게 데이터를 통지한다
16+ // create 의 구현을 따라가면
17+ // 에러가 발생하면 catch 해서 onError 로 전달하는 부분이 존재함
18+ // 단, 치명적인 에러라면 다시 Throw 를 던짐
19+ // https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
1420 override fun subscribe (emitter : FlowableEmitter <String >) {
1521 val strArr = arrayOf(" Hello, World!" , " 안녕, RxJava!" )
1622 for (str in strArr) {
23+ // 구동 해지 상태에서 종료하지 않고 onNext 를 진행해도 데이터를 통지하지 않는다
24+ // 단, Rx 에서 해주는 것은 통지하지 않는 것이지 계속 진행은 하므로 직접 처리하는게 좋다
1725 if (emitter.isCancelled) {
1826 return
1927 }
28+ // 데이터를 통지한다
29+ // 만약 null 을 전달하면 NullPointException 이 발생한다
2030 emitter.onNext(str)
2131 }
32+ // onComplete 를 통지하면 그 이후엔 아무것도 통지하면 안된다
2233 emitter.onComplete()
2334 }
2435 },
36+ // BackpressureStrategy 에 따라 다른 Emitter 를 생성
2537 BackpressureStrategy .BUFFER ,
2638 )
2739 flowable
40+ // 데이터를 받는 측의 쓰레드를 변경할 때 사용
2841 .observeOn(Schedulers .computation())
42+ // Flowable 는 Publisher 인터페이스를 구현했기 때문에 Subscriber 와의 상호작용을 외부에서 영향을 받지 않는다
2943 .subscribe(object : Subscriber <String > {
3044
45+ // Subscriber 가 받을 데이터의 개수를 요청 및 구독 해지할 수 있는 인터페이스
46+ // onNext에서 직접 배압을 처리하기 위해서 subscription 을 멤버 변수로 저장
3147 private var subscription: Subscription ? = null
3248
3349 override fun onSubscribe (s : Subscription ? ) {
3450 subscription = s
51+ // 요청 데이터의 개수를 MAX 로 처리하면 onNext 에서 더 이상 요청하지 않아도 됨
52+ // onSubscribe 에서 request를 호출하지 않으면 데이터르 받을 수 없다
53+ // request는 onSubscribe 의 가장 마지막에서 호출 해야함
3554 subscription?.request(1L )
3655 }
3756
57+ // Flowable 에서 데이터를 받으면 호출 되는 메서드
3858 override fun onNext (data : String? ) {
3959 println (" ${Thread .currentThread().name} : $data " )
4060 subscription?.request(1L )
4161 }
4262
63+ // 에러가 발생했거나 에러를 통지할 때 실행되는 메서드
64+ // onError 이후에는 onNext 나 onComplete 가 실행되지 않는다
4365 override fun onError (error : Throwable ? ) {
4466 error?.printStackTrace()
4567 }
4668
69+ // 모든 데이터의 통지를 끝내고 처리가 완료됐을 때 실행되는 메서드
4770 override fun onComplete () {
4871 println (" ${Thread .currentThread().name} : 완료" )
4972 }
0 commit comments