Flowable

当我们使用Observable时,如果信息源和订阅都在同一个线程,这是同步问题,发送一个信息要等订阅处理完成后才进行下一个,但是我们进行异步处理时,情况就有点变化了,订阅的处理能力根不上发送信息的能力,就会出现一些问题 ,下例可以看出, 发送源已经发送完毕,但是接受的速度太慢,才从9开始进行

 Observable.range(1, 100_000)
                .map {
                    MyItem(it)
                }
                .observeOn(Schedulers.io())
                .subscribe({ myItem ->
                    sleep(100)
                    System.out.println("Received MyItem " + myItem.id)
                })

        Thread.sleep(Long.MAX_VALUE)
/** 部分打印结果
...
Constructing MyItem 99998
Constructing MyItem 99999
Constructing MyItem 100000
Received MyItem 9
Received MyItem 10
Received MyItem 11
...
*/
internal class MyItem(val id: Int) {

    init {
        println("Constructing MyItem $id")
    }
}

换成Flowable试试

Flowable

能保证一定顺序,如果接受来不及,他会等待一会再发送一波,flowable 发送128个后会等接受的都处理完后再重新发送

Flowable.range(1, 100_000)
                .map {
                    MyItem(it)
                }
                .observeOn(Schedulers.io())
                .subscribe({ myItem ->
                    sleep(100)
                    System.out.println("Received MyItem " + myItem.id)
                })
Thread.sleep(Long.MAX_VALUE)
/**
...
Constructing MyItem 126
Constructing MyItem 127
Constructing MyItem 128
Received MyItem 1
Received MyItem 2
Received MyItem 3
Received MyItem 4
Received MyItem 5
...
*/

使用情况

  • Observable
  1. 数据少,一般1000内
  2. 同步,异步情况比较少
  • Flowable
  1. 数据多,1000上
  2. 异步

Subscriber

这和observer 有一定的区别,他返回的是Subscription,主要是利用这个进行操作,subscription.request 必须调用,不然就不会走onNext方法

 Flowable.range(1, 10_000)
                .observeOn(Schedulers.io())
                .subscribe(object : Subscriber<Int> {
                    var subb: Subscription? = null
                    override fun onComplete() {
                        println("Done!")
                    }

                    override fun onSubscribe(s: Subscription?) {
                        s?.request(10)
                        println("onSubscribe is ${s.toString()}")

                    }

                    override fun onNext(t: Int?) {
                        println("OnNext is $t")
                    }

                    override fun onError(t: Throwable?) {
                    }
                })

Flowable.create

就像Observable.create 一样也可以创建Flowable,来看代码

 Flowable.create<String>(object : FlowableOnSubscribe<String> {
            override fun subscribe(emitter: FlowableEmitter<String>) {
                for (i in 1..100) {
                    emitter.onNext(" num of $i")
                }
                emitter.onComplete()
            }

        }, BackpressureStrategy.BUFFER).subscribe { println(it) }

除此之外 Observable 与 flowable 之间可以相互转换,看看

Flowable.just("sdsd").toObservable().subscribe { println(it) }

对于Flowable ,调用interval 时需要慎重 ,如果像下面这样调用就会报错,如果把注释去掉就ok 了

        Flowable.interval(1, TimeUnit.MILLISECONDS)
//                .onBackpressureBuffer()
                .observeOn(Schedulers.io())
                .subscribe { i ->
                    sleep(5)
                    println(i)
                }

results matching ""

    No results matching ""