RxJava & Kotlin: Conditionally delaying the first item in a stream
25 May 2016
In my application, I recently had the following requirement: In a reactive stream of items, wait until one specific item is emitted, then emit all items to the observer. If that item isn’t found, don’t emit anything and terminate with an error.
Now, RxJava has a number of useful operators like buffer
or delaySubscription
but they weren’t exactly doing the right thing. So I came up with a solution I want to share.
The code
Here’s the implementation, wrapped in an extension function:
fun <T> Observable<T>.delayUntil(value: T): Observable<T> {
val replaySubject = ReplaySubject.create<T>()
val sharedObservable = this.publish().autoConnect(2)
sharedObservable.subscribe(replaySubject)
val delayObservable = sharedObservable
.first { it == value }
.concatWith(Observable.error(IllegalArgumentException()))
return replaySubject.delaySubscription(delayObservable)
}
Let’s analyze what it’s doing:
Because the items in the stream aren’t delivered immediately at first, we buffer them in a ReplaySubject
. As a reminder, a Subject
is both, an Observer
and an Observable
. We make use of this by subscribing it directly to our item source in line 5. Once an observer subscribes to the ReplaySubject
, it will emit all previous and future items to it.
We will need to subscribe to the source observable twice, first to feed the items into the replaySubject
and second to check for the expected item. Because the source observable is potentially cold, we don’t want to subscribe to it twice directly. Instead, we use the publish
operator which will deliver the results from a single stream to all subscribers in combination with the autoConnect
operator that makes the observable actually subscribe to the source once two observers are subscribed.
Next, in lines 7–9, we setup the delayObservable
that either emits the first occurence of the expected item in the source stream or terminates with an error. Note, that even if it finds the item, it will still terminate with an error, however that is perfectly fine for our needs.
This return value of the method is the subject itself with the delaySubscription
operator applied to it. This operator expects an observable parameter and waits until it emits any item or completes without emitting before it subscribes to the original observable. This is exactly, what the delayObservable
is used for. In our case, if no item is emited, the error is propagated instead.
Examples
Let’s look at what the result looks like. We use a Semaphore so that our little test programs doesn’t instantly terminate. We create a source that emits 10 sequential numbers every 200 milliseconds. We delay it until we find a 3 and subscribe to it by printing the results with a timestamp.
val mutex = Semaphore(0)
val source = Observable.interval(200, TimeUnit.MILLISECONDS).take(10)
val delayed = source.delayUntil(3L)
println("${System.currentTimeMillis()}: start")
delayed.subscribe(
{ println("${System.currentTimeMillis()}: $it") },
{ println("Failed with $it") },
{ mutex.release() }
)
mutex.acquire()
And here’s the output:
1464210578099: start
1464210578909: 0
1464210578909: 1
1464210578909: 2
1464210578909: 3
1464210579106: 4
1464210579306: 5
1464210579506: 6
1464210579706: 7
1464210579906: 8
1464210580106: 9
We can see that items 0 through 3 are buffered and then emitted all at once. After that, the remaining items are emitted immediately, just like we wanted to.
Making the observable reusable
Now what happens if we subscribe to the same delayed observable a second time? The answer is: nothing, it won’t even complete. The problem has to do with how ConnectableObservables
work, but I will spare you the details.
Anyway, there’s an easy fix. We simply wrap the source with a call to Observable.defer
to make it compute the results anew on every call:
val delayed = Observable.defer { source.delayUntil(3L) }
Alternatively, we can cache
the output of the observable to prevent it from doing the same work twice like so:
val delayed = source.delayUntil(3L).cache()
In both cases, you can call the observables as many times as you want and get correct behavior.
All Blog Posts