This is Understanding Combine, written by Matt Neuburg. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current (and final) editions are iOS 15 Programming Fundamentals with Swift and Programming iOS 14. Thank you!
.buffer
(Publishers.Buffer) is intended to reconcile the timing between the upstream’s production of values and the downstream’s demand for those values through the use of backpressure. The idea is that the buffer responds to backpressure from downstream by accumulating values from upstream that would otherwise be lost, until such time as they can be delivered. The buffer itself effectively behaves as a FIFO stack (also known as a queue), releasing values to the downstream in the same order that they arrived from upstream.
I’ll start with an artificial but very clear illustration of how a buffer can be useful. The idea is that we have a publisher that is likely to be emitting values at fairly frequent intervals, and we need to process each value through a .flatMap
that uses backpressure to limit concurrency and that produces a publisher which, itself, may take considerable time to emit a value. I’ll emulate that situation with a PassthroughSubject and a Timer, on the one hand, and a .flatMap
with maxPublishers
and a .delay
on its publisher, on the other:
class ViewController: UIViewController {
let sub = PassthroughSubject<Int,Never>()
var storage = Set<AnyCancellable>()
var timer : Timer!
override func viewDidLoad() {
super.viewDidLoad()
sub
.flatMap(maxPublishers:.max(3)) { i in
return Just(i)
.delay(for: 3, scheduler: DispatchQueue.main)
.eraseToAnyPublisher()
}
.sink { print($0) }
.store(in: &storage)
var count = 0
self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) {
_ in
count += 1
self.sub.send(count)
}
}
}
The outcome is that values are dropped; some values emitted by the PassthroughSubject publisher at the head of the pipeline never arrive at the sink
, because the .flatMap
was “busy” at the time they were emitted:
1
2
3
5
6
7
9
10
11
...
The solution is to put a .buffer
in front of the .flatMap
:
sub
.buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
.flatMap(maxPublishers:.max(3)) { i in
// ...
The result is that all the numeric values do in fact arrive at the sink
!
1
2
3
4
5
6
7
8
9
10
11
...
Of course in real life we could still lose values if the buffer is not large enough to compensate, in the long run, for the disparity between the rate of value emission from the initial publisher and the rate of value emission from the backpressuring .flatMap
. But in this example, the buffer is large enough, so that no values are lost.
Now let’s get down to the nitty-gritty. We need to talk about the .buffer
operator’s parameters. The thing to understand here is that the .buffer
operator accumulates values from upstream into its internal buffer, which is of predefined size, until the buffer fills up due to backpressure from downstream. At that point, it throws away values from upstream or throws an error, depending on how you’ve configured the operator. The point of the parameters is to dictate exactly how all that should happen:
size:
prefetch:
.byRequest
, despite the name, means that the operator makes an .unlimited
demand right at the start.
.keepFull
means that the operator pauses as soon as a value arrives that can’t go into the buffer.
whenFull:
.dropNewest
means throw away the new value that doesn’t fit into the buffer.
.dropOldest
means throw away the first value from the buffer and append the new value at the end.
.customError
means fail; it takes an associated value that’s a function producing an Error, which is sent downstream as a failure.
Now I’ll illustrate the meaning of the parameters by deliberately formulating some edge cases. As before, we need a pipeline that exerts backpressure from below. Here’s one:
let pub = Timer.publish(every: 0.2, on: .main, in: .common).autoconnect()
.scan(0) {i,_ in i+1}
.buffer(size: 4, prefetch: .byRequest, whenFull: .dropNewest)
.flatMap(maxPublishers:.max(1)) {
Just($0).delay(for: 2, scheduler: DispatchQueue.main)
}
This should remind you of our earlier example. At the top of the pipeline we have a publisher that produces values every one-fifth of a second. At the bottom we have a .flatMap
with a .max(1)
demand setting, so that it asks for just one value at a time, while transforming its received value into a publisher with a two-second .delay
. This means that when a value arrives into the .flatMap
, it sits there for two seconds before being published, at which point the .flatMap
requests a new value.
If there were no .buffer
operator in the story, this pipeline would simply produce one successive Int every two seconds. The backpressure from .flatMap
applies directly to the publisher at the top: every time the .flatMap
asks for a new value, the upstream produces the next value in the series. But now we have introduced a buffer between them — but the buffer can hold only four values at a time. What will happen? Here’s what we get, one value every two seconds, at the end of the pipeline:
1
2
3
4
5
12
23
34
...
Here’s what happened:
The buffer starts by asking for unlimited values, and the .flatMap
starts by asking for one value. After one-fifth of a second, the first value arrives and the buffer passes it on to the .flatMap
. The 1
is now sitting in the .flatMap
, waiting for two seconds to be published, exerting backpressure on the buffer.
Meanwhile, values keep arriving into the buffer: the 2
, the 3
, the 4
, the 5
… But now the buffer is full (because it only holds four values), and the 6
arrives. That’s an overflow. We have told the buffer to .dropNewest
, so the 6
is thrown away. And so on for the 7
, the 8
, and all the numbers up to 11
.
At that point, .flatMap
publishes the 1
and asks for a new value. The buffer responds by feeding it the 2
, freeing up a space in the buffer. So when the 12
arrives, there’s room for it, and in it goes into the buffer.
Now there’s no more room in the buffer again, while we wait two seconds for the .flatMap
to clear, and the 13
arrives and is thrown away, then the 14
, and so on.
Eventually we reach a steady state where only every 11th value from the top of pipeline arrives at the bottom; all the others are being thrown away because they encounter a full buffer.
If we change .dropNewest
to .dropOldest
, the result is the same except that we achieve the steady state much sooner:
1
8
18
29
40
...
Here’s what happened:
As before, the 1
passes into the .flatMap
and the 2
, 3
, 4
, and 5
fill up the buffer.
Now our policy is .dropOldest
, so when the 6
arrives it goes into the buffer and the 2
is thrown away; the 7
arrives and goes into the buffer and the 3
is thrown away; and so on.
Eventually the buffer consists of 8
, 9
, 10
, 11
— and at that moment the 1
is finally published from the .flatMap
, which requests a new value, and is handed the 8
from the start of the buffer. And so on, eventually reaching a steady state where only every 9th value from the top of the pipeline arrives at the bottom.
So much for the .byRequest
strategy. Now let’s illustrate the .keepFull
strategy. I’ll use a completely different example (suggested by Stack Overflow user New Dev). We have a PassthroughSubject (self.subject
):
self.subject
.flatMap(maxPublishers:.max(1)) {
Just($0).delay(for: 2, scheduler: DispatchQueue.main)
}
.sink { print($0) }.store(in: &self.storage)
for i in 1...10 {
self.subject.send(i)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
self.subject.send(11)
}
The output is:
1 (after two seconds)
11 (after three more seconds)
Again, this should remind you of the first example in this chapter. While the values 2
through 10
were arriving, the pipeline was blocked by the .flatMap
, which requests only one value at a time and doesn’t request a new value until the two-second .delay
has passed. So those values fell on deaf ears; they were thrown away. The 11
, on the other hand, was sent after the .flatMap
had cleared, so that it was ready for a new value; there was an additional delay of one second before the 11
was sent, and then the .delay
added two more seconds before the 11
reached the bottom of the pipeline.
Now let’s insert our buffer:
self.subject
.buffer(size: 4, prefetch: .keepFull, whenFull: .dropNewest)
.flatMap(maxPublishers:.max(1)) {
Just($0).delay(for: 2, scheduler: DispatchQueue.main)
}
.sink { print($0) }.store(in: &self.storage)
for i in 1...10 {
self.subject.send(i)
}
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
self.subject.send(11)
}
Now the output is:
1 (after two seconds)
2 (after two more seconds)
3 (after two more seconds)
4 (after two more seconds)
5 (after two more seconds)
11 (after two more seconds)
The buffer saved the 2
through 5
for us. At that point it was full; it is set to .dropNewest
, so the 6
through 10
were thrown away. When the .flatMap
asked for a new value, there was the 2
sitting in the buffer! So the .flatMap
queued that up in the .delay
, and meanwhile the buffer now had an opening, so when the 11
arrived, it got stored in the buffer and was eventually delivered.
So the buffer prevented at least some of the values from being lost — and of course, if the buffer were larger, it could have prevented any values from being lost. Not only that, but it evened out the timing! In effect it acted as a completely new source of values, so that every two seconds, when the .delay
ended and the .flatMap
requested a new value, there was in fact a new value ready to hand.