This is Understanding Combine, written by Matt Neuburg. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current (and final) editions are iOS 15 Programming Fundamentals with Swift and Programming iOS 14. Thank you!


CombineLatest

.combineLatest (Publishers.CombineLatest) is applied to one publisher and takes a second publisher as a parameter. Both publishers must have the same Failure type, but their Output types can be different. It is sort of like .zip turned upside-down:

  • .zip waits for both upstream publishers to publish and then emits their oldest contributions as a tuple.

  • .combineLatest waits for both upstream publishers to publish and then emits their newest contributions as a tuple.

Recall the toy example I used earlier to demonstrate .zip:

[1,2,3].publisher
    .zip(
        ["a","b"].publisher
            .flatMap(maxPublishers:.max(1)) {
                Just($0).delay(for: 1, scheduler: DispatchQueue.main)
            }
    )

The timing, you remember, works like this:

1st  2nd
===============
1
2
3
(finished)
    [one second]
    "a"
    [one second]
    "b"
    (finished)

With .zip, that resulted in:

(1, "a")
(2, "b")
finished

Now let’s replace .zip with .combineLatest:

[1,2,3].publisher
    .combineLatest(
        ["a","b"].publisher
            .flatMap(maxPublishers:.max(1)) {
                Just($0).delay(for: 1, scheduler: DispatchQueue.main)
            }
    )

That results in:

(3, "a")
(3, "b")
finished

What happened? The 1, 2, and 3 arrived from the first publisher, and then one second elapsed. Then the "a" arrived from the second publisher. Instead of going all the way back to the start of the first publisher’s buffer and fetching the 1, like .zip, the .combineLatest operator uses the end of the first publisher’s buffer, which is 3. In fact, there is no “start of the buffer”; the only thing the operator is remembering is the most recently arrived value from each publisher. Moreover, having published the 3, the operator doesn’t discard it: no subsequent value arrives from the first publisher, so 3 remains that publisher’s latest value. Therefore, when the "b" arrives from the second publisher, the operator publishes the 3 again.

So .combineLatest waits until both publishers have published at least once, and from then on it publishes every time either publisher publishes. What it publishes each time is a tuple of the most recent value from each publisher.
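To watch that rule play out without any delays, here is a minimal sketch that drives .combineLatest by hand. The two PassthroughSubjects (numbers and letters) and the received array are hypothetical names invented for this illustration; they are not part of the earlier example.

```swift
import Combine

// Hypothetical subjects standing in for the two upstream publishers.
let numbers = PassthroughSubject<Int, Never>()
let letters = PassthroughSubject<String, Never>()

// Collect each emitted tuple as a short string, e.g. (2, "a") becomes "2a".
var received = [String]()
let cancellable = numbers
    .combineLatest(letters)
    .sink { received.append("\($0.0)\($0.1)") }

numbers.send(1)    // nothing emitted: letters has not published yet
numbers.send(2)    // still nothing; 2 replaces 1 as the latest number
letters.send("a")  // both have now published: emits (2, "a")
numbers.send(3)    // emits (3, "a")
letters.send("b")  // emits (3, "b")

print(received)    // ["2a", "3a", "3b"]
```

Notice that the 1 never appears downstream: it was overwritten by the 2 before the second publisher had published anything.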

When both publishers have emitted a .finished completion, the operator emits a .finished completion of its own. If either publisher emits a failure, the operator cancels the other publisher and passes the failure downstream.
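The .finished half of that statement can be checked with a small sketch. The subject names (first, second) and the events array are assumptions made up for this illustration, and the failure case is not exercised here.

```swift
import Combine

// Hypothetical subjects; both use Never as their Failure type.
let first = PassthroughSubject<Int, Never>()
let second = PassthroughSubject<String, Never>()

// Record values and the completion in arrival order.
var events = [String]()
let cancellable = first
    .combineLatest(second)
    .sink(
        receiveCompletion: { _ in events.append("finished") },
        receiveValue: { events.append("\($0.0)\($0.1)") }
    )

first.send(1)
second.send("a")                     // emits (1, "a")
first.send(completion: .finished)    // only one upstream is done: no completion yet
second.send("b")                     // still emits, reusing the 1: (1, "b")
second.send(completion: .finished)   // both are done: completion passes downstream

print(events)    // ["1a", "1b", "finished"]
```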

Just as with .zip, there’s a variant of .combineLatest that takes a map function so that instead of publishing a tuple of the two latest values, you can combine them however you like and publish that instead. And, as with .zip, this is really just a convenient way of making a Map whose upstream is a CombineLatest.
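As a rough illustration of the map-function variant, the following sketch multiplies the two latest values instead of publishing them as a tuple. The width and height subjects are hypothetical; CurrentValueSubject is used only so that each publisher has a value as soon as it is subscribed to.

```swift
import Combine

// Hypothetical subjects; each publishes its current value on subscription.
let width = CurrentValueSubject<Int, Never>(2)
let height = CurrentValueSubject<Int, Never>(3)

var areas = [Int]()
let cancellable = width
    .combineLatest(height) { $0 * $1 }   // combine the two latest values into one Int
    .sink { areas.append($0) }           // first emission: 2 * 3

height.send(5)                           // second emission: 2 * 5

print(areas)    // [6, 10]
```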

Just as with .zip, there are additional publishers CombineLatest3 and CombineLatest4, so that you can join three or four publishers conveniently: just call .combineLatest with a comma-separated list of up to three publishers.
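For instance, joining three publishers might look like the sketch below. The red, green, and blue subjects are invented for this illustration; passing two publishers to .combineLatest yields a CombineLatest3 whose output is a three-element tuple.

```swift
import Combine

// Hypothetical subjects; each publishes its current value on subscription.
let red = CurrentValueSubject<Int, Never>(10)
let green = CurrentValueSubject<Int, Never>(20)
let blue = CurrentValueSubject<Int, Never>(30)

var received = [String]()
let cancellable = red
    .combineLatest(green, blue)   // a Publishers.CombineLatest3
    .sink { received.append("\($0.0),\($0.1),\($0.2)") }

green.send(99)                    // re-emits with the latest red and blue values

print(received)    // ["10,20,30", "10,99,30"]
```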

Implementing WithLatestFrom

A complaint that arises from time to time among RxSwift users is that there is no Combine equivalent of RxSwift’s .withLatestFrom operator. It’s like .combineLatest, emitting a tuple consisting of the newest contributions from each of two publishers; but it publishes only whenever the first of the two publishers publishes (whereas .combineLatest publishes when either of the two publishers publishes).

It’s a pity that this operator doesn’t exist right out of the box, but it’s fairly easy to emulate its behavior. After all, it effectively is the same as .combineLatest except that some of the values emitted by .combineLatest need to be filtered out and prevented from flowing further down the pipeline.

Here’s a fairly straightforward way to do that. When the first publisher publishes, we’ll wrap its value in a tuple that also contains a timestamp. Then we’ll do a .combineLatest with the second publisher — and then we’ll immediately use .removeDuplicates to block the output from .combineLatest unless the timestamp has changed, indicating that the first publisher has published a new value. Finally, we’ll strip out the timestamp so that only the actual values from the two publishers pass down the pipeline:

pub1.map {value in (value:value, date:Date()) }
    .combineLatest(pub2)
    .removeDuplicates { $0.0.date == $1.0.date }
    .map { ($0.value, $1) }

If that’s something you do a lot, it’s easy to wrap it up as a custom operator, as I’ll explain later:

extension Publisher {
    func withLatestFrom<Other:Publisher>(_ other: Other)
        -> AnyPublisher<(Self.Output, Other.Output), Failure>
        where Self.Failure == Other.Failure {
            self.map { value in (value:value, date:Date()) }
                .combineLatest(other)
                .removeDuplicates { $0.0.date == $1.0.date }
                .map { ($0.value, $1) }
                .eraseToAnyPublisher()
    }
}

Now our original code can be written more simply, as pub1.withLatestFrom(pub2).
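Here is a sketch of the custom operator in use. The extension is repeated so that the example stands on its own; the taps and counter subjects are hypothetical names. The short sleep between the two taps is my own addition, an assumption-driven safeguard: the technique depends on consecutive values from the first publisher getting different timestamps, and the pause makes that certain in a demo that fires values back to back.

```swift
import Combine
import Foundation

extension Publisher {
    func withLatestFrom<Other: Publisher>(_ other: Other)
        -> AnyPublisher<(Self.Output, Other.Output), Failure>
        where Self.Failure == Other.Failure {
            self.map { value in (value: value, date: Date()) }
                .combineLatest(other)
                .removeDuplicates { $0.0.date == $1.0.date }
                .map { ($0.value, $1) }
                .eraseToAnyPublisher()
    }
}

// Hypothetical subjects: taps drives the output, counter is merely sampled.
let taps = PassthroughSubject<String, Never>()
let counter = PassthroughSubject<Int, Never>()

var received = [String]()
let cancellable = taps
    .withLatestFrom(counter)
    .sink { received.append("\($0.0)\($0.1)") }

counter.send(1)   // nothing emitted: taps has not published yet
taps.send("a")    // emits ("a", 1)
counter.send(2)   // blocked: the timestamp from taps has not changed
counter.send(3)   // blocked for the same reason
Thread.sleep(forTimeInterval: 0.01)   // demo-only pause so the next timestamp differs
taps.send("b")    // emits ("b", 3)

print(received)   // ["a1", "b3"]
```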

