This is Understanding Combine, written by Matt Neuburg. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current (and final) editions are iOS 15 Programming Fundamentals with Swift and Programming iOS 14. Thank you!


Composed Operators

Sometimes, you might need to create your own operator. When you do, you usually won’t actually need to write the operator from scratch. Instead, you’ll combine existing operators to form a new operator. I call that sort of operator a composed operator.

Here’s an example. Recall how earlier we devised a technique for combining .delay with .retry only just in case there was an upstream failure:

let pub = URLSession.shared.dataTaskPublisher(for: url).share()
let head = pub.catch {_ in 
    pub.delay(for: 3, scheduler: DispatchQueue.main)
}.retry(3)

If that’s the kind of thing we’re likely to do a lot, we might want to be able to create that pipeline at will, with any publisher at its start. So let’s write a function that does create that pipeline. Our function will take a Publisher as its parameter and will use it as the upstream for the first operator in that chain. And what operator is that? I take it to be the .share; everything from that point on down needs to be part of the pipeline that our function will produce.

Our function’s return type will be simply AnyPublisher, with the generic types dictated by the upstream (because we aren’t going to be changing any of them in the course of our pipeline):

func applyDelayAndRetry<Upstream:Publisher>(upstream:Upstream) 
    -> AnyPublisher<Upstream.Output, Upstream.Failure> {
        let share = Publishers.Share(upstream: upstream)
        return share
            .catch { _ in 
                share.delay(for: 3, scheduler: DispatchQueue.main) 
            }.retry(3)
            .eraseToAnyPublisher()
}

That’s all there is to it! All we have to do is call that function with, say, a data task publisher as its parameter:

let pub = URLSession.shared.dataTaskPublisher(for: url)
let head = applyDelayAndRetry(upstream: pub)

Now head is the start of a pipeline, and we can attach any further operators to it.

Of course, you might object that our function is not very flexible. We have hard-coded both the delay interval and the retry count into the body of the function. But that is merely a matter of detail! If you want more flexibility, add more parameters:

func applyDelayAndRetry<Upstream:Publisher, S:Scheduler>(
        upstream:Upstream,
        for interval:S.SchedulerTimeType.Stride,
        scheduler:S,
        count:Int
    )
    -> AnyPublisher<Upstream.Output, Upstream.Failure> {
        let share = Publishers.Share(upstream: upstream)
        return share
            .catch { _ in 
                share.delay(for:interval, scheduler:scheduler) 
            }.retry(count)
            .eraseToAnyPublisher()
}

And now when we call our function, we just supply those parameters:

let pub = URLSession.shared.dataTaskPublisher(for: url)
let head = applyDelayAndRetry(upstream:pub, 
    for:3, scheduler:DispatchQueue.main, count:3)

Excellent. But now let’s go one step further. Wouldn’t it be nice if, instead of having to call a global function, our operator were represented by an operator method that we could call on any publisher? We can arrange that too. In fact, this is the really clever part. All we have to do is extend Publisher with an operator method that calls our applyDelayAndRetry. And when we do, what will upstream: be? It will be self:

extension Publisher {
    func delayAndRetry<S:Scheduler>(
        for interval:S.SchedulerTimeType.Stride,
        scheduler:S,
        count:Int
    ) -> AnyPublisher<Self.Output, Self.Failure> {
        return applyDelayAndRetry(upstream:self, 
            for:interval, scheduler:scheduler, count:count)
    }
}

Now we’re able to chain a call to .delayAndRetry into a pipeline, just as if it were a built-in operator:

URLSession.shared.dataTaskPublisher(for: url)
    .delayAndRetry(for: 3, scheduler: DispatchQueue.main, count: 3)
    .receive(on: DispatchQueue.main)
    // ... and so on ...

Finally, we now have no public use for our separate applyDelayAndRetry function, so we may as well incorporate it into the operator method:

extension Publisher {
    func delayAndRetry<S:Scheduler>(
        for interval:S.SchedulerTimeType.Stride,
        scheduler:S,
        count:Int
    ) -> AnyPublisher<Self.Output, Self.Failure> {
        func applyDelayAndRetry<Upstream:Publisher, S:Scheduler>(
                upstream:Upstream,
                for interval:S.SchedulerTimeType.Stride,
                scheduler:S,
                count:Int
            )
            -> AnyPublisher<Upstream.Output, Upstream.Failure> {
                let share = Publishers.Share(upstream: upstream)
                return share
                    .catch { _ in 
                        share.delay(for:interval, scheduler:scheduler) 
                    }.retry(count)
                    .eraseToAnyPublisher()
        }
        return applyDelayAndRetry(upstream:self, 
            for:interval, scheduler:scheduler, count:count)
    }
}

That’s a custom composed operator, all wrapped up neatly and ready to use in any of our pipelines.


Table of Contents