This is Understanding Combine, written by Matt Neuburg. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current (and final) editions are iOS 15 Programming Fundamentals with Swift and Programming iOS 14. Thank you!


FlatMap

.flatMap (Publishers.FlatMap) does two different things — things that might seem almost unrelated.

First and foremost, .flatMap takes a map function similar to .map and .compactMap, but what this map function produces must be a publisher. As the publisher passes out of the map function, it starts publishing. The values it produces, not the publisher itself, proceed down the pipeline. So that’s what the downstream sees: the values produced by that publisher. The publishers produced by the map function must all have the same generic Output and Failure types, so that the types expected downstream of the .flatMap operator will be consistent.

The publisher produced by a .flatMap map function is sometimes called a nested publisher, or an inner publisher.

Here’s a simple example. I’ll start with the UIControl publisher that I developed earlier; it emits a value when the user taps a button in the interface. At that point, I’ll have flatMap produce a Timer publisher:

self.myButton.publisher()
    .flatMap { _ in
        Timer.publish(every: 1, on: .main, in: .common)
            .autoconnect()
    }

The result is that when the user taps the button, the timer publisher comes into existence and starts publishing; a Date value is emitted down the pipeline every second. The example is a toy, but it clearly demonstrates the core behavior of flatMap.

A way of picturing what happens is this:

  • Any time a value arrives from upstream into the .flatMap operator, its map function uses that value as desired (or, as in the toy example, ignores it) and produces a publisher.

  • That publisher is somehow retained behind the scenes, and is somehow subscribed to, in such a way that it starts publishing values that are sent to the downstream object.

  • If the map function produces a publisher while a publisher that it produced earlier is still being retained and is still publishing values, the values from both publishers are interleaved in the single stream of values received by the downstream object. (We can see that with the toy example I just gave: if the user taps the button twice, there are two timers running simultaneously, and their outputs are interleaved.)

  • If a publisher produced by the map function completes — that is, if it runs out of values to publish — its completion message is somehow swallowed so that it does not pass downstream. This makes sense, because we would not want the whole pipeline to terminate just because the inner publisher terminated.

  • If the downstream subscriber is cancelled, all existing publishers produced by the map function are cancelled in good order.

Note that although I say that the .flatMap map function produces a publisher, that publisher does not have to be a simple publisher. Our toy example is a case in point. The map function doesn’t just produce a Timer; it produces a Timer with an .autoconnect operator attached to it. That’s okay, because an operator is a publisher. In effect, what the map function really produces is the start of a new pipeline. The rest of the (outer) pipeline — everything downstream of the flatMap operator — is subscribed to this new (inner) pipeline.

To illustrate, I’ll extend the inner pipeline a little:

self.myButton.publisher()
    .flatMap { _ in
        Timer.publish(every: 1, on: .main, in: .common)
            .autoconnect()
            .scan(0) {count,date in count + 1}
    }

Every time the user taps the button, the series of integers 1, 2, 3 start flowing down rest of the pipeline, one per second. If the series is already flowing because this is not the first time the user tapped the button, the pipeline produces multiple series interleaved with one another.

Erasing to AnyPublisher

It’s easy to get confused about the type of the publisher produced by the map function. In that example, for instance, it is a

Scan<Publishers.Autoconnect<Timer.TimerPublisher>, Int>

For this reason, it is good practice to end the inner pipeline with a call to eraseToAnyPublisher. This collapses the complex type into a single AnyPublisher whose generic types are clear:

self.myButton.publisher()
    .flatMap { _ in
        Timer.publish(every: 1, on: .main, in: .common)
            .autoconnect()
            .scan(0) {count,date in count + 1}
            .eraseToAnyPublisher()
    }

That map function produces an AnyPublisher<Int, Never>, which is a lot easier to reason about.

And you will need to reason about it if the map function does anything other than return a publisher in a single line. For example, if we merely change our map function so that it stores the publisher in a variable and returns that variable, we no longer compile:

self.myButton.publisher()
    .flatMap { _ in // compile error
        let p = Timer.publish(every: 1, on: .main, in: .common)
            .autoconnect()
            .scan(0) {count,date in count + 1}
            .eraseToAnyPublisher()
        return p
    }

To work around this, we have to state the output publisher type explicitly in our in line:

self.myButton.publisher()
    .flatMap { _ -> AnyPublisher<Int,Never> in
        let p = Timer.publish(every: 1, on: .main, in: .common)
            .autoconnect()
            .scan(0) {count,date in count + 1}
            .eraseToAnyPublisher()
        return p
    }

That is a lot easier to do when you use eraseToAnyPublisher. Also, eraseToAnyPublisher allows your map function to produce different publishers, conditionally. They can be different types of publisher, just so long as their generic parameterized types are the same, so that they can be type-erased to the same AnyPublisher:

var alreadyProduced = false
self.myButton.publisher()
    .flatMap { _ -> AnyPublisher<Int,Never> in
        let p = Timer.publish(every: 1, on: .main, in: .common)
            .autoconnect()
            .scan(0) {count,date in count + 1}
            .eraseToAnyPublisher()
        if !alreadyProduced {
            alreadyProduced = true
            return p
        } else {
            return Empty<Int,Never>(completeImmediately: true)
                .eraseToAnyPublisher()
        }
    }

That pipeline counts 1, 2, 3 once per second starting the first time the user taps the button; after that, when the user taps the button, nothing happens. That illustrates what I meant when I said that the value publishers are useful particularly in conjunction with .flatMap. Our map function has to produce something under all conditions, so in order to do nothing, we need a publisher that means “do nothing.” That is exactly what Empty is.

Producing a Publisher that can Fail

Another issue you’re likely to run into when using .flatMap is when you want to produce a publisher that can fail. (This didn’t arise in the toy example earlier; a Timer publisher’s Failure type is Never.) Such a publisher is perfectly legal, and failure itself is handled coherently:

  • If a publisher produced by the map function emits a failure, the failure passes down the pipeline and all the produced publishers are cancelled, as well as the .flatMap operator itself and the whole pipeline upstream from there. The entire pipeline thus comes to stop. This makes sense, because a failure is intended to be fatal.

However, there’s another part to that rule. .flatMap is not able to change the Failure type coming down the pipeline. Therefore:

  • The Failure type of the publisher produced by the map function must match the Failure type of the .flatMap operator’s upstream object.

That can pose issues if the Failure type of the publisher produced by the map function doesn’t match the Failure type of the upstream object. To fix that, you’ll need to change the Failure type of the upstream object, before the .flatMap call, so that it matches the Failure type of the publisher you want to produce. That isn’t difficult to do; there are operators for exactly that purpose. But you have to remember to do it, or your code will mysteriously fail to compile.

For example, suppose (just for the sake of argument) that our pipeline starts with a data task publisher instead of a UIControl publisher:

URLSession.shared.dataTaskPublisher(for: url)
    .flatMap { _ -> AnyPublisher<Int,Never> in

That won’t compile, because a data task publisher’s Failure type is URLError; you can’t produce a publisher with a Never failure into a pipeline with a Failure type of URLError. The simple solution is replace the Failure type with Never, before the .flatMap call. One way to do that is with .replaceError(with:), supplying a dummy value of the correct output type:

URLSession.shared.dataTaskPublisher(for: url)
    .replaceError(with: (data: Data(), response: URLResponse()))
    .flatMap { _ -> AnyPublisher<Int,Never> in

The converse problem is that the upstream Failure type is Never, but you want to produce a publisher that can fail. In that case, use .setFailureType(to:), like this:

.setFailureType(to: Error.self)

Again, that comes before the call to .flatMap. You’re changing the upstream Failure type to Error, even though in fact there won’t be any failure coming from upstream.

Serializing Asynchronicity

An important way to characterize what .flatMap does is that it serializes asynchronicity. By this I mean that it is key to ensuring that one asynchronous operation cannot start until a prior asynchronous operation has completed.

Our toy example is a case in point: our timer doesn’t start producing values — indeed, it doesn’t even come into existence — until the user taps the button to start the pipeline running.

Here’s a more realistic example. Let’s say we want to interact with the user’s contacts database using the Contacts framework. Any such interaction must be preceeded by a check for user authorization; if we don’t have authorization, we need to ask for it. But asking for authorization and then learning whether we have it is an asynchronous operation. This puts us in a bind: if we have authorization, we can just start interacting with the contacts database, but if we don’t, we have to ask for authorization, wait until we learn whether the user has granted authorization, and then if so we can start interacting with the contacts database. Implementing a strategy that expresses that logic can be quite tricky, because asking for authorization is asynchronous, but just going ahead if we already have authorization is not. The Combine framework, however, makes it easy to encapsulate this flow elegantly.

To simplify and generalize the presentation, I’ll start by writing some utility functions. First, here’s a function that checks whether we have authorization up front. There are three possible outcomes:

  • We have authorization.

  • We don’t have authorization but we might be able to get it (because our authorization status is .notDetermined).

  • We don’t have authorization and can’t get it, so there’s no point proceeding any further.

So I’ll characterize the outcome as a Result, where .success is a Bool saying whether we already have authorization, and .failure means we should just give up:

enum NoPoint : Error { case userRefusedAuthorization }
func checkAccess() -> Result<Bool, Error> {
    Result<Bool, Error> {
        let status = CNContactStore.authorizationStatus(for:.contacts)
        switch status {
        case .authorized: return true
        case .notDetermined: return false
        default: throw NoPoint.userRefusedAuthorization
        }
    }
}

Next here’s a function that actually checks for authorization. This is an asynchronous operation, so I’ll wrap it up in a Future:

func requestAccessFuture() -> Future<Bool, Error> {
    Future<Bool, Error> { promise in
        CNContactStore().requestAccess(for:.contacts) { ok, err in
            if ok {
                promise(.success(ok))
            } else {
                promise(.failure(NoPoint.userRefusedAuthorization))
            }
        }
    }
}

Okay! Now all I have to do is write a pipeline that hooks up calls to those two methods in such a way as to serialize asynchronicity. And we know how to do that: with .flatMap! We’ll start our pipeline with a button tap:

self.myButton.publisher()
    .setFailureType(to: Error.self)
    .flatMap { _ in
        checkAccess().publisher
    }.flatMap { gotAccess -> AnyPublisher<Bool,Error> in
        if gotAccess {
            return Just(true)
                .setFailureType(to: Error.self)
                .eraseToAnyPublisher()
        } else {
            return requestAccessFuture()
                .eraseToAnyPublisher()
        }
    }

That is a beautiful example of Combine in action. In a single pipeline, we have specified what should happen when the button is tapped:

  • If we have authorization already, we produce true and we can then go on to interact with the contacts database.

  • If we don’t have authorization already and we can’t get it (because our status is not .notDetermined), we fail.

  • If we don’t have authorization already, but our status is .notDetermined, we ask for authorization, and if we get it, we produce true.

  • If we don’t have authorization already, but our status is .notDetermined, we ask for authorization, and if we don’t get it, we fail.

Thus our pipeline produces true if and only if we can get authorization, no matter how and no matter whether the process of obtaining it is asynchronous.

Failing Without Terminating

Another common use of .flatMap is when you want part of a pipeline to be able to fail without disabling the entire pipeline. The idea is that you fail within the inner pipeline produced by the map function, and catch the failure within the inner pipeline. This prevents the failure and attendant cancellation from escaping the inner pipeline, and so the outer pipeline never hears about it and just keeps going.

Here’s an artificial but telling example. We have a list of URLs that we want to use to fetch data from the network. So we turn the list into a publisher, and follow it with a .flatMap whose map function produces a data task publisher:

let urls = [
    "https://www.apeth.com/pep/manny.jpg",
    "https://www.apethh.com/pep/moe.jpg",
    "https://www.apeth.com/pep/jack.jpg"
]
urls.map(URL.init(string:)).compactMap{$0}.publisher
    .setFailureType(to: URLError.self)
    .flatMap { url in
        URLSession.shared.dataTaskPublisher(for: url)
    }

It looks good on paper, but there’s a problem. It happens that the second URL is a bad one; it will generate a failure. That failure arrives at the end of the pipeline, and the whole pipeline is torn down before the other two data task publishers even have a chance to get any data and publish it. In other words, just because one data task publisher threw a failure, the whole pipeline fails.

We might try to prevent this by catching the failure, replacing it with empty data:

urls.map(URL.init(string:)).compactMap{$0}.publisher
    .setFailureType(to: URLError.self)
    .flatMap { url in
        URLSession.shared.dataTaskPublisher(for: url)
    }
.replaceError(with: (data: Data(), response: URLResponse()))

But that doesn’t actually solve anything. When the second data task publisher fails, the failure no longer percolates down the pipeline, but the attendant cancellation percolates up the pipeline and the whole pipeline is still cancelled before any data can be downloaded.

The solution is to put the .replaceError call inside the map function, as part of the publisher that it produces:

urls.map(URL.init(string:)).compactMap{$0}.publisher
    .flatMap { url in
        URLSession.shared.dataTaskPublisher(for: url)
            .replaceError(with: (data: Data(), response: URLResponse()))
    }

Now what happens is that the second publisher fails and produces an empty data object, and then it alone is cancelled. That’s fine with us, because it already did its job. Meanwhile, the other two data tasks go right on working, and they produce their data. So that demonstrates how .flatMap can contain a failure within a part of the pipeline and replace it with a value, without causing the whole pipeline to terminate.

There is one possible downside to that use of .replaceError, namely, that we have “swallowed” the error. No error is passing down the main pipeline, which is good because the main pipeline isn’t cancelled; but no information about the error is passing down the main pipeline either. A solution to that is to use .catch instead of .replaceError and produce a Result object, which carries either the value information in its .success case or the error information in its .failure case:

urls.map(URL.init(string:)).compactMap{$0}.publisher
    .flatMap { url in
        URLSession.shared.dataTaskPublisher(for: url)
            .map { Result<Data,Error>.success($0.data) }
            .catch { Just(Result<Data,Error>.failure($0)) }
    }

The value passing down the main pipeline is now a more complex object than before — it’s a Result object, and subsequent operators will need to take account of that. But it is also a more communicative object: it hands subsequent operators the Data from the data task or else tells them about the error without in fact being an error.

Exerting Backpressure

You may remember that I said .flatMap does two different, almost unrelated things. The first thing it does is to produce a publisher from its map function. It’s now time to talk about the second thing.

It turns out that .flatMap takes an optional first parameter: maxPublishers:. This is a Subscribers.Demand (I talked earlier about what that is).

Now, if you omit this parameter, it is .unlimited. But by including it, you can limit the maximum number of publishers you want to produce at once. This means that you have the opportunity, through .flatMap, to exert backpressure on the upstream publisher. And in fact, .flatMap is just about the only built-in operator that lets you exert backpressure directly. That makes .flatMap extremely important in a whole new way.

Here’s a toy example that demonstrates:

[1,2].publisher
    .flatMap { _ in
        Just(Date())
            .delay(for: .seconds(10), scheduler: DispatchQueue.main)
    }

We start with a “one-two punch” — two integers that arrive into the .flatMap operator in quick succession. There is a delay of ten seconds, and then two dates are emitted from the pipeline, also in quick succession, and effectively set to the same date–time (give or take a tiny amount), which is ten seconds ago.

Now we’ll change the example to use a maxPublishers: value of .max(1):

[1,2].publisher
    .flatMap(maxPublishers:.max(1)) { _ in
        Just(Date())
            .delay(for: .seconds(10), scheduler: DispatchQueue.main)
    }

What happens now is that there is a delay of ten seconds, and one date is emitted from the pipeline, which is ten seconds ago. Then another ten seconds elapses, and another date is emitted from the pipeline, set to ten seconds after the first date, which is now also ten seconds ago.

Why? Well, the Just publisher is formed with the current date instantly when a value arrives from upstream. But a .delay operator doesn’t actually publish until the delay has elapsed. And we have said that we only want to produce one publisher at a time. So the .flatMap operator requests just one value from the Sequence publisher: it sends a demand of .max(1), and the 1 arrives, and the Just is created — and then nothing happens for ten seconds, while the .delay operator waits ten seconds. Then the inner publisher publishes, the published Date value is sent downstream, and at long last the .flatMap operator turns around to the upstream and sends another demand of .max(1), and the 2 arrives — and so on.

So the use of maxPublishers effectively throttles the pipeline by exerting backpressure in accordance with the actual publication rate of the inner pipeline. Once again, we can use this feature of .flatMap to serialize asynchronicity. But this time we are serializing our own asynchronicity — the asynchronicity of the map function publishers themselves.

For a more practical example, let’s return to the idea of fetching over the network from a sequence of URLs:

let urls = [
    "https://www.apeth.com/pep/manny.jpg",
    "https://www.apethh.com/pep/moe.jpg",
    "https://www.apeth.com/pep/jack.jpg"
]
urls.map(URL.init(string:)).compactMap{$0}.publisher
    .flatMap { url in
        URLSession.shared.dataTaskPublisher(for: url)
        .replaceError(with: (data: Data(), response: URLResponse()))
    }

The URLs may arrive sequentially, but the data tasks all run simultaneously — they are all started immediately, and their results may arrive in any order, depending on how long each network request takes to complete. That might not be what we want. (For instance, we might feel that it puts an unnecessary strain on the network.) Simply using maxPublishers: solves the problem:

urls.map(URL.init(string:)).compactMap{$0}.publisher
    .flatMap(maxPublishers:.max(1)) { url in

Now no network request starts until the previous network request has completed. Only one request is posted to the URLSession at a time, and the results are guaranteed to arrive in the same order as the original URLs — in this case, that will be the data for manny.jpg, empty data for moe.jpg (because it’s a bad URL), and the data for jack.jpg.

NOTE: Using backpressure in that way can cause values emitted by the upstream publisher to be lost, because the .flatMap is still “busy” with the existing publisher when the new value would arrive from upstream. A possible solution to that is to use a buffer.


Table of Contents