|
const std = @import("../std.zig");
const testing = std.testing;
|
Batch() Performs multiple async functions in parallel, without heap allocation. Async function frames are managed externally to this abstraction, and passed in via the |
pub fn Batch(
comptime Result: type,
comptime max_jobs: comptime_int,
comptime async_behavior: enum {
auto_async,
never_async,
always_async,
},
) type {
return struct {
jobs: [max_jobs]Job,
next_job_index: usize,
collected_result: CollectedResult,
const Job = struct {
frame: ?anyframe->Result,
result: Result,
};
const Self = @This();
const CollectedResult = switch (@typeInfo(Result)) {
.ErrorUnion => Result,
else => void,
};
const async_ok = switch (async_behavior) {
.auto_async => std.io.is_async,
.never_async => false,
.always_async => true,
};
|
init() The return value for each job. If a job slot was re-used due to maxed out concurrency, then its result value will be overwritten. The values can be accessed with the |
pub fn init() Self {
return Self{
.jobs = [1]Job{
.{
.frame = null,
.result = undefined,
},
} ** max_jobs,
.next_job_index = 0,
.collected_result = {},
};
}
|
add()Add a frame to the Batch. If all jobs are in-flight, then this function waits until one completes. This function is *not* thread-safe. It must be called from one thread at a time, however, it need not be the same thread. TODO: "select" language feature to use the next available slot, rather than awaiting the next index. |
pub fn add(self: *Self, frame: anyframe->Result) void {
const job = &self.jobs[self.next_job_index];
self.next_job_index = (self.next_job_index + 1) % max_jobs;
if (job.frame) |existing| {
job.result = if (async_ok) await existing else nosuspend await existing;
if (CollectedResult != void) {
job.result catch |err| {
self.collected_result = err;
};
}
}
job.frame = frame;
}
|
wait() Wait for all the jobs to complete. Safe to call any number of times. If |
pub fn wait(self: *Self) CollectedResult {
for (self.jobs) |*job|
if (job.frame) |f| {
job.result = if (async_ok) await f else nosuspend await f;
if (CollectedResult != void) {
job.result catch |err| {
self.collected_result = err;
};
}
job.frame = null;
};
return self.collected_result;
}
};
}
|
Test:std.event.Batch |
test "std.event.Batch" {
if (true) return error.SkipZigTest;
var count: usize = 0;
var batch = Batch(void, 2, .auto_async).init();
batch.add(&async sleepALittle(&count));
batch.add(&async increaseByTen(&count));
batch.wait();
try testing.expect(count == 11);
var another = Batch(anyerror!void, 2, .auto_async).init();
another.add(&async somethingElse());
another.add(&async doSomethingThatFails());
try testing.expectError(error.ItBroke, another.wait());
}
fn sleepALittle(count: *usize) void {
std.time.sleep(1 * std.time.ns_per_ms);
_ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
}
fn increaseByTen(count: *usize) void {
var i: usize = 0;
while (i < 10) : (i += 1) {
_ = @atomicRmw(usize, count, .Add, 1, .SeqCst);
}
}
fn doSomethingThatFails() anyerror!void {}
fn somethingElse() anyerror!void {
return error.ItBroke;
}
|
| Generated by zstd-browse2 on 2023-11-04 14:12:31 -0400. |