|
const std = @import("../std.zig"); const builtin = @import("builtin"); const Condition = @This(); const Mutex = std.Thread.Mutex; const os = std.os; const assert = std.debug.assert; const testing = std.testing; const Atomic = std.atomic.Atomic; const Futex = std.Thread.Futex; impl: Impl = .{}, |
wait() Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return. "Atomically" here refers to accesses done on the Condition after acquiring the Mutex. |
pub fn wait(self: *Condition, mutex: *Mutex) void { self.impl.wait(mutex, null) catch |err| switch (err) { error.Timeout => unreachable, // no timeout provided so we shouldn't have timed-out }; } |
timedWait() Atomically releases the Mutex, blocks the caller thread, then re-acquires the Mutex on return. "Atomically" here refers to accesses done on the Condition after acquiring the Mutex. |
pub fn timedWait(self: *Condition, mutex: *Mutex, timeout_ns: u64) error{Timeout}!void { return self.impl.wait(mutex, timeout_ns); } |
signal() Unblocks at least one thread blocked in a call to |
pub fn signal(self: *Condition) void { self.impl.wake(.one); } |
broadcast() Unblocks all threads currently blocked in a call to |
pub fn broadcast(self: *Condition) void { self.impl.wake(.all); } const Impl = if (builtin.single_threaded) SingleThreadedImpl else if (builtin.os.tag == .windows) WindowsImpl else FutexImpl; const Notify = enum { one, // wake up only one thread all, // wake up all threads }; const SingleThreadedImpl = struct { fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void { _ = self; _ = mutex; // There are no other threads to wake us up. // So if we wait without a timeout we would never wake up. const timeout_ns = timeout orelse { unreachable; // deadlock detected }; std.time.sleep(timeout_ns); return error.Timeout; } fn wake(self: *Impl, comptime notify: Notify) void { // There are no other threads to wake up. _ = self; _ = notify; } }; const WindowsImpl = struct { condition: os.windows.CONDITION_VARIABLE = .{}, fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void { var timeout_overflowed = false; var timeout_ms: os.windows.DWORD = os.windows.INFINITE; if (timeout) |timeout_ns| { // Round the nanoseconds to the nearest millisecond, // then saturating cast it to windows DWORD for use in kernel32 call. const ms = (timeout_ns +| (std.time.ns_per_ms / 2)) / std.time.ns_per_ms; timeout_ms = std.math.cast(os.windows.DWORD, ms) orelse std.math.maxInt(os.windows.DWORD); // Track if the timeout overflowed into INFINITE and make sure not to wait forever. if (timeout_ms == os.windows.INFINITE) { timeout_overflowed = true; timeout_ms -= 1; } } if (comptime builtin.mode == .Debug) { // The internal state of the DebugMutex needs to be handled here as well. mutex.impl.locking_thread.store(0, .Unordered); } const rc = os.windows.kernel32.SleepConditionVariableSRW( &self.condition, if (comptime builtin.mode == .Debug) &mutex.impl.impl.srwlock else &mutex.impl.srwlock, timeout_ms, 0, // the srwlock was assumed to acquired in exclusive mode not shared ); if (comptime builtin.mode == .Debug) { // The internal state of the DebugMutex needs to be handled here as well. mutex.impl.locking_thread.store(std.Thread.getCurrentId(), .Unordered); } // Return error.Timeout if we know the timeout elapsed correctly. if (rc == os.windows.FALSE) { assert(os.windows.kernel32.GetLastError() == .TIMEOUT); if (!timeout_overflowed) return error.Timeout; } } fn wake(self: *Impl, comptime notify: Notify) void { switch (notify) { .one => os.windows.kernel32.WakeConditionVariable(&self.condition), .all => os.windows.kernel32.WakeAllConditionVariable(&self.condition), } } }; const FutexImpl = struct { state: Atomic(u32) = Atomic(u32).init(0), epoch: Atomic(u32) = Atomic(u32).init(0), const one_waiter = 1; const waiter_mask = 0xffff; const one_signal = 1 << 16; const signal_mask = 0xffff << 16; fn wait(self: *Impl, mutex: *Mutex, timeout: ?u64) error{Timeout}!void { // Observe the epoch, then check the state again to see if we should wake up. // The epoch must be observed before we check the state or we could potentially miss a wake() and deadlock: // // - T1: s = LOAD(&state) // - T2: UPDATE(&s, signal) // - T2: UPDATE(&epoch, 1) + FUTEX_WAKE(&epoch) // - T1: e = LOAD(&epoch) (was reordered after the state load) // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed the state update + the epoch change) // // Acquire barrier to ensure the epoch load happens before the state load. var epoch = self.epoch.load(.Acquire); var state = self.state.fetchAdd(one_waiter, .Monotonic); assert(state & waiter_mask != waiter_mask); state += one_waiter; mutex.unlock(); defer mutex.lock(); var futex_deadline = Futex.Deadline.init(timeout); while (true) { futex_deadline.wait(&self.epoch, epoch) catch |err| switch (err) { // On timeout, we must decrement the waiter we added above. error.Timeout => { while (true) { // If there's a signal when we're timing out, consume it and report being woken up instead. // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. while (state & signal_mask != 0) { const new_state = state - one_waiter - one_signal; state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return; } // Remove the waiter we added and officially return timed out. const new_state = state - one_waiter; state = self.state.tryCompareAndSwap(state, new_state, .Monotonic, .Monotonic) orelse return err; } }, }; epoch = self.epoch.load(.Acquire); state = self.state.load(.Monotonic); // Try to wake up by consuming a signal and decremented the waiter we added previously. // Acquire barrier ensures code before the wake() which added the signal happens before we decrement it and return. while (state & signal_mask != 0) { const new_state = state - one_waiter - one_signal; state = self.state.tryCompareAndSwap(state, new_state, .Acquire, .Monotonic) orelse return; } } } fn wake(self: *Impl, comptime notify: Notify) void { var state = self.state.load(.Monotonic); while (true) { const waiters = (state & waiter_mask) / one_waiter; const signals = (state & signal_mask) / one_signal; // Reserves which waiters to wake up by incrementing the signals count. // Therefore, the signals count is always less than or equal to the waiters count. // We don't need to Futex.wake if there's nothing to wake up or if other wake() threads have reserved to wake up the current waiters. const wakeable = waiters - signals; if (wakeable == 0) { return; } const to_wake = switch (notify) { .one => 1, .all => wakeable, }; // Reserve the amount of waiters to wake by incrementing the signals count. // Release barrier ensures code before the wake() happens before the signal it posted and consumed by the wait() threads. const new_state = state + (one_signal * to_wake); state = self.state.tryCompareAndSwap(state, new_state, .Release, .Monotonic) orelse { // Wake up the waiting threads we reserved above by changing the epoch value. // NOTE: a waiting thread could miss a wake up if *exactly* ((1<<32)-1) wake()s happen between it observing the epoch and sleeping on it. // This is very unlikely due to how many precise amount of Futex.wake() calls that would be between the waiting thread's potential preemption. // // Release barrier ensures the signal being added to the state happens before the epoch is changed. // If not, the waiting thread could potentially deadlock from missing both the state and epoch change: // // - T2: UPDATE(&epoch, 1) (reordered before the state change) // - T1: e = LOAD(&epoch) // - T1: s = LOAD(&state) // - T2: UPDATE(&state, signal) + FUTEX_WAKE(&epoch) // - T1: s & signals == 0 -> FUTEX_WAIT(&epoch, e) (missed both epoch change and state change) _ = self.epoch.fetchAdd(1, .Release); Futex.wake(&self.epoch, to_wake); return; }; } } }; |
Test:Condition - smoke test |
test "Condition - smoke test" { var mutex = Mutex{}; var cond = Condition{}; // Try to wake outside the mutex defer cond.signal(); defer cond.broadcast(); mutex.lock(); defer mutex.unlock(); // Try to wait with a timeout (should not deadlock) try testing.expectError(error.Timeout, cond.timedWait(&mutex, 0)); try testing.expectError(error.Timeout, cond.timedWait(&mutex, std.time.ns_per_ms)); // Try to wake inside the mutex. cond.signal(); cond.broadcast(); } // Inspired from: https://github.com/Amanieu/parking_lot/pull/129 |
Test:Condition - wait and signal |
test "Condition - wait and signal" { // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } if (builtin.zig_backend == .stage2_x86_64) return error.SkipZigTest; const num_threads = 4; const MultiWait = struct { mutex: Mutex = .{}, cond: Condition = .{}, threads: [num_threads]std.Thread = undefined, spawn_count: std.math.IntFittingRange(0, num_threads) = 0, fn run(self: *@This()) void { self.mutex.lock(); defer self.mutex.unlock(); self.spawn_count += 1; self.cond.wait(&self.mutex); self.cond.timedWait(&self.mutex, std.time.ns_per_ms) catch {}; self.cond.signal(); } }; var multi_wait = MultiWait{}; for (&multi_wait.threads) |*t| { t.* = try std.Thread.spawn(.{}, MultiWait.run, .{&multi_wait}); } while (true) { std.time.sleep(100 * std.time.ns_per_ms); multi_wait.mutex.lock(); defer multi_wait.mutex.unlock(); // Make sure all of the threads have finished spawning to avoid a deadlock. if (multi_wait.spawn_count == num_threads) break; } multi_wait.cond.signal(); for (multi_wait.threads) |t| { t.join(); } } |
Test:Condition - signal |
test "Condition - signal" { // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } if (builtin.zig_backend == .stage2_x86_64) return error.SkipZigTest; const num_threads = 4; const SignalTest = struct { mutex: Mutex = .{}, cond: Condition = .{}, notified: bool = false, threads: [num_threads]std.Thread = undefined, spawn_count: std.math.IntFittingRange(0, num_threads) = 0, fn run(self: *@This()) void { self.mutex.lock(); defer self.mutex.unlock(); self.spawn_count += 1; // Use timedWait() a few times before using wait() // to test multiple threads timing out frequently. var i: usize = 0; while (!self.notified) : (i +%= 1) { if (i < 5) { self.cond.timedWait(&self.mutex, 1) catch {}; } else { self.cond.wait(&self.mutex); } } // Once we received the signal, notify another thread (inside the lock). assert(self.notified); self.cond.signal(); } }; var signal_test = SignalTest{}; for (&signal_test.threads) |*t| { t.* = try std.Thread.spawn(.{}, SignalTest.run, .{&signal_test}); } while (true) { std.time.sleep(10 * std.time.ns_per_ms); signal_test.mutex.lock(); defer signal_test.mutex.unlock(); // Make sure at least one thread has finished spawning to avoid testing nothing. if (signal_test.spawn_count > 0) break; } { // Wake up one of them (outside the lock) after setting notified=true. defer signal_test.cond.signal(); signal_test.mutex.lock(); defer signal_test.mutex.unlock(); try testing.expect(!signal_test.notified); signal_test.notified = true; } for (signal_test.threads) |t| { t.join(); } } |
Test:Condition - multi signal |
test "Condition - multi signal" { // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } if (builtin.zig_backend == .stage2_x86_64) return error.SkipZigTest; const num_threads = 4; const num_iterations = 4; const Paddle = struct { mutex: Mutex = .{}, cond: Condition = .{}, value: u32 = 0, fn hit(self: *@This()) void { defer self.cond.signal(); self.mutex.lock(); defer self.mutex.unlock(); self.value += 1; } fn run(self: *@This(), hit_to: *@This()) !void { self.mutex.lock(); defer self.mutex.unlock(); var current: u32 = 0; while (current < num_iterations) : (current += 1) { // Wait for the value to change from hit() while (self.value == current) { self.cond.wait(&self.mutex); } // hit the next paddle try testing.expectEqual(self.value, current + 1); hit_to.hit(); } } }; var paddles = [_]Paddle{.{}} ** num_threads; var threads = [_]std.Thread{undefined} ** num_threads; // Create a circle of paddles which hit each other for (&threads, 0..) |*t, i| { const paddle = &paddles[i]; const hit_to = &paddles[(i + 1) % paddles.len]; t.* = try std.Thread.spawn(.{}, Paddle.run, .{ paddle, hit_to }); } // Hit the first paddle and wait for them all to complete by hitting each other for num_iterations. paddles[0].hit(); for (threads) |t| t.join(); // The first paddle will be hit one last time by the last paddle. for (paddles, 0..) |p, i| { const expected = @as(u32, num_iterations) + @intFromBool(i == 0); try testing.expectEqual(p.value, expected); } } |
Test:Condition - broadcasting |
test "Condition - broadcasting" { // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } if (builtin.zig_backend == .stage2_x86_64) return error.SkipZigTest; const num_threads = 10; const BroadcastTest = struct { mutex: Mutex = .{}, cond: Condition = .{}, completed: Condition = .{}, count: usize = 0, threads: [num_threads]std.Thread = undefined, fn run(self: *@This()) void { self.mutex.lock(); defer self.mutex.unlock(); // The last broadcast thread to start tells the main test thread it's completed. self.count += 1; if (self.count == num_threads) { self.completed.signal(); } // Waits for the count to reach zero after the main test thread observes it at num_threads. // Tries to use timedWait() a bit before falling back to wait() to test multiple threads timing out. var i: usize = 0; while (self.count != 0) : (i +%= 1) { if (i < 10) { self.cond.timedWait(&self.mutex, 1) catch {}; } else { self.cond.wait(&self.mutex); } } } }; var broadcast_test = BroadcastTest{}; for (&broadcast_test.threads) |*t| { t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{&broadcast_test}); } { broadcast_test.mutex.lock(); defer broadcast_test.mutex.unlock(); // Wait for all the broadcast threads to spawn. // timedWait() to detect any potential deadlocks. while (broadcast_test.count != num_threads) { broadcast_test.completed.timedWait( &broadcast_test.mutex, 1 * std.time.ns_per_s, ) catch {}; } // Reset the counter and wake all the threads to exit. broadcast_test.count = 0; broadcast_test.cond.broadcast(); } for (broadcast_test.threads) |t| { t.join(); } } |
Test:Condition - broadcasting - wake all threads |
test "Condition - broadcasting - wake all threads" { // Tests issue #12877 // This test requires spawning threads if (builtin.single_threaded) { return error.SkipZigTest; } if (builtin.zig_backend == .stage2_x86_64) return error.SkipZigTest; var num_runs: usize = 1; const num_threads = 10; while (num_runs > 0) : (num_runs -= 1) { const BroadcastTest = struct { mutex: Mutex = .{}, cond: Condition = .{}, completed: Condition = .{}, count: usize = 0, thread_id_to_wake: usize = 0, threads: [num_threads]std.Thread = undefined, wakeups: usize = 0, fn run(self: *@This(), thread_id: usize) void { self.mutex.lock(); defer self.mutex.unlock(); // The last broadcast thread to start tells the main test thread it's completed. self.count += 1; if (self.count == num_threads) { self.completed.signal(); } while (self.thread_id_to_wake != thread_id) { self.cond.timedWait(&self.mutex, 1 * std.time.ns_per_s) catch {}; self.wakeups += 1; } if (self.thread_id_to_wake <= num_threads) { // Signal next thread to wake up. self.thread_id_to_wake += 1; self.cond.broadcast(); } } }; var broadcast_test = BroadcastTest{}; var thread_id: usize = 1; for (&broadcast_test.threads) |*t| { t.* = try std.Thread.spawn(.{}, BroadcastTest.run, .{ &broadcast_test, thread_id }); thread_id += 1; } { broadcast_test.mutex.lock(); defer broadcast_test.mutex.unlock(); // Wait for all the broadcast threads to spawn. // timedWait() to detect any potential deadlocks. while (broadcast_test.count != num_threads) { broadcast_test.completed.timedWait( &broadcast_test.mutex, 1 * std.time.ns_per_s, ) catch {}; } // Signal thread 1 to wake up broadcast_test.thread_id_to_wake = 1; broadcast_test.cond.broadcast(); } for (broadcast_test.threads) |t| { t.join(); } } } |
Generated by zstd-browse2 on 2023-11-04 14:12:32 -0400. |