subscribeOn

这是订阅设置线程的方法,主要是控制Observer

先看这个,运行以后你会发现,他们运行在不同线程上

   var ob1 = Observable.just("SSS", "SS").subscribeOn(Schedulers.computation()).map {
            Thread.sleep(1000)
            it
        }
        ob1.subscribe { println("Obser1: $it at thread ${Thread.currentThread().name}") }
        ob1.subscribe { println("Obser2: $it at thread ${Thread.currentThread().name}") }
        Thread.sleep(5_000)

改装一下,他们将会运行在同一个线程上,这是因为我们在方法中加入publish,

  var ob1 = Observable.just("SSS", "SS").subscribeOn(Schedulers.computation()).map {
            Thread.sleep(1000)
            it
        }.publish()
        ob1.subscribe { println("Obser1: $it at thread ${Thread.currentThread().name}") }
        ob1.subscribe { println("Obser2: $it at thread ${Thread.currentThread().name}") }
        ob1.connect()
        Thread.sleep(5_000)

如果是多个subscribeOn 则以最靠近资源的那个为准,下例打印出来,就是computation的线程,而不是io线程

Observable.just("SSS", "SS").subscribeOn(Schedulers.computation()).map {
            Thread.sleep(1000)
            it
       }.subscribeOn(Schedulers.io()).subscribe { println("${Thread.currentThread().name}") }
        Thread.sleep(Long.MAX_VALUE)

observeOn

他将会影响它下游的线程,即使后面再调用subscribeOn也不会改变,除非再次调用observeOn,当然如果只有subscribeOn,以最靠近资源的为准。

        Observable.just("SSS")
                .subscribeOn(Schedulers.io())
                .map {
                    Thread.sleep(1000)
                    println("map1: ${Thread.currentThread().name}")
                    it
                }
                .observeOn(Schedulers.computation())
                .map {
                    Thread.sleep(1000)
                    println("map2: ${Thread.currentThread().name}")
                    it
                }
                .observeOn(Schedulers.io())
                .map {
                    Thread.sleep(1000)
                    println("map3: ${Thread.currentThread().name}")
                }
                .subscribe {
                    Thread.sleep(1000)
                    println("sub: ${Thread.currentThread().name}")
                }
        Thread.sleep(Long.MAX_VALUE)
    /**
map1: RxCachedThreadScheduler-1
map2: RxComputationThreadPool-1
map3: RxCachedThreadScheduler-2
sub: RxCachedThreadScheduler-2
    */

results matching ""

    No results matching ""