Flowable
当我们使用Observable时,如果信息源和订阅都在同一个线程,这是同步问题,发送一个信息要等订阅处理完成后才进行下一个,但是我们进行异步处理时,情况就有点变化了,订阅的处理能力根不上发送信息的能力,就会出现一些问题 ,下例可以看出, 发送源已经发送完毕,但是接受的速度太慢,才从9开始进行
Observable.range(1, 100_000)
.map {
MyItem(it)
}
.observeOn(Schedulers.io())
.subscribe({ myItem ->
sleep(100)
System.out.println("Received MyItem " + myItem.id)
})
Thread.sleep(Long.MAX_VALUE)
/** 部分打印结果
...
Constructing MyItem 99998
Constructing MyItem 99999
Constructing MyItem 100000
Received MyItem 9
Received MyItem 10
Received MyItem 11
...
*/
internal class MyItem(val id: Int) {
init {
println("Constructing MyItem $id")
}
}
换成Flowable试试
Flowable
能保证一定顺序,如果接受来不及,他会等待一会再发送一波,flowable 发送128个后会等接受的都处理完后再重新发送
Flowable.range(1, 100_000)
.map {
MyItem(it)
}
.observeOn(Schedulers.io())
.subscribe({ myItem ->
sleep(100)
System.out.println("Received MyItem " + myItem.id)
})
Thread.sleep(Long.MAX_VALUE)
/**
...
Constructing MyItem 126
Constructing MyItem 127
Constructing MyItem 128
Received MyItem 1
Received MyItem 2
Received MyItem 3
Received MyItem 4
Received MyItem 5
...
*/
使用情况
- Observable
- 数据少,一般1000内
- 同步,异步情况比较少
- Flowable
- 数据多,1000上
- 异步
Subscriber
这和observer 有一定的区别,他返回的是Subscription,主要是利用这个进行操作,subscription.request 必须调用,不然就不会走onNext方法
Flowable.range(1, 10_000)
.observeOn(Schedulers.io())
.subscribe(object : Subscriber<Int> {
var subb: Subscription? = null
override fun onComplete() {
println("Done!")
}
override fun onSubscribe(s: Subscription?) {
s?.request(10)
println("onSubscribe is ${s.toString()}")
}
override fun onNext(t: Int?) {
println("OnNext is $t")
}
override fun onError(t: Throwable?) {
}
})
Flowable.create
就像Observable.create 一样也可以创建Flowable,来看代码
Flowable.create<String>(object : FlowableOnSubscribe<String> {
override fun subscribe(emitter: FlowableEmitter<String>) {
for (i in 1..100) {
emitter.onNext(" num of $i")
}
emitter.onComplete()
}
}, BackpressureStrategy.BUFFER).subscribe { println(it) }
除此之外 Observable 与 flowable 之间可以相互转换,看看
Flowable.just("sdsd").toObservable().subscribe { println(it) }
对于Flowable ,调用interval 时需要慎重 ,如果像下面这样调用就会报错,如果把注释去掉就ok 了
Flowable.interval(1, TimeUnit.MILLISECONDS)
// .onBackpressureBuffer()
.observeOn(Schedulers.io())
.subscribe { i ->
sleep(5)
println(i)
}