Disposing

从字面意思,是处理,安排的意思,当我们订阅一个观察源,来接收消息,创建一个处理来自观察源链的消息的流,当然这是耗费资源的,当我们处理完成后,我们期望他们能进行垃圾回收,值得感激的是,对于有限的观察源,当调用onComplete的时候Rx 会自动处理掉这些垃圾,当对于无限的观察源,就没有这么幸运了,你需要自己处理,基于这样的事实,你就不能再依靠垃圾回收机制了,相应的处理也是必须进行的,来防止内存泄漏。

Disposable 将观察者和观察源之间建立连接,你可以调用dispose方法来停止发送,处理所有用在观察者上的资源。你也可以调用isDisposed 方法进行检测是否已经处理过。

其他别说,看例子

 val seconds = Observable.interval(1, TimeUnit.SECONDS)

        val disposable = seconds.subscribe { l -> println("Give me a double : $l") }

        //sleep 5 seconds
        Thread.sleep(5000)
        //dispose and stop emissions
        disposable.dispose()
        Thread.sleep(5000)

当我们调用dispose 的时候,观察源会停止,嗨不嗨

观察者中Disposable

顾名思义,就是在观察者中,在订阅的时候,获取disposable 对象,在其他方法中合适的时候执行相应的方法,这是不是很帅!

来看例子,同样的例子,感受一下它的又一种用法

       val seconds = Observable.interval(1, TimeUnit.SECONDS)

        seconds.subscribe(object : Observer<Long> {
            private var disposable: Disposable? = null
            override fun onComplete() {
                println("Game over, you knooooooooooooooooooooooooooooooooooooooooooooooow?")
            }

            override fun onSubscribe(d: Disposable) {
                this.disposable = d
            }

            override fun onNext(t: Long) {
                println("onNext t = $t")
                if (t == 7L) {
                    disposable?.dispose()
                    println("There is a disposing.")
                }
            }

            override fun onError(e: Throwable) {
            }

        })
        //sleep 5 seconds
        Thread.sleep(10 * 1000)

CompositeDisposable

当我们需要同时停止多个订阅时,我们可以分别单独停止,但是我们有更好的选择,将每个Disposable 添加到CompositeDisposable 中,由它来统一管理,进行停止工作,是不是感觉这家伙在有的时候还是很好用的呢,想要停止所有的,只需要调用xx.disposable. 一切都很nice,不是吗!

        val seconds = Observable.interval(1, TimeUnit.SECONDS)
        var composite: CompositeDisposable = CompositeDisposable()
        composite.add(seconds.subscribe { println("This num is : $it") })
        Thread.sleep(2000)
        composite.add(seconds.subscribe { println("WO DE GE QU: $it") })
        Thread.sleep(4000)
        composite.dispose()
        Thread.sleep(2000)

Observable.create 中怎么用Disposable

如果在create 中我们做了很多耗时操作,为了确保我们调用过dispose 方法后,内部执行的方法能够及时停止,我们需要在create 内部进行isDisposabled 的判断,这就是用到Disposable 的另一方式。

  var dis = Observable.create<String> {
            try {
                for (i in 1..10) {
                    Thread.sleep(1000)
                    if (!it.isDisposed) {
                        it.onNext("This num is $i")
                    } else {
                        return@create
                    }
                }
                it.onComplete()
            } catch (e: Exception) {
                it.onComplete()
            }
        }.subscribeOn(Schedulers.newThread())
                .subscribe { println("$it") }
        Thread.sleep(3000)
        dis.dispose()

从例子中可以看出,如果外部停止了,create 的发送信息进行了有效的停止。

results matching ""

    No results matching ""