Skip to content

Commit faade98

Browse files
committed
[1장] 예제 11 구현
1 parent b8573bf commit faade98

File tree

1 file changed

+53
-0
lines changed

1 file changed

+53
-0
lines changed
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package kms.chapter01
2+
3+
import io.reactivex.BackpressureStrategy
4+
import io.reactivex.Flowable
5+
import io.reactivex.FlowableEmitter
6+
import io.reactivex.FlowableOnSubscribe
7+
import io.reactivex.schedulers.Schedulers
8+
import org.reactivestreams.Subscriber
9+
import org.reactivestreams.Subscription
10+
11+
fun main() {
12+
val flowable = Flowable.create(
13+
object : FlowableOnSubscribe<String> {
14+
override fun subscribe(emitter: FlowableEmitter<String>) {
15+
val strArr = arrayOf("Hello, World!", "안녕, RxJava!")
16+
for (str in strArr) {
17+
if (emitter.isCancelled) {
18+
return
19+
}
20+
emitter.onNext(str)
21+
}
22+
emitter.onComplete()
23+
}
24+
},
25+
BackpressureStrategy.BUFFER,
26+
)
27+
flowable
28+
.observeOn(Schedulers.computation())
29+
.subscribe(object : Subscriber<String> {
30+
31+
private var subscription: Subscription? = null
32+
33+
override fun onSubscribe(s: Subscription?) {
34+
subscription = s
35+
subscription?.request(1L)
36+
}
37+
38+
override fun onNext(data: String?) {
39+
println("${Thread.currentThread().name}: $data")
40+
subscription?.request(1L)
41+
}
42+
43+
override fun onError(error: Throwable?) {
44+
error?.printStackTrace()
45+
}
46+
47+
override fun onComplete() {
48+
println("${Thread.currentThread().name}: 완료")
49+
}
50+
})
51+
52+
Thread.sleep(500L)
53+
}

0 commit comments

Comments
 (0)