重播&缓存
多过去也发送数据进行再次传播,如果不想错过某些数据,可以考虑重播,特别是interval
var ob = Observable.interval(1, TimeUnit.SECONDS).replay(2).autoConnect()
ob.subscribe { println("Observer 1: This time is $it") }
Thread.sleep(4000)
ob.subscribe { println("Observer 2: This time is $it") }
/**
Observer 1: This time is 0
Observer 1: This time is 1
Observer 1: This time is 2
Observer 1: This time is 3
Observer 2: This time is 2
Observer 2: This time is 3*/
缓存
和重播是一样的,当时缓存是长期的,不用改变订阅的行为
val cachedRollingTotals = Observable.just(6, 2, 5, 7, 1, 4, 9, 8, 3)
.scan(0) { total, next -> total + next }
.cache()
cachedRollingTotals.subscribe {
println("This is $it")
如果利用interval 我们来看看cache 与 replay 同样的用法
var Ob = Observable.interval(1, TimeUnit.SECONDS).publish().autoConnect().cache()
Ob.subscribe { println("Observer 1: $it") }
Thread.sleep(4000)
Ob.subscribe { println("Observer 2: $it") }
Subject
这是组建Observable及Observer内部的操作流程的类,有点像builder的感觉,但是他有observable和observer的双重功能,因为他extends了Observable,implement Observer
- 组建observable 流程
这很好地模拟了Observable的流程
Subject<String> subject = PublishSubject.create();
subject.map(String::length)
.subscribe(System.out::println);
subject.onNext("Alpha");
subject.onNext("Beta");
subject.onNext("Gamma");
subject.onComplete();
- 组建Observer
也可以当做是observer 使用,是不是感觉功能很强大
var ob1 = Observable.just("dsdsds", "Sdsdsa", "ddsd")
var sub = PublishSubject.create<String>()
sub.subscribe { println("This letter is $it") }
ob1.subscribe(sub)
- 注意:
- subject 组建observable时,要先订阅后,后调用
- 组建 observer 时,先创建,组建,在调用订阅
- 序列化
当我们使用考虑到啊多线程的情况时,我们就需要序列化,不然就会发生线程不安全的情况,用法很简单,调用一个方法就ok
Subject<String> subject =
PublishSubject.<String>create().toSerialized();
- BehaviorSubject
和PublishSubject有类似的功能,但是Behavior 在此基础上有replay 的功能,相当于功能加强了
var ob1 = Observable.just("dsdsds", "Sdsdsa", "ddsd")
var ob2 = Observable.just("AAAAAAAAAAAAAAAAAA", "BBBBBBBBBBBBBBBBBBBBB")
var sub = BehaviorSubject.create<String>()
sub.subscribe { println("This letter is $it") }
ob2.subscribe(sub)
ob1.subscribe(sub)
上面这种情况,只会调动第一调用的,有定蒙圈, 看下面例子
val subject = BehaviorSubject.create<Any>()
subject.subscribe { s -> println("Observer 1: $s") }
subject.onNext("Alpha")
subject.onNext("Beta")
subject.onNext("Gamma")
subject.subscribe { s -> println("Observer 2: $s") }
如果是publishSubject 最后一条是不会打印出任何东西的,但是如果是Behavior则会打印最后一个缓存的gamma
- ReplaySubject
这个是有点普通Observable的replay方法,和behaviorSubject相同之处也是具有重播功能,但是这里是全部重播,但是后者是只重播最后一个
val subject = ReplaySubject.create<Any>()
subject.subscribe { s -> println("Observer 1: $s") }
subject.onNext("Alpha")
subject.onNext("Beta")
subject.onNext("Gamma")
subject.subscribe { s -> println("Observer 2: $s") }
- AsyncSubject
这从名字上看是异步功能,实际也是这个目的,他的功能就是只发送接收到的最后一个信息源,在调用OnComplete之前的一次onNext 调用
val subject = AsyncSubject.create<Any>()
subject.subscribe { s -> println("Observer 1: $s") }
subject.onNext("Alpha")
subject.onNext("Beta")
subject.onComplete()
subject.onNext("Gamma")
subject.subscribe { s -> println("Observer 2: $s") }
/**
打印
Observer 1: Beta
Observer 2: Beta
*/
- UnicastSubject
唯一传播,当有一个订阅接收消息后,他将会释放资源,其他订阅者将不会收到,发送的消息,这就是人们常说的,这小伙用情很专一,值得点赞
val subject = UnicastSubject.create<Any>()
subject.subscribe { s -> println("Observer 1: $s") }
// subject.subscribe { s -> println("Observer 2: $s") } 如果这里解开注释 运行将会报错
subject.onNext("Alpha")
subject.onNext("Beta")
subject.onComplete()
subject.onNext("Gamma")