|
const std = @import("../std.zig"); const testing = std.testing; |
Batch() Performs multiple async functions in parallel, without heap allocation. Async function frames are managed externally to this abstraction, and passed in via the |
pub fn Batch( comptime Result: type, comptime max_jobs: comptime_int, comptime async_behavior: enum { auto_async, never_async, always_async, }, ) type { return struct { jobs: [max_jobs]Job, next_job_index: usize, collected_result: CollectedResult, const Job = struct { frame: ?anyframe->Result, result: Result, }; const Self = @This(); const CollectedResult = switch (@typeInfo(Result)) { .ErrorUnion => Result, else => void, }; const async_ok = switch (async_behavior) { .auto_async => std.io.is_async, .never_async => false, .always_async => true, }; |
init() The return value for each job. If a job slot was re-used due to maxed out concurrency, then its result value will be overwritten. The values can be accessed with the |
pub fn init() Self { return Self{ .jobs = [1]Job{ .{ .frame = null, .result = undefined, }, } ** max_jobs, .next_job_index = 0, .collected_result = {}, }; } |
add()Add a frame to the Batch. If all jobs are in-flight, then this function waits until one completes. This function is *not* thread-safe. It must be called from one thread at a time, however, it need not be the same thread. TODO: "select" language feature to use the next available slot, rather than awaiting the next index. |
pub fn add(self: *Self, frame: anyframe->Result) void { const job = &self.jobs[self.next_job_index]; self.next_job_index = (self.next_job_index + 1) % max_jobs; if (job.frame) |existing| { job.result = if (async_ok) await existing else nosuspend await existing; if (CollectedResult != void) { job.result catch |err| { self.collected_result = err; }; } } job.frame = frame; } |
wait() Wait for all the jobs to complete. Safe to call any number of times. If |
pub fn wait(self: *Self) CollectedResult { for (self.jobs) |*job| if (job.frame) |f| { job.result = if (async_ok) await f else nosuspend await f; if (CollectedResult != void) { job.result catch |err| { self.collected_result = err; }; } job.frame = null; }; return self.collected_result; } }; } |
Test:std.event.Batch |
test "std.event.Batch" { if (true) return error.SkipZigTest; var count: usize = 0; var batch = Batch(void, 2, .auto_async).init(); batch.add(&async sleepALittle(&count)); batch.add(&async increaseByTen(&count)); batch.wait(); try testing.expect(count == 11); var another = Batch(anyerror!void, 2, .auto_async).init(); another.add(&async somethingElse()); another.add(&async doSomethingThatFails()); try testing.expectError(error.ItBroke, another.wait()); } fn sleepALittle(count: *usize) void { std.time.sleep(1 * std.time.ns_per_ms); _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); } fn increaseByTen(count: *usize) void { var i: usize = 0; while (i < 10) : (i += 1) { _ = @atomicRmw(usize, count, .Add, 1, .SeqCst); } } fn doSomethingThatFails() anyerror!void {} fn somethingElse() anyerror!void { return error.ItBroke; } |
Generated by zstd-browse2 on 2023-11-04 14:12:31 -0400. |