This is Understanding Combine, written by Matt Neuburg. Corrections and suggestions are greatly appreciated (you can comment here). So are donations; please consider keeping me going by funding this work at http://www.paypal.me/mattneub. Or buy my books: the current (and final) editions are iOS 15 Programming Fundamentals with Swift and Programming iOS 14. Thank you!
.collect
(Publishers.Collect) gathers into a buffer all values received from upstream and then, when it has received a .finished
completion, emits all of those values as a single value consisting of an array, and then emits a .finished
completion of its own.
For example, picking up from the technique developed earlier, we can use collect
to turn an array of URLs into an array of the results of fetching those URLs over the network:
[
"https://www.apeth.com/pep/manny.jpg",
"https://www.apethh.com/pep/moe.jpg",
"https://www.apeth.com/pep/jack.jpg",
]
.map(URL.init(string:)).compactMap{$0}
.map{ URLSession.shared.dataTaskPublisher(for:$0) }
.publisher
.flatMap(maxPublishers:.max(1)) {
$0.replaceError(with: (data: Data(), response: URLResponse()))
}
.collect()
The result is an array of (data:Data, response:URLResponse)
that is guaranteed to be in the same order as the original array of URLs. Note that without .maxPublishers:.max(1)
, we could not make that guarantee; the results of our data task publishers can arrive in any order, so we must serialize asynchronicity if we want to prevent that.
.collect(_:)
(Publishers.CollectByCount) doesn’t merely wait for a .finished
completion; instead, it also emits an array of the values received from upstream so far when some fixed number of values have arrived, and then clears its buffer of received values. Thus it divides up the stream of values from upstream into chunks of a fixed size (except for the last chunk, after the .finished
completion is received, which might be smaller). For example:
[1,2,3].publisher
.collect(2) // [1, 2], then [3], then finished
.collect(_:options:)
(Publishers.CollectionByTime) emits the buffer as an array of values at fixed time intervals. The first parameter is a TimeGroupingStrategy:
.byTime
: The associated value is a Scheduler and a Stride indicating how the time is to be reckoned.
.byTimeOrCount
: Like .byTime
, but with an additional parameter representing a count; the operator emits when the time elapses or the buffer fills to the count, whichever comes first.
Here’s a simple test example. We’ll use a Timer to publish every three-tenths of a second, and we’ll capture the stream into one-second clumps:
Timer.publish(every: 0.3, on: .main, in: .common).autoconnect()
.scan(0) {i,_ in i + 1}
.collect(.byTime(DispatchQueue.main, 1))
The result is:
[1, 2, 3]
[4, 5, 6]
[7, 8, 9, 10]
That’s because, during the third interval, the first value arrived early enough that there was time for four values to arrive before the interval ended. But now we’ll set a limit to the array size:
Timer.publish(every: 0.3, on: .main, in: .common).autoconnect()
.scan(0) {i,_ in i + 1}
.collect(.byTimeOrCount(DispatchQueue.main, 1, 3))
Now we get:
[1, 2, 3]
[4, 5, 6]
[7, 8, 9]
(The options:
parameter in .collect(_:options:)
is optional, and you’ll probably omit it. For a DispatchQueue, it lets you set the quality of service (DispatchQoS), dispatch work item flags, and dispatch group, but you are unlikely to need to specify any of those.)