Skip to content

Commit aa27d7b

Browse files
committed
[1장] Flowable 예제 설명 및 추가 정리
1 parent faade98 commit aa27d7b

File tree

2 files changed

+52
-1
lines changed

2 files changed

+52
-1
lines changed

src/main/kotlin/kms/chapter01/L11_FlowableSample.kt

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,42 +8,65 @@ import io.reactivex.schedulers.Schedulers
88
import org.reactivestreams.Subscriber
99
import org.reactivestreams.Subscription
1010

11-
fun main() {
11+
private fun main() {
1212
val flowable = Flowable.create(
1313
object : FlowableOnSubscribe<String> {
14+
15+
// FlowableEmitter 가 Subscriber 에게 데이터를 통지한다
16+
// create 의 구현을 따라가면
17+
// 에러가 발생하면 catch 해서 onError 로 전달하는 부분이 존재함
18+
// 단, 치명적인 에러라면 다시 Throw 를 던짐
19+
// https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
1420
override fun subscribe(emitter: FlowableEmitter<String>) {
1521
val strArr = arrayOf("Hello, World!", "안녕, RxJava!")
1622
for (str in strArr) {
23+
// 구동 해지 상태에서 종료하지 않고 onNext 를 진행해도 데이터를 통지하지 않는다
24+
// 단, Rx 에서 해주는 것은 통지하지 않는 것이지 계속 진행은 하므로 직접 처리하는게 좋다
1725
if (emitter.isCancelled) {
1826
return
1927
}
28+
// 데이터를 통지한다
29+
// 만약 null 을 전달하면 NullPointException 이 발생한다
2030
emitter.onNext(str)
2131
}
32+
// onComplete 를 통지하면 그 이후엔 아무것도 통지하면 안된다
2233
emitter.onComplete()
2334
}
2435
},
36+
// BackpressureStrategy 에 따라 다른 Emitter 를 생성
2537
BackpressureStrategy.BUFFER,
2638
)
2739
flowable
40+
// 데이터를 받는 측의 쓰레드를 변경할 때 사용
2841
.observeOn(Schedulers.computation())
42+
// Flowable 는 Publisher 인터페이스를 구현했기 때문에 Subscriber 와의 상호작용을 외부에서 영향을 받지 않는다
2943
.subscribe(object : Subscriber<String> {
3044

45+
// Subscriber 가 받을 데이터의 개수를 요청 및 구독 해지할 수 있는 인터페이스
46+
// onNext에서 직접 배압을 처리하기 위해서 subscription 을 멤버 변수로 저장
3147
private var subscription: Subscription? = null
3248

3349
override fun onSubscribe(s: Subscription?) {
3450
subscription = s
51+
// 요청 데이터의 개수를 MAX 로 처리하면 onNext 에서 더 이상 요청하지 않아도 됨
52+
// onSubscribe 에서 request를 호출하지 않으면 데이터르 받을 수 없다
53+
// request는 onSubscribe 의 가장 마지막에서 호출 해야함
3554
subscription?.request(1L)
3655
}
3756

57+
// Flowable 에서 데이터를 받으면 호출 되는 메서드
3858
override fun onNext(data: String?) {
3959
println("${Thread.currentThread().name}: $data")
4060
subscription?.request(1L)
4161
}
4262

63+
// 에러가 발생했거나 에러를 통지할 때 실행되는 메서드
64+
// onError 이후에는 onNext 나 onComplete 가 실행되지 않는다
4365
override fun onError(error: Throwable?) {
4466
error?.printStackTrace()
4567
}
4668

69+
// 모든 데이터의 통지를 끝내고 처리가 완료됐을 때 실행되는 메서드
4770
override fun onComplete() {
4871
println("${Thread.currentThread().name}: 완료")
4972
}

src/main/kotlin/kms/chapter01/README.md

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,31 @@
104104
>- 이미 구독하고 있던 소비자는 버퍼에 있는데이터를 통지 받음
105105
>- 새로 구독한 소비자는 최신 데이터를 통지 받음
106106
107+
### Flowable 시퀀스
108+
109+
1. Subscriber 가 Flowable을 구독한다(**Subscribe**)
110+
2. Flowable이 **Subscription**을 생성한다
111+
3. Flowable이 Subscriber에 구독 시작(**onSubscribe**)을 통지하여 Subscription을 전달
112+
4. Subscriber는 Subscription에 데이터를 통지하게 요청
113+
5. Flowable은 데이터를 Subscriber에게 통지
114+
6. Subscriber는 받은 데이터를 처리한다
115+
7. 처리한 후 Subscriber는 Subscription에게 데이터 통지를 요청
116+
8. Flowable은 데이터가 있다면 그 데이터를 Subscriber에게 통지
117+
9. Subscriber는 받은 데이터를 처리한다
118+
10. 처리한 후 Subscriber는 Subscription에게 데이터 통지를 요청
119+
11. Flowable 은 데이터가 없다면 완료(onComplete)를 통지
120+
12. Subscriber는 완료를 처리한다
121+
122+
### BackpressureStrategy 종류
123+
124+
- **BUFFER**
125+
- 통지할 수 있을 때까지 모든 데이터를 버퍼링 한다
126+
- **DROP**
127+
- 통지할 수 있을 때까지 새로 생성한 데이터를 삭제한다
128+
- **LATEST**
129+
- 생성한 최신 데이터만 버퍼링하고 생성할 때마다 버퍼링하는 데이터를 교환한다
130+
- **ERROR**
131+
- 버퍼 크기를 초과하면 MissingBackPressureException 에러를 통지
132+
- **NONE**
133+
- 특정 처리를 수행하지 않는다.
134+
- onBackPressure로 시작하는 메서드로 배압 모드를 설정할 때 사용

0 commit comments

Comments
 (0)