重播&缓存

多过去也发送数据进行再次传播,如果不想错过某些数据,可以考虑重播,特别是interval

 var ob = Observable.interval(1, TimeUnit.SECONDS).replay(2).autoConnect()
 ob.subscribe { println("Observer 1: This time is $it") }
 Thread.sleep(4000)
 ob.subscribe { println("Observer 2: This time is $it") }    
 /**
Observer 1: This time is 0
Observer 1: This time is 1
Observer 1: This time is 2
Observer 1: This time is 3
Observer 2: This time is 2
Observer 2: This time is 3*/

缓存

和重播是一样的,当时缓存是长期的,不用改变订阅的行为

 val cachedRollingTotals = Observable.just(6, 2, 5, 7, 1, 4, 9, 8, 3)
                .scan(0) { total, next -> total + next }
                .cache()

 cachedRollingTotals.subscribe {
            println("This is $it")

如果利用interval 我们来看看cache 与 replay 同样的用法

 var Ob = Observable.interval(1, TimeUnit.SECONDS).publish().autoConnect().cache()
 Ob.subscribe { println("Observer 1: $it") }
 Thread.sleep(4000)
 Ob.subscribe { println("Observer 2: $it") }

Subject

这是组建Observable及Observer内部的操作流程的类,有点像builder的感觉,但是他有observable和observer的双重功能,因为他extends了Observable,implement Observer

  • 组建observable 流程

这很好地模拟了Observable的流程

        Subject<String> subject = PublishSubject.create();

        subject.map(String::length)
                .subscribe(System.out::println);

        subject.onNext("Alpha");
        subject.onNext("Beta");
        subject.onNext("Gamma");
        subject.onComplete();
  • 组建Observer

也可以当做是observer 使用,是不是感觉功能很强大

 var ob1 = Observable.just("dsdsds", "Sdsdsa", "ddsd")
        var sub = PublishSubject.create<String>()
        sub.subscribe { println("This letter is $it") }
        ob1.subscribe(sub)
  • 注意:
  • subject 组建observable时,要先订阅后,后调用
  • 组建 observer 时,先创建,组建,在调用订阅
  • 序列化

当我们使用考虑到啊多线程的情况时,我们就需要序列化,不然就会发生线程不安全的情况,用法很简单,调用一个方法就ok

Subject<String> subject = 
        PublishSubject.<String>create().toSerialized();
  • BehaviorSubject

和PublishSubject有类似的功能,但是Behavior 在此基础上有replay 的功能,相当于功能加强了

   var ob1 = Observable.just("dsdsds", "Sdsdsa", "ddsd")
        var ob2 = Observable.just("AAAAAAAAAAAAAAAAAA", "BBBBBBBBBBBBBBBBBBBBB")
        var sub = BehaviorSubject.create<String>()
        sub.subscribe { println("This letter is $it") }
        ob2.subscribe(sub)
        ob1.subscribe(sub)

上面这种情况,只会调动第一调用的,有定蒙圈, 看下面例子

        val subject = BehaviorSubject.create<Any>()

        subject.subscribe { s -> println("Observer 1: $s") }

        subject.onNext("Alpha")
        subject.onNext("Beta")
        subject.onNext("Gamma")

        subject.subscribe { s -> println("Observer 2: $s") }

如果是publishSubject 最后一条是不会打印出任何东西的,但是如果是Behavior则会打印最后一个缓存的gamma

  • ReplaySubject

这个是有点普通Observable的replay方法,和behaviorSubject相同之处也是具有重播功能,但是这里是全部重播,但是后者是只重播最后一个

val subject = ReplaySubject.create<Any>()

        subject.subscribe { s -> println("Observer 1: $s") }

        subject.onNext("Alpha")
        subject.onNext("Beta")
        subject.onNext("Gamma")

        subject.subscribe { s -> println("Observer 2: $s") }
  • AsyncSubject

这从名字上看是异步功能,实际也是这个目的,他的功能就是只发送接收到的最后一个信息源,在调用OnComplete之前的一次onNext 调用

 val subject = AsyncSubject.create<Any>()

        subject.subscribe { s -> println("Observer 1: $s") }

        subject.onNext("Alpha")
        subject.onNext("Beta")
        subject.onComplete()
        subject.onNext("Gamma")

        subject.subscribe { s -> println("Observer 2: $s") }
        /**
        打印
        Observer 1: Beta
        Observer 2: Beta
        */
  • UnicastSubject

唯一传播,当有一个订阅接收消息后,他将会释放资源,其他订阅者将不会收到,发送的消息,这就是人们常说的,这小伙用情很专一,值得点赞

        val subject = UnicastSubject.create<Any>()

        subject.subscribe { s -> println("Observer 1: $s") }
//        subject.subscribe { s -> println("Observer 2: $s") } 如果这里解开注释 运行将会报错
        subject.onNext("Alpha")
        subject.onNext("Beta")
        subject.onComplete()
        subject.onNext("Gamma")

results matching ""

    No results matching ""