subscribeOn
这是订阅设置线程的方法,主要是控制Observer
先看这个,运行以后你会发现,他们运行在不同线程上
var ob1 = Observable.just("SSS", "SS").subscribeOn(Schedulers.computation()).map {
Thread.sleep(1000)
it
}
ob1.subscribe { println("Obser1: $it at thread ${Thread.currentThread().name}") }
ob1.subscribe { println("Obser2: $it at thread ${Thread.currentThread().name}") }
Thread.sleep(5_000)
改装一下,他们将会运行在同一个线程上,这是因为我们在方法中加入publish,
var ob1 = Observable.just("SSS", "SS").subscribeOn(Schedulers.computation()).map {
Thread.sleep(1000)
it
}.publish()
ob1.subscribe { println("Obser1: $it at thread ${Thread.currentThread().name}") }
ob1.subscribe { println("Obser2: $it at thread ${Thread.currentThread().name}") }
ob1.connect()
Thread.sleep(5_000)
如果是多个subscribeOn 则以最靠近资源的那个为准,下例打印出来,就是computation的线程,而不是io线程
Observable.just("SSS", "SS").subscribeOn(Schedulers.computation()).map {
Thread.sleep(1000)
it
}.subscribeOn(Schedulers.io()).subscribe { println("${Thread.currentThread().name}") }
Thread.sleep(Long.MAX_VALUE)
observeOn
他将会影响它下游的线程,即使后面再调用subscribeOn也不会改变,除非再次调用observeOn,当然如果只有subscribeOn,以最靠近资源的为准。
Observable.just("SSS")
.subscribeOn(Schedulers.io())
.map {
Thread.sleep(1000)
println("map1: ${Thread.currentThread().name}")
it
}
.observeOn(Schedulers.computation())
.map {
Thread.sleep(1000)
println("map2: ${Thread.currentThread().name}")
it
}
.observeOn(Schedulers.io())
.map {
Thread.sleep(1000)
println("map3: ${Thread.currentThread().name}")
}
.subscribe {
Thread.sleep(1000)
println("sub: ${Thread.currentThread().name}")
}
Thread.sleep(Long.MAX_VALUE)
/**
map1: RxCachedThreadScheduler-1
map2: RxComputationThreadPool-1
map3: RxCachedThreadScheduler-2
sub: RxCachedThreadScheduler-2
*/