This is Understanding Combine, written by Matt Neuburg. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current (and final) editions are iOS 15 Programming Fundamentals with Swift and Programming iOS 14. Thank you!


Custom Subscribers

The Combine framework includes the Sink subscriber and the Assign subscriber, and that’s probably all you’ll need. But let’s just say, for the sake of argument, that you wanted to write your own subscriber. How would you do it? The question is worth asking, because it gives us a chance to peek behind the curtain and see what a subscriber really is and how it works.

I’ll write a very simple subscriber — so simple, in fact, that it’s downright crude. It doesn’t really fulfill all of the obligations of a subscriber, and I haven’t given any thought to higher considerations of thread safety. It doesn’t do anything useful; it just automatically prints the value or failure message that it receives from its upstream publisher. I’ll call it Printer.

The chief thing that a subscriber needs to cope with is the fact that its communication with its publisher takes place entirely by way of a Subscription object. The subscriber needs an instance property referring to that Subscription object, so that it has a way to talk to the publisher. So I’ll start by giving my subscriber that instance property; I’ll also give it a Bool property flagging whether we’ve terminated due to completion or cancellation:

class Printer<Input, Failure:Error> {
    var subscription : Subscription?
    var completed = false
}

You’ll notice that my Printer class is a generic, parameterized on the two types that need to correspond to a publisher, the Input type and the Failure type, as I explained earlier.

Now, Subscriber is a protocol. So I’ll add an extension declaring adoption of that protocol:

extension Printer : Subscriber {
    // ...?
}

Our extension immediately fails to compile, because the Subscriber protocol has three required methods, and we haven’t supplied any of them. So now I’ll supply them.

The first required method is

func receive(subscription: Subscription)

This method informs us that we have been subscribed to a publisher. The publisher has created a Subscription object and has handed it to us. Our chief job is to hang on to that Subscription in case we need to communicate with the publisher later.

Most receive(subscription) implementations will also perform a second job: they turn immediately to the publisher, by way of the Subscription, and ask it to start sending values. This is not a required functionality of receive(subscription); if you don’t want the publisher to begin publishing immediately, you don’t need to ask it yet to start sending values. But usually you do want the publisher to begin publishing immediately. And interestingly, if you don’t ask the publisher for a value, it won’t send a value.

NOTE: The architecture of the Combine framework leaves it up to the subscriber to “pull” values from the publisher. The subscriber can thus prevent the publisher from sending values by not pulling values. This ability of the subscriber to regulate the publisher’s rate of value production is called backpressure.

The way you tell the publisher you’d like to receive values is by calling the Subscription’s request method. The parameter is a Subscribers.Demand struct, with static members .unlimited, .none, and .max() which takes an Int associated value. I’ll signal interest in unlimited values:

func receive(subscription: Subscription) {
    if self.subscription == nil && !self.completed {
        self.subscription = subscription
        subscription.request(.unlimited)
    }
}

The second required method is

func receive(_ input: Input) -> Subscribers.Demand

This method informs us that we’ve got a value coming to us from the publisher. We can do anything we like with this value; here, I’ll just print it. We must also return another Demand enum saying how many more values we’d like to receive:

func receive(_ input: Input) -> Subscribers.Demand {
    print(input)
    return .unlimited
}

The third (and last) required method is

func receive(completion: Subscribers.Completion<Failure>)

This method informs us that we’ve received a completion message from the publisher. It might be .finished or it might be .failure, and in the latter case it will have an associated value of the Failure type, which will be some sort of Error. In my implementation, I’ll print the Error if there is one, and then I’ll clean up by releasing the subscription and marking my state as completed:

func receive(completion: Subscribers.Completion<Failure>) {
    if case let .failure(err) = completion {
        print(err)
    }
    self.subscription = nil
    self.completed = true
}

That’s the end of the extension making our class conform to Subscriber! I’ll also make it conform to Cancellable, just like Sink and Assign. The only requirement here is the cancel method so that we can be told to send a cancel message up to the publisher by way of the Subscription; this might be because other code wants to tear down the pipeline or because we are wrapped in an AnyCancellable that is going out of existence. I’ll also clean up just as I did on receiving a completion:

extension Printer : Cancellable {
    func cancel() {
        self.subscription?.cancel()
        self.subscription = nil
        self.completed = true
    }
}

Our basic subscriber is now complete! Here’s an example of how to use it, just to prove it works:

let pub = [1, 2, 3].publisher
let sub = Printer<Int, Never>()
pub.subscribe(sub)
AnyCancellable(sub)
    .store(in:&self.storage)
// prints 1, then 2, then 3

You’ll notice I didn’t write a publisher method that generates a Printer; therefore I have to do all the work manually, initializing a Printer of the correct type for this publisher, subscribing to the publisher, and wrapping the subscriber in an AnyCancellable and storing it.

An interesting exercise is to tweak the functionality of the subscriber to explore what happens. For example, when I receive the Subscription, suppose I respond with a .max(1) demand, and when I receive an input, I respond with a .none demand — using backpressure to request only one value in total. Then no matter how many values this publisher would like to produce, it will only produce one:

let pub = [1, 2, 3].publisher
let sub = Printer<Int, Never>() // use backpressure to request only one value
pub.subscribe(sub)
AnyCancellable(sub)
    .store(in:&self.storage)
// prints 1 and stops

Table of Contents