Skip to content

Commit 40da2ca

Browse files
committed
[1장] BackPressure 부터 1장 끝까지 예제 및 정리
1 parent f680512 commit 40da2ca

File tree

5 files changed

+166
-1
lines changed

5 files changed

+166
-1
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package kms.chapter01
2+
3+
import io.reactivex.Flowable
4+
import io.reactivex.disposables.CompositeDisposable
5+
import io.reactivex.schedulers.Schedulers
6+
7+
private fun main() {
8+
val disposable = CompositeDisposable()
9+
disposable.add(
10+
Flowable.range(1, 3)
11+
.doOnCancel { println("No.1 canceled") }
12+
.observeOn(Schedulers.computation())
13+
.subscribe {
14+
Thread.sleep(100L)
15+
println("No.1: $it")
16+
},
17+
)
18+
disposable.add(
19+
Flowable.range(1, 3)
20+
.doOnCancel { println("No.2 canceled") }
21+
.observeOn(Schedulers.computation())
22+
.subscribe {
23+
Thread.sleep(100L)
24+
println("No.2: $it")
25+
},
26+
)
27+
Thread.sleep(150L)
28+
disposable.dispose()
29+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package kms.chapter01
2+
3+
import io.reactivex.Single
4+
import io.reactivex.SingleObserver
5+
import io.reactivex.disposables.Disposable
6+
import java.time.DayOfWeek
7+
import java.time.LocalDate
8+
9+
private fun main() {
10+
Single.create { emitter ->
11+
emitter.onSuccess(LocalDate.now().dayOfWeek)
12+
}.subscribe(object : SingleObserver<DayOfWeek> {
13+
override fun onSubscribe(d: Disposable) {
14+
}
15+
16+
override fun onSuccess(dayOfWeek: DayOfWeek) {
17+
println(dayOfWeek)
18+
}
19+
20+
override fun onError(e: Throwable) {
21+
e.printStackTrace()
22+
}
23+
})
24+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package kms.chapter01
2+
3+
import io.reactivex.Maybe
4+
import io.reactivex.MaybeObserver
5+
import io.reactivex.disposables.Disposable
6+
import java.time.DayOfWeek
7+
8+
private fun main() {
9+
Maybe.create<DayOfWeek> { emitter ->
10+
// 데이터를 통지할 때는 onSuccess 만 통지
11+
// emitter.onSuccess(LocalDate.now().dayOfWeek)
12+
// 데이터를 통지 하지 않을 때는 onComplete 만 통지
13+
emitter.onComplete()
14+
}.subscribe(object : MaybeObserver<DayOfWeek> {
15+
override fun onSubscribe(d: Disposable) {
16+
}
17+
18+
override fun onSuccess(dayOfWeek: DayOfWeek) {
19+
println(dayOfWeek)
20+
}
21+
22+
override fun onError(e: Throwable) {
23+
e.printStackTrace()
24+
}
25+
26+
override fun onComplete() {
27+
println("완료")
28+
}
29+
})
30+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package kms.chapter01
2+
3+
import io.reactivex.Completable
4+
import io.reactivex.CompletableObserver
5+
import io.reactivex.disposables.Disposable
6+
import io.reactivex.schedulers.Schedulers
7+
8+
private fun main() {
9+
Completable.create { emitter ->
10+
emitter.onComplete()
11+
}.subscribeOn(Schedulers.computation())
12+
.subscribe(object : CompletableObserver {
13+
override fun onSubscribe(d: Disposable) {
14+
}
15+
16+
override fun onComplete() {
17+
println("완료")
18+
}
19+
20+
override fun onError(e: Throwable) {
21+
e.printStackTrace()
22+
}
23+
})
24+
25+
Thread.sleep(100L)
26+
}

src/main/kotlin/kms/chapter01/README.md

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,4 +162,60 @@
162162
#### 데이터의 일부만 사용하는 경우
163163

164164
- BackpressureStrategy.DROP 을 설정하면 처리할 수 없는 데이터를 삭제 가능
165-
- Observable 의 throttle 계열의 메서드로 특정 시점의 데이터만을 사용
165+
- Observable 의 throttle 계열의 메서드로 특정 시점의 데이터만을 사용
166+
167+
### RxJava 기본 구성
168+
169+
- **소비자**(Subscriber/Observer)가 **생산자**(Flowable/Observable)를 구독하는 형태
170+
- Flowable/Subscriber/Subscription
171+
- Reactive Stream 사양 지원
172+
- Observable/Observer/Disposable
173+
174+
>[!note] Reactive Stream 사양
175+
>BackPressure 을 이용하여 비동기 요소들 사이의 상호작용을 정의하는 작은 스펙
176+
177+
- FlowableProcessor 는 Processor 를 구현하고 Flowable 과 Subscriber 가 될 수 있는 추상 클래스
178+
- Subject 는 BackPressure 가 없는 FlowableProcessor 로 볼 수 있다
179+
- Observable 이나 Observer 가 될 수 있는 추상 클래스
180+
- Subscriber 를 구현한 DisposableSubscriber 와 ResourceSubscriber 를 제공
181+
- Observer 를 구현한 DisposableObserver와 ResourceObserver도 제공
182+
183+
### 데이터 통지시 규칙
184+
185+
- **null** 을 통지하면 안 된다
186+
- 데이터 통지는 **해도 되고 안 해도 된다**
187+
- Flowable/ Observable 의 처리를 끝낼 때는 **완료****에러** 통지를 해야 하며, 둘 다 통지하지는 않는다
188+
- 완료나 에러 통지를 **한 뒤에는 다른 통지를 하면 안된다**
189+
- 통지할 때는 **1건씩 순차적**으로 통지하며, 동시에 통지하면 안 된다
190+
191+
### Processor / Subject 종류
192+
193+
- **Publish**
194+
- 데이터를 받은 시점에만 소비자에 데이터를 통지
195+
- SharedFlow 처럼 동작
196+
- **Behavior**
197+
- 소비자가 구독하기 직전 데이터를 버퍼링해 해당 데이터 부터 통지
198+
- StateFlow 처럼 동작
199+
- **Replay**
200+
- 처리하는 도중 구동한 소비자에게도 받은 모든 데이터를 통지
201+
- **Async**
202+
- 데이터 생성을 완료했을 때 마지막으로 받은 데이터만 소비자에게 통지
203+
- **Unicast**
204+
- 1개의 소비자만 구독할 수 있다
205+
206+
### CompositeDisposable
207+
208+
- 여러 Disposable 을 모아 한번에 구동 해지하는 클래스
209+
210+
### Single / Maybe / Completable
211+
212+
- **Single**
213+
- 1건만 통지하거나 에러를 통지하는 클래스
214+
- 통지 자체가 완료를 의미
215+
- SingleObserver
216+
- **Maybe**
217+
- 0 or 1건만 통지하고 완료 or 에러를 통지하는 클래스
218+
- MaybeObserver
219+
- **Completable**
220+
- 데이터를 통지하지 않고 완료 or 에러를 통지하는 클래스
221+
- CompletableObserver

0 commit comments

Comments
 (0)