This is Understanding Combine, written by Matt Neuburg. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current (and final) editions are iOS 15 Programming Fundamentals with Swift and Programming iOS 14. Thank you!


Zip

.zip (Publishers.Zip) takes a publisher as a parameter; it is also applied to a publisher (obviously). Both publishers must have the same Failure generic types, but their Output types can be different. Now there are effectively two upstream publishers. When either of those upstream publishers produces a value, this operator puts it in a buffer (effectively a LIFO stack). When both of those upstream publishers have produced a value — that is, any time there is something in both buffers — this operator pops the oldest value from the start of both buffers, combines them into a tuple, and emits the tuple.

If either upstream publisher emits a .finished completion, then if the buffer for that publisher is empty (because the .zip operator has popped its last value to send it downstream), the .zip operator cancels the other publisher and sends a .finished completion downstream. In other words, the pipeline terminates when the last value from either publisher has been sent downstream; the maximum count of tuples published is the count of the shorter upstream publisher’s stream. (That’s similar to the familiar sequence zip function.)

If either upstream publisher sends a failure, this operator immediately cancels the other publisher and sends the completion on downstream.

Demonstrating .zip adequately with a toy example is a little tricky because, to get the point, we have to introduce some delay into at least one of the streams:

[1,2,3].publisher
    .zip(
        ["a","b"].publisher
            .flatMap(maxPublishers:.max(1)) {
                Just($0).delay(for: 1, scheduler: DispatchQueue.main)
            }
    )

In that example, we have two finite sequence publishers. The first one emits 1, then 2, then 3, in quick succession. The second one uses .flatMap to serialize asynchronicity, so that the .delay applies to each value in turn; therefore it waits one second and emits "a", then waits one second and emits "b". So the timing looks like this:

1st  2nd
===============
1
2
3
(finished)
    [one second]
    "a"
    [one second]
    "b"
    (finished)

The output from the .zip operator is:

[one second]
(1, "a")
[one second]
(2, "b")
finished

That shows the logic of .zip operator in action. It cannot publish anything until both upstream publishers have published. At that point, it pairs the oldest value that came from each publisher and publishes that pair as a tuple. Then it waits again until it has a value from both upstream publishers — and so on. In our example, after publishing the second tuple, the 3 is thrown away; the second publisher has finished, so there will never be anything to pair the 3 with, and so the .zip operator finishes.

There is another form of .zip that takes two parameters — a publisher and a function. When this operator is ready to produce a tuple, instead of forming that tuple and sending it downstream, it passes the two values as parameters into the function, which is permitted to transform it into a different value. It’s as if the first form of .zip were immediately followed by a .map operator; in fact, that’s exactly what’s happening! The function is a map function, and this form of .zip is simply a convenient way of making a Publishers.Map whose upstream is a Publishers.Zip.

The convenience form of .zip is very nice, because you might not want to pass a tuple downstream. Often, what you really want is to combine the two incoming values in some way, which is just what .map lets you do. To illustrate, I’ll modify the previous example so that both streams are emitting Ints, and I’ll pass the sum of each pair downstream:

[1,2,3].publisher
    .zip(
        [100,200].publisher
            .flatMap(maxPublishers:.max(1)) {
                Just($0).delay(for: 1, scheduler: DispatchQueue.main)
            }
    ) { $0 + $1 }

That produces this output:

[one second]
101
[one second]
202
finished

Like .merge, the .zip operator comes with additional forms that take two publishers, thus zipping three streams altogether (Publishers.Zip3), or three publishers, zipping four streams altogether (Publishers.Zip4). Again, each of those comes in two forms, depending whether or not you want to attach a map function.

Unlike .merge, however, you cannot readily emulate Zip3 and Zip4 by looping over multiple publishers, because these publishers are allowed to have different output types. That’s why the output is a tuple whose elements can be of different types. You cannot just append new values to a tuple! And you can’t emulate this behavior with an array, because in an array of publishers, the output types must all be the same. To zip an arbitrary number of publishers, therefore, you need to apply the .zip operator multiple times, manually.

For example, let’s say we want to .zip five publishers. We can zip the first four publishers and zip the last publisher to the result:

[1,2,3].publisher
    .zip([10,11,12].publisher, [20,21,22].publisher, [30,31,32].publisher)
    .zip([40,41,42].publisher)

However, we have not really emulated a hypothetical Zip5, because the output is a tuple whose first element is a tuple:

((1, 10, 20, 30), 40)
((2, 11, 21, 31), 41)
((3, 12, 22, 32), 42)

If you really wanted to, you could use the map function to flatten those results:

[1,2,3].publisher
    .zip([10,11,12].publisher, [20,21,22].publisher, [30,31,32].publisher)
    .zip([40,41,42].publisher) {($0.0, $0.1, $0.2, $0.3, $1)}

The result is:

(1, 10, 20, 30, 40)
(2, 11, 21, 31, 41)
(3, 12, 22, 32, 42)

But this approach is not very general. On the other hand, when (as here) the publishers all have the same output type, you can use an array, so you can form a general solution by looping over an array of the publishers:

    var pubs = [
        [1,2,3].publisher,
        [10,11,12].publisher,
        [20,21,22].publisher,
        [30,31,32].publisher,
        [40,41,42].publisher
    ]
    pubs.dropFirst().reduce(into: AnyPublisher(pubs[0].map{[$0]})) {
        res, pub in
        res = res.zip(pub) {
            i1, i2 -> [Int] in
            return i1 + [i2]
        }.eraseToAnyPublisher()
    }

That behaves like a hypothetical Zip5, but the output is an array:

[1, 10, 20, 30, 40]
[2, 11, 21, 31, 41]
[3, 12, 22, 32, 42]

Table of Contents