Disposing
从字面意思,是处理,安排的意思,当我们订阅一个观察源,来接收消息,创建一个处理来自观察源链的消息的流,当然这是耗费资源的,当我们处理完成后,我们期望他们能进行垃圾回收,值得感激的是,对于有限的观察源,当调用onComplete的时候Rx 会自动处理掉这些垃圾,当对于无限的观察源,就没有这么幸运了,你需要自己处理,基于这样的事实,你就不能再依靠垃圾回收机制了,相应的处理也是必须进行的,来防止内存泄漏。
Disposable 将观察者和观察源之间建立连接,你可以调用dispose方法来停止发送,处理所有用在观察者上的资源。你也可以调用isDisposed 方法进行检测是否已经处理过。
其他别说,看例子
val seconds = Observable.interval(1, TimeUnit.SECONDS)
val disposable = seconds.subscribe { l -> println("Give me a double : $l") }
//sleep 5 seconds
Thread.sleep(5000)
//dispose and stop emissions
disposable.dispose()
Thread.sleep(5000)
当我们调用dispose 的时候,观察源会停止,嗨不嗨
观察者中Disposable
顾名思义,就是在观察者中,在订阅的时候,获取disposable 对象,在其他方法中合适的时候执行相应的方法,这是不是很帅!
来看例子,同样的例子,感受一下它的又一种用法
val seconds = Observable.interval(1, TimeUnit.SECONDS)
seconds.subscribe(object : Observer<Long> {
private var disposable: Disposable? = null
override fun onComplete() {
println("Game over, you knooooooooooooooooooooooooooooooooooooooooooooooow?")
}
override fun onSubscribe(d: Disposable) {
this.disposable = d
}
override fun onNext(t: Long) {
println("onNext t = $t")
if (t == 7L) {
disposable?.dispose()
println("There is a disposing.")
}
}
override fun onError(e: Throwable) {
}
})
//sleep 5 seconds
Thread.sleep(10 * 1000)
CompositeDisposable
当我们需要同时停止多个订阅时,我们可以分别单独停止,但是我们有更好的选择,将每个Disposable 添加到CompositeDisposable 中,由它来统一管理,进行停止工作,是不是感觉这家伙在有的时候还是很好用的呢,想要停止所有的,只需要调用xx.disposable. 一切都很nice,不是吗!
val seconds = Observable.interval(1, TimeUnit.SECONDS)
var composite: CompositeDisposable = CompositeDisposable()
composite.add(seconds.subscribe { println("This num is : $it") })
Thread.sleep(2000)
composite.add(seconds.subscribe { println("WO DE GE QU: $it") })
Thread.sleep(4000)
composite.dispose()
Thread.sleep(2000)
Observable.create 中怎么用Disposable
如果在create 中我们做了很多耗时操作,为了确保我们调用过dispose 方法后,内部执行的方法能够及时停止,我们需要在create 内部进行isDisposabled 的判断,这就是用到Disposable 的另一方式。
var dis = Observable.create<String> {
try {
for (i in 1..10) {
Thread.sleep(1000)
if (!it.isDisposed) {
it.onNext("This num is $i")
} else {
return@create
}
}
it.onComplete()
} catch (e: Exception) {
it.onComplete()
}
}.subscribeOn(Schedulers.newThread())
.subscribe { println("$it") }
Thread.sleep(3000)
dis.dispose()
从例子中可以看出,如果外部停止了,create 的发送信息进行了有效的停止。