|
const std = @import("std.zig"); const builtin = @import("builtin"); const root = @import("root"); const c = std.c; const math = std.math; const assert = std.debug.assert; const os = std.os; const fs = std.fs; const mem = std.mem; const meta = std.meta; const File = std.fs.File; const Allocator = std.mem.Allocator; |
Mode |
pub const Mode = enum { blocking, evented, }; const mode = std.options.io_mode; |
is_asyncI/O operates normally, waiting for the operating system syscalls to complete. I/O functions are generated async and rely on a global event loop. Event-based I/O. |
pub const is_async = mode != .blocking; |
ModeOverride This is an enum value to use for I/O mode at runtime, since it takes up zero bytes at runtime, and makes expressions comptime-known when |
pub const ModeOverride = if (is_async) Mode else enum { blocking }; |
default_mode |
pub const default_mode: ModeOverride = if (is_async) Mode.evented else .blocking; fn getStdOutHandle() os.fd_t { if (builtin.os.tag == .windows) { if (builtin.zig_backend == .stage2_aarch64) { // TODO: this is just a temporary workaround until we advance aarch64 backend further along. return os.windows.GetStdHandle(os.windows.STD_OUTPUT_HANDLE) catch os.windows.INVALID_HANDLE_VALUE; } return os.windows.peb().ProcessParameters.hStdOutput; } if (@hasDecl(root, "os") and @hasDecl(root.os, "io") and @hasDecl(root.os.io, "getStdOutHandle")) { return root.os.io.getStdOutHandle(); } return os.STDOUT_FILENO; } |
getStdOut()TODO: async stdout on windows without a dedicated thread. https://github.com/ziglang/zig/pull/4816#issuecomment-604521023 |
pub fn getStdOut() File { return File{ .handle = getStdOutHandle(), .capable_io_mode = .blocking, .intended_io_mode = default_mode, }; } fn getStdErrHandle() os.fd_t { if (builtin.os.tag == .windows) { if (builtin.zig_backend == .stage2_aarch64) { // TODO: this is just a temporary workaround until we advance aarch64 backend further along. return os.windows.GetStdHandle(os.windows.STD_ERROR_HANDLE) catch os.windows.INVALID_HANDLE_VALUE; } return os.windows.peb().ProcessParameters.hStdError; } if (@hasDecl(root, "os") and @hasDecl(root.os, "io") and @hasDecl(root.os.io, "getStdErrHandle")) { return root.os.io.getStdErrHandle(); } return os.STDERR_FILENO; } |
getStdErr() This returns a |
pub fn getStdErr() File { return File{ .handle = getStdErrHandle(), .capable_io_mode = .blocking, .intended_io_mode = .blocking, }; } fn getStdInHandle() os.fd_t { if (builtin.os.tag == .windows) { if (builtin.zig_backend == .stage2_aarch64) { // TODO: this is just a temporary workaround until we advance aarch64 backend further along. return os.windows.GetStdHandle(os.windows.STD_INPUT_HANDLE) catch os.windows.INVALID_HANDLE_VALUE; } return os.windows.peb().ProcessParameters.hStdInput; } if (@hasDecl(root, "os") and @hasDecl(root.os, "io") and @hasDecl(root.os.io, "getStdInHandle")) { return root.os.io.getStdInHandle(); } return os.STDIN_FILENO; } |
getStdIn()TODO: async stdin on windows without a dedicated thread. https://github.com/ziglang/zig/pull/4816#issuecomment-604521023 |
pub fn getStdIn() File { return File{ .handle = getStdInHandle(), .capable_io_mode = .blocking, .intended_io_mode = default_mode, }; } |
GenericReader() |
pub fn GenericReader( comptime Context: type, comptime ReadError: type, comptime readFn: fn (context: Context, buffer: []u8) ReadError!usize, ) type { return struct { context: Context, pub const Error = ReadError; pub const NoEofError = ReadError || error{ EndOfStream, }; |
read()Returns the number of bytes read. It may be less than buffer.len. If the number of bytes read is 0, it means end of stream. End of stream is not an error condition. |
pub inline fn read(self: Self, buffer: []u8) Error!usize { return readFn(self.context, buffer); } |
readAll() |
pub inline fn readAll(self: Self, buffer: []u8) Error!usize { return @errorCast(self.any().readAll(buffer)); } |
readAtLeast() |
pub inline fn readAtLeast(self: Self, buffer: []u8, len: usize) Error!usize { return @errorCast(self.any().readAtLeast(buffer, len)); } |
readNoEof() |
pub inline fn readNoEof(self: Self, buf: []u8) NoEofError!void { return @errorCast(self.any().readNoEof(buf)); } |
readAllArrayList() |
pub inline fn readAllArrayList( self: Self, array_list: *std.ArrayList(u8), max_append_size: usize, ) (error{StreamTooLong} || Error)!void { return @errorCast(self.any().readAllArrayList(array_list, max_append_size)); } |
readAllArrayListAligned() |
pub inline fn readAllArrayListAligned( self: Self, comptime alignment: ?u29, array_list: *std.ArrayListAligned(u8, alignment), max_append_size: usize, ) (error{StreamTooLong} || Error)!void { return @errorCast(self.any().readAllArrayListAligned( alignment, array_list, max_append_size, )); } |
readAllAlloc() |
pub inline fn readAllAlloc( self: Self, allocator: Allocator, max_size: usize, ) (Error || error{StreamTooLong})![]u8 { return @errorCast(self.any().readAllAlloc(allocator, max_size)); } |
readUntilDelimiterArrayList() |
pub inline fn readUntilDelimiterArrayList( self: Self, array_list: *std.ArrayList(u8), delimiter: u8, max_size: usize, ) (NoEofError || error{StreamTooLong})!void { return @errorCast(self.any().readUntilDelimiterArrayList( array_list, delimiter, max_size, )); } |
readUntilDelimiterAlloc() |
pub inline fn readUntilDelimiterAlloc( self: Self, allocator: Allocator, delimiter: u8, max_size: usize, ) (NoEofError || error{StreamTooLong})![]u8 { return @errorCast(self.any().readUntilDelimiterAlloc( allocator, delimiter, max_size, )); } |
readUntilDelimiter() |
pub inline fn readUntilDelimiter( self: Self, buf: []u8, delimiter: u8, ) (NoEofError || error{StreamTooLong})![]u8 { return @errorCast(self.any().readUntilDelimiter(buf, delimiter)); } |
readUntilDelimiterOrEofAlloc() |
pub inline fn readUntilDelimiterOrEofAlloc( self: Self, allocator: Allocator, delimiter: u8, max_size: usize, ) (Error || error{StreamTooLong})!?[]u8 { return @errorCast(self.any().readUntilDelimiterOrEofAlloc( allocator, delimiter, max_size, )); } |
readUntilDelimiterOrEof() |
pub inline fn readUntilDelimiterOrEof( self: Self, buf: []u8, delimiter: u8, ) (Error || error{StreamTooLong})!?[]u8 { return @errorCast(self.any().readUntilDelimiterOrEof(buf, delimiter)); } |
streamUntilDelimiter() |
pub inline fn streamUntilDelimiter( self: Self, writer: anytype, delimiter: u8, optional_max_size: ?usize, ) (NoEofError || error{StreamTooLong} || @TypeOf(writer).Error)!void { return @errorCast(self.any().streamUntilDelimiter( writer, delimiter, optional_max_size, )); } |
skipUntilDelimiterOrEof() |
pub inline fn skipUntilDelimiterOrEof(self: Self, delimiter: u8) Error!void { return @errorCast(self.any().skipUntilDelimiterOrEof(delimiter)); } |
readByte() |
pub inline fn readByte(self: Self) NoEofError!u8 { return @errorCast(self.any().readByte()); } |
readByteSigned() |
pub inline fn readByteSigned(self: Self) NoEofError!i8 { return @errorCast(self.any().readByteSigned()); } |
readBytesNoEof() |
pub inline fn readBytesNoEof( self: Self, comptime num_bytes: usize, ) NoEofError![num_bytes]u8 { return @errorCast(self.any().readBytesNoEof(num_bytes)); } |
readIntoBoundedBytes() |
pub inline fn readIntoBoundedBytes( self: Self, comptime num_bytes: usize, bounded: *std.BoundedArray(u8, num_bytes), ) Error!void { return @errorCast(self.any().readIntoBoundedBytes(num_bytes, bounded)); } |
readBoundedBytes() |
pub inline fn readBoundedBytes( self: Self, comptime num_bytes: usize, ) Error!std.BoundedArray(u8, num_bytes) { return @errorCast(self.any().readBoundedBytes(num_bytes)); } |
readInt() |
pub inline fn readInt(self: Self, comptime T: type, endian: std.builtin.Endian) NoEofError!T { return @errorCast(self.any().readInt(T, endian)); } |
readVarInt() |
pub inline fn readVarInt( self: Self, comptime ReturnType: type, endian: std.builtin.Endian, size: usize, ) NoEofError!ReturnType { return @errorCast(self.any().readVarInt(ReturnType, endian, size)); } pub const SkipBytesOptions = AnyReader.SkipBytesOptions; |
skipBytes() |
pub inline fn skipBytes( self: Self, num_bytes: u64, comptime options: SkipBytesOptions, ) NoEofError!void { return @errorCast(self.any().skipBytes(num_bytes, options)); } |
isBytes() |
pub inline fn isBytes(self: Self, slice: []const u8) NoEofError!bool { return @errorCast(self.any().isBytes(slice)); } |
readStruct() |
pub inline fn readStruct(self: Self, comptime T: type) NoEofError!T { return @errorCast(self.any().readStruct(T)); } |
readStructBig() |
pub inline fn readStructBig(self: Self, comptime T: type) NoEofError!T { return @errorCast(self.any().readStructBig(T)); } pub const ReadEnumError = NoEofError || error{ InvalidValue, }; |
readEnum()An integer was read, but it did not match any of the tags in the supplied enum. |
pub inline fn readEnum( self: Self, comptime Enum: type, endian: std.builtin.Endian, ) ReadEnumError!Enum { return @errorCast(self.any().readEnum(Enum, endian)); } |
any() |
pub inline fn any(self: *const Self) AnyReader { return .{ .context = @ptrCast(&self.context), .readFn = typeErasedReadFn, }; } const Self = @This(); fn typeErasedReadFn(context: *const anyopaque, buffer: []u8) anyerror!usize { const ptr: *const Context = @alignCast(@ptrCast(context)); return readFn(ptr.*, buffer); } }; } |
Reader Deprecated; consider switching to |
pub const Reader = GenericReader; |
AnyReaderio/Reader.zig |
pub const AnyReader = @import("io/Reader.zig"); |
Writerio/writer.zig |
pub const Writer = @import("io/writer.zig").Writer; |
SeekableStreamio/seekable_stream.zig |
pub const SeekableStream = @import("io/seekable_stream.zig").SeekableStream; |
BufferedWriterio/buffered_writer.zig |
pub const BufferedWriter = @import("io/buffered_writer.zig").BufferedWriter; |
bufferedWriterio/buffered_writer.zig |
pub const bufferedWriter = @import("io/buffered_writer.zig").bufferedWriter; |
BufferedReaderio/buffered_reader.zig |
pub const BufferedReader = @import("io/buffered_reader.zig").BufferedReader; |
bufferedReaderio/buffered_reader.zig |
pub const bufferedReader = @import("io/buffered_reader.zig").bufferedReader; |
bufferedReaderSizeio/buffered_reader.zig |
pub const bufferedReaderSize = @import("io/buffered_reader.zig").bufferedReaderSize; |
PeekStreamio/peek_stream.zig |
pub const PeekStream = @import("io/peek_stream.zig").PeekStream; |
peekStreamio/peek_stream.zig |
pub const peekStream = @import("io/peek_stream.zig").peekStream; |
FixedBufferStreamio/fixed_buffer_stream.zig |
pub const FixedBufferStream = @import("io/fixed_buffer_stream.zig").FixedBufferStream; |
fixedBufferStreamio/fixed_buffer_stream.zig |
pub const fixedBufferStream = @import("io/fixed_buffer_stream.zig").fixedBufferStream; |
CWriterio/c_writer.zig |
pub const CWriter = @import("io/c_writer.zig").CWriter; |
cWriterio/c_writer.zig |
pub const cWriter = @import("io/c_writer.zig").cWriter; |
LimitedReaderio/limited_reader.zig |
pub const LimitedReader = @import("io/limited_reader.zig").LimitedReader; |
limitedReaderio/limited_reader.zig |
pub const limitedReader = @import("io/limited_reader.zig").limitedReader; |
CountingWriterio/counting_writer.zig |
pub const CountingWriter = @import("io/counting_writer.zig").CountingWriter; |
countingWriterio/counting_writer.zig |
pub const countingWriter = @import("io/counting_writer.zig").countingWriter; |
CountingReaderio/counting_reader.zig |
pub const CountingReader = @import("io/counting_reader.zig").CountingReader; |
countingReaderio/counting_reader.zig |
pub const countingReader = @import("io/counting_reader.zig").countingReader; |
MultiWriterio/multi_writer.zig |
pub const MultiWriter = @import("io/multi_writer.zig").MultiWriter; |
multiWriterio/multi_writer.zig |
pub const multiWriter = @import("io/multi_writer.zig").multiWriter; |
BitReaderio/bit_reader.zig |
pub const BitReader = @import("io/bit_reader.zig").BitReader; |
bitReaderio/bit_reader.zig |
pub const bitReader = @import("io/bit_reader.zig").bitReader; |
BitWriterio/bit_writer.zig |
pub const BitWriter = @import("io/bit_writer.zig").BitWriter; |
bitWriterio/bit_writer.zig |
pub const bitWriter = @import("io/bit_writer.zig").bitWriter; |
ChangeDetectionStreamio/change_detection_stream.zig |
pub const ChangeDetectionStream = @import("io/change_detection_stream.zig").ChangeDetectionStream; |
changeDetectionStreamio/change_detection_stream.zig |
pub const changeDetectionStream = @import("io/change_detection_stream.zig").changeDetectionStream; |
FindByteWriterio/find_byte_writer.zig |
pub const FindByteWriter = @import("io/find_byte_writer.zig").FindByteWriter; |
findByteWriterio/find_byte_writer.zig |
pub const findByteWriter = @import("io/find_byte_writer.zig").findByteWriter; |
BufferedAtomicFileio/buffered_atomic_file.zig |
pub const BufferedAtomicFile = @import("io/buffered_atomic_file.zig").BufferedAtomicFile; |
StreamSourceio/stream_source.zig |
pub const StreamSource = @import("io/stream_source.zig").StreamSource; |
ttyio/tty.zig |
pub const tty = @import("io/tty.zig"); |
null_writerA Writer that doesn't write to anything. |
pub const null_writer = @as(NullWriter, .{ .context = {} }); const NullWriter = Writer(void, error{}, dummyWrite); fn dummyWrite(context: void, data: []const u8) error{}!usize { _ = context; return data.len; } |
Test:null_writer |
test "null_writer" { null_writer.writeAll("yay" ** 10) catch |err| switch (err) {}; } |
poll() |
pub fn poll( allocator: Allocator, comptime StreamEnum: type, files: PollFiles(StreamEnum), ) Poller(StreamEnum) { const enum_fields = @typeInfo(StreamEnum).Enum.fields; var result: Poller(StreamEnum) = undefined; if (builtin.os.tag == .windows) result.windows = .{ .first_read_done = false, .overlapped = [1]os.windows.OVERLAPPED{ mem.zeroes(os.windows.OVERLAPPED), } ** enum_fields.len, .active = .{ .count = 0, .handles_buf = undefined, .stream_map = undefined, }, }; inline for (0..enum_fields.len) |i| { result.fifos[i] = .{ .allocator = allocator, .buf = &.{}, .head = 0, .count = 0, }; if (builtin.os.tag == .windows) { result.windows.active.handles_buf[i] = @field(files, enum_fields[i].name).handle; } else { result.poll_fds[i] = .{ .fd = @field(files, enum_fields[i].name).handle, .events = os.POLL.IN, .revents = undefined, }; } } return result; } |
PollFifo |
pub const PollFifo = std.fifo.LinearFifo(u8, .Dynamic); |
Poller() |
pub fn Poller(comptime StreamEnum: type) type { return struct { const enum_fields = @typeInfo(StreamEnum).Enum.fields; const PollFd = if (builtin.os.tag == .windows) void else std.os.pollfd; fifos: [enum_fields.len]PollFifo, poll_fds: [enum_fields.len]PollFd, windows: if (builtin.os.tag == .windows) struct { first_read_done: bool, overlapped: [enum_fields.len]os.windows.OVERLAPPED, active: struct { count: math.IntFittingRange(0, enum_fields.len), handles_buf: [enum_fields.len]os.windows.HANDLE, stream_map: [enum_fields.len]StreamEnum, |
removeAt() |
pub fn removeAt(self: *@This(), index: u32) void { std.debug.assert(index < self.count); for (index + 1..self.count) |i| { self.handles_buf[i - 1] = self.handles_buf[i]; self.stream_map[i - 1] = self.stream_map[i]; } self.count -= 1; } }, } else void, const Self = @This(); |
deinit() |
pub fn deinit(self: *Self) void { if (builtin.os.tag == .windows) { // cancel any pending IO to prevent clobbering OVERLAPPED value for (self.windows.active.handles_buf[0..self.windows.active.count]) |h| { _ = os.windows.kernel32.CancelIo(h); } } inline for (&self.fifos) |*q| q.deinit(); self.* = undefined; } |
poll() |
pub fn poll(self: *Self) !bool { if (builtin.os.tag == .windows) { return pollWindows(self); } else { return pollPosix(self); } } |
fifo() |
pub inline fn fifo(self: *Self, comptime which: StreamEnum) *PollFifo { return &self.fifos[@intFromEnum(which)]; } fn pollWindows(self: *Self) !bool { const bump_amt = 512; if (!self.windows.first_read_done) { // Windows Async IO requires an initial call to ReadFile before waiting on the handle for (0..enum_fields.len) |i| { const handle = self.windows.active.handles_buf[i]; switch (try windowsAsyncRead( handle, &self.windows.overlapped[i], &self.fifos[i], bump_amt, )) { .pending => { self.windows.active.handles_buf[self.windows.active.count] = handle; self.windows.active.stream_map[self.windows.active.count] = @as(StreamEnum, @enumFromInt(i)); self.windows.active.count += 1; }, .closed => {}, // don't add to the wait_objects list } } self.windows.first_read_done = true; } while (true) { if (self.windows.active.count == 0) return false; const status = os.windows.kernel32.WaitForMultipleObjects( self.windows.active.count, &self.windows.active.handles_buf, 0, os.windows.INFINITE, ); if (status == os.windows.WAIT_FAILED) return os.windows.unexpectedError(os.windows.kernel32.GetLastError()); if (status < os.windows.WAIT_OBJECT_0 or status > os.windows.WAIT_OBJECT_0 + enum_fields.len - 1) unreachable; const active_idx = status - os.windows.WAIT_OBJECT_0; const handle = self.windows.active.handles_buf[active_idx]; const stream_idx = @intFromEnum(self.windows.active.stream_map[active_idx]); var read_bytes: u32 = undefined; if (0 == os.windows.kernel32.GetOverlappedResult( handle, &self.windows.overlapped[stream_idx], &read_bytes, 0, )) switch (os.windows.kernel32.GetLastError()) { .BROKEN_PIPE => { self.windows.active.removeAt(active_idx); continue; }, else => |err| return os.windows.unexpectedError(err), }; self.fifos[stream_idx].update(read_bytes); switch (try windowsAsyncRead( handle, &self.windows.overlapped[stream_idx], &self.fifos[stream_idx], bump_amt, )) { .pending => {}, .closed => self.windows.active.removeAt(active_idx), } return true; } } fn pollPosix(self: *Self) !bool { // We ask for ensureUnusedCapacity with this much extra space. This // has more of an effect on small reads because once the reads // start to get larger the amount of space an ArrayList will // allocate grows exponentially. const bump_amt = 512; const err_mask = os.POLL.ERR | os.POLL.NVAL | os.POLL.HUP; const events_len = try os.poll(&self.poll_fds, std.math.maxInt(i32)); if (events_len == 0) { for (self.poll_fds) |poll_fd| { if (poll_fd.fd != -1) return true; } else return false; } var keep_polling = false; inline for (&self.poll_fds, &self.fifos) |*poll_fd, *q| { // Try reading whatever is available before checking the error // conditions. // It's still possible to read after a POLL.HUP is received, // always check if there's some data waiting to be read first. if (poll_fd.revents & os.POLL.IN != 0) { const buf = try q.writableWithSize(bump_amt); const amt = try os.read(poll_fd.fd, buf); q.update(amt); if (amt == 0) { // Remove the fd when the EOF condition is met. poll_fd.fd = -1; } else { keep_polling = true; } } else if (poll_fd.revents & err_mask != 0) { // Exclude the fds that signaled an error. poll_fd.fd = -1; } else if (poll_fd.fd != -1) { keep_polling = true; } } return keep_polling; } }; } fn windowsAsyncRead( handle: os.windows.HANDLE, overlapped: *os.windows.OVERLAPPED, fifo: *PollFifo, bump_amt: usize, ) !enum { pending, closed } { while (true) { const buf = try fifo.writableWithSize(bump_amt); var read_bytes: u32 = undefined; const read_result = os.windows.kernel32.ReadFile(handle, buf.ptr, math.cast(u32, buf.len) orelse math.maxInt(u32), &read_bytes, overlapped); if (read_result == 0) return switch (os.windows.kernel32.GetLastError()) { .IO_PENDING => .pending, .BROKEN_PIPE => .closed, else => |err| os.windows.unexpectedError(err), }; fifo.update(read_bytes); } } |
PollFiles()Given an enum, returns a struct with fields of that enum, each field representing an I/O stream for polling. |
pub fn PollFiles(comptime StreamEnum: type) type { const enum_fields = @typeInfo(StreamEnum).Enum.fields; var struct_fields: [enum_fields.len]std.builtin.Type.StructField = undefined; for (&struct_fields, enum_fields) |*struct_field, enum_field| { struct_field.* = .{ .name = enum_field.name, .type = fs.File, .default_value = null, .is_comptime = false, .alignment = @alignOf(fs.File), }; } return @Type(.{ .Struct = .{ .layout = .Auto, .fields = &struct_fields, .decls = &.{}, .is_tuple = false, } }); } test { _ = AnyReader; _ = @import("io/bit_reader.zig"); _ = @import("io/bit_writer.zig"); _ = @import("io/buffered_atomic_file.zig"); _ = @import("io/buffered_reader.zig"); _ = @import("io/buffered_writer.zig"); _ = @import("io/c_writer.zig"); _ = @import("io/counting_writer.zig"); _ = @import("io/counting_reader.zig"); _ = @import("io/fixed_buffer_stream.zig"); _ = @import("io/writer.zig"); _ = @import("io/peek_stream.zig"); _ = @import("io/seekable_stream.zig"); _ = @import("io/stream_source.zig"); _ = @import("io/test.zig"); } |
Generated by zstd-browse2 on 2023-11-04 14:12:19 -0400. |