This is Understanding Combine, written by Matt Neuburg. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current (and final) editions are iOS 15 Programming Fundamentals with Swift and Programming iOS 14. Thank you!
Custom Operators
Writing your own custom operator from scratch (rather than as a composed operator) can be daunting. But you’re probably curious about how operators generally work under the hood, so let’s talk about that.
Before we begin, you might like to study what I said earlier about how a Subscriber works and how a Publisher works. Let’s review:
A Subscriber subscribes by calling subscribe on the Publisher, with the Subscriber as parameter.
The Subscriber implements these methods:
receive(subscription:), for when the Publisher initially hands it a Subscription object, which it retains
receive(_:), for when a value arrives from upstream
receive(completion:), for when a completion arrives from upstream
The Publisher implements receive(subscriber:), which is called when the publisher is subscribed to.
In that implementation, it creates an instance of its Inner class, which is a Subscription; it calls subscriber.receive(subscription:), passing the Subscription object back to the subscriber.
The Subscription implements request(_:), which the Subscriber can call when it wants values; the Subscription responds by producing values and calling the Subscriber’s receive(_:).
The Subscription is also responsible for calling the Subscriber’s receive(completion:).
Finally, a Subscription is also a Cancellable, so it implements cancel.
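To make the review concrete, here is a minimal sketch of a hand-written Subscriber. The class name CollectingSubscriber and the choice to request exactly two values are my own assumptions for illustration; the publisher is the built-in sequence publisher.

```swift
import Combine

// Hypothetical subscriber, for illustration only: it retains its
// Subscription, asks for two values, and records whatever arrives.
final class CollectingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?
    private(set) var received: [Int] = []
    private(set) var completed = false

    // The Publisher hands over a Subscription; keep it and request values.
    func receive(subscription: Subscription) {
        self.subscription = subscription
        subscription.request(.max(2))
    }

    // A value arrives from upstream; returning .none asks for nothing extra.
    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none
    }

    // A completion arrives from upstream; the Subscription is no longer needed.
    func receive(completion: Subscribers.Completion<Never>) {
        completed = true
        subscription = nil
    }
}

let subscriber = CollectingSubscriber()
[10, 20, 30].publisher.subscribe(subscriber)
print(subscriber.received)
```

Because only two values were requested, the subscriber should end up holding 10 and 20; the third value stays with the publisher until more demand is expressed.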
That’s sufficient to describe the behavior of a pipeline consisting of one Publisher and one Subscriber. But what about an operator? Suppose we insert an operator between the Publisher and the Subscriber. What does it do? There are three things to know:
An operator is a Publisher, so that it can be subscribed to.
An operator is initialized with a reference to its upstream object.
An operator’s Inner class is both a Subscription and a Subscriber.
Okay, so let’s trace through the life of an operator. The first thing that happens is that the pipeline is assembled; to do so, each operator is instantiated by calling an initializer that takes an upstream: parameter, which is obviously a Publisher. Each operator now knows who is upstream of it. But that’s all for now! Everything comes to a stop. The chain is constructed; every operator has an upstream-pointing link. But the chain is inert; no subscriber has subscribed, and the publisher at the top hasn’t done anything yet.
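A small sketch can show both halves of that claim using built-in operators as stand-ins: the assembled chain is linked upstream, yet nothing happens until someone subscribes. The variable names here are mine, and I am assuming the built-in handleEvents and map operators behave like the operators described in this chapter.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var subscriptions = 0
var received: [Int] = []

// Assemble the chain. Each operator struct stores its upstream.
let pipeline = subject
    .handleEvents(receiveSubscription: { _ in subscriptions += 1 })
    .map { $0 * 2 }

// The upstream-pointing links exist: Map -> HandleEvents -> subject.
let topOfChain = pipeline.upstream.upstream
print(topOfChain === subject)

// But the chain is inert: nothing has been subscribed to yet.
let subscriptionsBeforeSink = subscriptions

// Only now, when a subscriber attaches, does anything happen.
let cancellable = pipeline.sink { received.append($0) }
subject.send(21)
print(subscriptionsBeforeSink, subscriptions, received)
```

The count of subscriptions stays at zero until sink is called, and only then does a value sent by the subject make its way down the chain.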
Now let’s say the subscriber at the bottom of the chain actually subscribes to the operator upstream of it. It can do that because an operator is a Publisher. The subscriber calls subscribe on the operator. Well, if this were a top-of-the-pipeline Publisher, it would respond by creating an instance of its Inner class and calling receive(subscription:) downstream on the subscriber. But this is where the operator’s behavior diverges from that of a simple publisher. Instead, it turns the other way, facing upstream. It creates an instance of its Inner class and subscribes it to the upstream, by calling subscribe on the upstream object, with the Inner object as parameter.
So there is now a chain reaction up the pipeline, as every operator is subscribed to and responds by subscribing its Inner to its own upstream, like this:
Publisher
Operator: Inner ⬆️ subscribe
Operator: Inner ⬆️ subscribe
Subscriber ⬆️ subscribe
The chain of subscribe calls finally reaches the Publisher at the top of the chain. And we already know what it does; it makes an instance of its Inner, which is a Subscription, and turns back downstream and calls receive(subscription:) on its subscriber. That subscriber is the Inner of some operator. That Inner knows who the operator’s downstream subscriber is, and it is itself a Subscription, so it holds on to the Subscription that it just got from upstream and passes itself downstream by calling receive(subscription:) on that subscriber.
So there is now a chain reaction down the pipeline, as every operator’s Inner is handed the Inner from upstream, until at last a Subscription reaches the ultimate Subscriber:
Publisher: Inner ⬇️ receive
(Operator) Inner ⬇️ receive
(Operator) Inner ⬇️ receive
Subscriber
So now every operator’s Inner can see both ways, up and down the chain:
On the one hand, it has been subscribed to from downstream; that Subscriber is its downstream.
On the other hand, it has received a Subscription from upstream; that Subscription is its upstream.
Every operator has a view of the world like this:
Subscription
⬆️ upstream
(Operator) Inner
⬇️ downstream
Subscriber
Therefore, we now have a chain that operates in both directions. As a result, messages can be handed all the way up the pipeline or all the way down the pipeline, one operator at a time, like a game of Pass The Parcel.
For example, I just said that the ultimate Subscriber has now received a Subscription. Well, we know what the typical Subscriber does when that happens: it turns to the Subscription and calls request. And so the game begins again, with a chain reaction up the pipeline: the Subscription at the bottom of the chain calls request on its upstream Subscription, and so on and so on until request is called on the Publisher’s Subscription.
And we know how the Publisher’s Subscription responds; it produces a value and calls receive on the downstream Subscriber. And now there’s a chain reaction down the pipeline, as each Subscription calls receive on its downstream Subscriber, which is itself a Subscription, until we reach the final Subscriber.
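The order of these messages can be observed from the middle of a pipeline. The following is a sketch only: it uses the built-in handleEvents operator as a stand-in for "an operator in the chain", and the log strings are my own labels.

```swift
import Combine

var log: [String] = []

let cancellable = Just(1)
    .handleEvents(
        // A Subscription arrives from upstream (travelling down).
        receiveSubscription: { _ in log.append("subscription") },
        // A value arrives from upstream (travelling down).
        receiveOutput: { log.append("value \($0)") },
        // A completion arrives from upstream (travelling down).
        receiveCompletion: { _ in log.append("completion") },
        // A request arrives from downstream (travelling up).
        receiveRequest: { _ in log.append("request") }
    )
    .sink { _ in }

print(log)
```

The log should read: subscription, request, value 1, completion. That matches the sequence traced above: the Subscription comes down, the request goes up, and then the value and the completion come down.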
And so it goes, for all messages up and down the chain. Of course, in real life it isn’t so simple, because an operator does not merely pass along everything it receives. For example, many operators respond to being handed a value from upstream by handing a different value downstream. Some operators respond to being handed a value from upstream by doing nothing (they block the value). That’s what makes an operator an operator: it faithfully passes along the messages it receives, except that it gives some messages some sort of “spin” that constitutes the behavior of the operator.
Nevertheless, the basic structure and behavior of an operator is quite straightforward, as I’ve described, and so we can illustrate by writing an operator that does nothing — passing messages up and down the chain is all it does. This can act as a sort of base-level boilerplate for any further operators we might wish to write. I’ll call our custom operator DoNothing.
I’ll start with the struct which is the DoNothing operator object:
struct DoNothing<Upstream: Publisher>: Publisher {
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure
    let upstream: Upstream
    init(upstream: Upstream) {
        self.upstream = upstream
    }
    // When subscribed to, subscribe my Inner _upstream_
    func receive<S>(subscriber: S)
        where S : Subscriber, S.Input == Output, S.Failure == Failure {
            self.upstream.subscribe(Inner(downstream:subscriber))
    }
    // ... Inner goes here ...
}
Now let’s write Inner. It must be initialized by the operator with (at least) the downstream subscriber:
class Inner<S:Subscriber, Input>: Subscriber, Subscription
    where S.Failure == Failure, S.Input == Input { // !
        var downstream: S?
        var upstream: Subscription?
        init(downstream: S) {
            self.downstream = downstream
        }
        // ...
}
Next, let’s have Inner satisfy the requirements of being a Subscriber:
// keep subscription, pass _self_ downstream
func receive(subscription: Subscription) {
    self.upstream = subscription
    self.downstream?.receive(subscription: self)
}
// pass input downstream
func receive(_ input: Input) -> Subscribers.Demand {
    return self.downstream?.receive(input) ?? .max(0)
}
// pass completion downstream
func receive(completion: Subscribers.Completion<Failure>) {
    self.downstream?.receive(completion: completion)
    self.downstream = nil
    self.upstream = nil
}
// ...
Finally, we’ll have Inner satisfy the requirements of being a Subscription:
// pass demand upstream
func request(_ demand: Subscribers.Demand) {
    self.upstream?.request(demand)
}
// pass cancel upstream
func cancel() {
    self.upstream?.cancel()
    self.upstream = nil
    self.downstream = nil
}
That’s it! Note that the responsibilities of Inner are neatly divided according to the protocol that needs satisfying:
As a Subscriber, its job is to receive things that arrive from upstream and pass them downstream.
As a Subscription, its job is to receive things that arrive from downstream and pass them upstream.
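The upstream direction is the harder one to see, so here is a small sketch that watches a cancel travel up a pipeline. It uses built-in operators only, on the assumption that they forward cancel upstream in the same way as the Inner described above; the variable names are mine.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var upstreamSawCancel = false
var received: [Int] = []

let cancellable = subject
    // Sits near the top of the chain and reports a cancel that reaches it.
    .handleEvents(receiveCancel: { upstreamSawCancel = true })
    // An operator in between; the cancel has to pass through it.
    .filter { _ in true }
    .sink { received.append($0) }

subject.send(1)
cancellable.cancel()   // starts at the bottom of the pipeline
subject.send(2)        // sent after the cancel, so it should not arrive

print(upstreamSawCancel, received)
```

The cancel is issued at the bottom, yet it is observed near the top, and the value sent afterwards never reaches the sink.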
As a final bit of icing on the cake, let’s inject into Publisher an operator method that produces a DoNothing object, just as we did for our composed operator earlier:
extension Publisher {
    func doNothing() -> DoNothing<Self> {
        return DoNothing(upstream:self)
    }
}
The outcome is that we can use the .doNothing operator just like any other operator in forming a pipeline:
URLSession.shared.dataTaskPublisher(for: url)
    .receive(on: DispatchQueue.main)
    .doNothing()
    .map {$0.data}
    // ...
The result of that is: nothing! Our pipeline still works. That is exactly what we want. Our operator does no harm; it is a good messenger.
It is important to understand that our operator is just a toy. We are avoiding a lot of nitpicky problems involving threads, what happens if we are subscribed to when we already have a subscriber, and so forth. But it demonstrates the fundamental mechanism and architecture underlying any operator.
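To suggest how the same skeleton might acquire some “spin”, here is a sketch of a variant that blocks some values instead of passing them all along. The names KeepIf and keepIf are hypothetical. Returning .max(1) for a blocked value is my own assumption about how to keep the downstream's demand balanced. The sketch has the same toy limitations: no thread safety and no guard against being subscribed twice.

```swift
import Combine

// Hypothetical operator built on the do-nothing skeleton: it forwards
// only the values for which `isIncluded` returns true.
struct KeepIf<Upstream: Publisher>: Publisher {
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure
    let upstream: Upstream
    let isIncluded: (Upstream.Output) -> Bool

    // When subscribed to, subscribe an Inner upstream.
    func receive<S>(subscriber: S)
        where S: Subscriber, S.Input == Upstream.Output, S.Failure == Upstream.Failure {
        upstream.subscribe(Inner(downstream: subscriber, isIncluded: isIncluded))
    }

    final class Inner<S: Subscriber>: Subscriber, Subscription
        where S.Input == Upstream.Output, S.Failure == Upstream.Failure {
        typealias Input = Upstream.Output
        typealias Failure = Upstream.Failure

        private var downstream: S?
        private var upstream: Subscription?
        private let isIncluded: (Upstream.Output) -> Bool

        init(downstream: S, isIncluded: @escaping (Upstream.Output) -> Bool) {
            self.downstream = downstream
            self.isIncluded = isIncluded
        }

        // Subscriber side: things arriving from upstream.
        func receive(subscription: Subscription) {
            upstream = subscription
            downstream?.receive(subscription: self)
        }
        func receive(_ input: Upstream.Output) -> Subscribers.Demand {
            // The "spin": a blocked value is swallowed, and one more value
            // is requested to replace it (an assumption of this sketch).
            guard isIncluded(input) else { return .max(1) }
            return downstream?.receive(input) ?? .none
        }
        func receive(completion: Subscribers.Completion<Upstream.Failure>) {
            downstream?.receive(completion: completion)
            downstream = nil
            upstream = nil
        }

        // Subscription side: things arriving from downstream.
        func request(_ demand: Subscribers.Demand) {
            upstream?.request(demand)
        }
        func cancel() {
            upstream?.cancel()
            upstream = nil
            downstream = nil
        }
    }
}

extension Publisher {
    func keepIf(_ isIncluded: @escaping (Output) -> Bool) -> KeepIf<Self> {
        KeepIf(upstream: self, isIncluded: isIncluded)
    }
}

var kept: [Int] = []
let cancellable = [1, 2, 3, 4].publisher
    .keepIf { $0.isMultiple(of: 2) }
    .sink { kept.append($0) }
print(kept)
```

Only receive(_:) differs in behavior from the do-nothing version; every other message is still passed along untouched, which is the sense in which the skeleton serves as boilerplate.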