diff --git a/Package.swift b/Package.swift index b71ade66..14b040e7 100644 --- a/Package.swift +++ b/Package.swift @@ -48,4 +48,4 @@ let package = Package( dependencies: ["SystemPackage"], cSettings: cSettings, swiftSettings: swiftSettings), - ]) + ]) \ No newline at end of file diff --git a/Sources/CSystem/include/CSystemLinux.h b/Sources/CSystem/include/CSystemLinux.h index b172d658..6489c4f3 100644 --- a/Sources/CSystem/include/CSystemLinux.h +++ b/Sources/CSystem/include/CSystemLinux.h @@ -21,5 +21,6 @@ #include #include #include +#include "io_uring.h" #endif diff --git a/Sources/CSystem/include/io_uring.h b/Sources/CSystem/include/io_uring.h new file mode 100644 index 00000000..5cab757c --- /dev/null +++ b/Sources/CSystem/include/io_uring.h @@ -0,0 +1,61 @@ +#include +#include +#include + +#include +#include + +#ifndef SWIFT_IORING_C_WRAPPER +#define SWIFT_IORING_C_WRAPPER + +#ifdef __alpha__ +/* + * alpha is the only exception, all other architectures + * have common numbers for new system calls. 
+ */
+# ifndef __NR_io_uring_setup
+#  define __NR_io_uring_setup 535
+# endif
+# ifndef __NR_io_uring_enter
+#  define __NR_io_uring_enter 536
+# endif
+# ifndef __NR_io_uring_register
+#  define __NR_io_uring_register 537
+# endif
+#else /* !__alpha__ */
+# ifndef __NR_io_uring_setup
+#  define __NR_io_uring_setup 425
+# endif
+# ifndef __NR_io_uring_enter
+#  define __NR_io_uring_enter 426
+# endif
+# ifndef __NR_io_uring_register
+#  define __NR_io_uring_register 427
+# endif
+#endif
+
+#include <errno.h>
+
+/*
+ * syscall(2) reports failure as -1 with the reason stored in errno. The Swift
+ * callers of these wrappers compare results against -EAGAIN / -EINTR / -EBUSY
+ * and decode them with Errno(rawValue: -ret), so a failure is translated into
+ * a negated errno value here. Successful results are passed through unchanged.
+ */
+static inline int csystem_io_uring_result(long ret)
+{
+    return ret < 0 ? -errno : (int)ret;
+}
+
+/*
+ * All wrappers below are `static inline`: this header is the only place they
+ * are defined, so every translation unit that includes it must get a private
+ * copy instead of a clashing external definition.
+ */
+
+/* io_uring_register(2): returns the syscall result, or -errno on failure. */
+static inline int io_uring_register(int fd, unsigned int opcode, void *arg,
+                                    unsigned int nr_args)
+{
+    return csystem_io_uring_result(
+        syscall(__NR_io_uring_register, fd, opcode, arg, nr_args));
+}
+
+/* io_uring_setup(2): returns the ring file descriptor, or -errno on failure. */
+static inline int io_uring_setup(unsigned int entries, struct io_uring_params *p)
+{
+    return csystem_io_uring_result(syscall(__NR_io_uring_setup, entries, p));
+}
+
+/*
+ * io_uring_enter(2) with an explicit size for the final argument.
+ * `sz` is forwarded to the kernel as given; it was previously ignored and
+ * replaced by `_NSIG / 8`, which is only correct when `args` is a sigset_t.
+ * Returns the syscall result, or -errno on failure.
+ */
+static inline int io_uring_enter2(int fd, unsigned int to_submit,
+                                  unsigned int min_complete,
+                                  unsigned int flags, void *args, size_t sz)
+{
+    return csystem_io_uring_result(
+        syscall(__NR_io_uring_enter, fd, to_submit, min_complete,
+                flags, args, sz));
+}
+
+/*
+ * io_uring_enter(2) with an optional signal mask; the size passed alongside
+ * `sig` is `_NSIG / 8`. Returns the syscall result, or -errno on failure.
+ */
+static inline int io_uring_enter(int fd, unsigned int to_submit,
+                                 unsigned int min_complete,
+                                 unsigned int flags, sigset_t *sig)
+{
+    return io_uring_enter2(fd, to_submit, min_complete, flags, sig, _NSIG / 8);
+}
+
+#endif
diff --git a/Sources/CSystem/shims.c b/Sources/CSystem/shims.c
deleted file mode 100644
index f492a2ae..00000000
--- a/Sources/CSystem/shims.c
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
- This source file is part of the Swift System open source project
-
- Copyright (c) 2020 Apple Inc.
and the Swift System project authors - Licensed under Apache License v2.0 with Runtime Library Exception - - See https://swift.org/LICENSE.txt for license information -*/ - -#ifdef __linux__ - -#include - -#endif - -#if defined(_WIN32) -#include -#endif diff --git a/Sources/System/IOCompletion.swift b/Sources/System/IOCompletion.swift new file mode 100644 index 00000000..ee81797e --- /dev/null +++ b/Sources/System/IOCompletion.swift @@ -0,0 +1,56 @@ +@_implementationOnly import CSystem + +public struct IOCompletion: ~Copyable { + let rawValue: io_uring_cqe +} + +extension IOCompletion { + public struct Flags: OptionSet, Hashable, Codable { + public let rawValue: UInt32 + + public init(rawValue: UInt32) { + self.rawValue = rawValue + } + + public static let allocatedBuffer = Flags(rawValue: 1 << 0) + public static let moreCompletions = Flags(rawValue: 1 << 1) + public static let socketNotEmpty = Flags(rawValue: 1 << 2) + public static let isNotificationEvent = Flags(rawValue: 1 << 3) + } +} + +extension IOCompletion { + public var userData: UInt64 { //TODO: naming? + get { + rawValue.user_data + } + } + + public var userPointer: UnsafeRawPointer? { + get { + UnsafeRawPointer(bitPattern: UInt(rawValue.user_data)) + } + } + + public var result: Int32 { + get { + rawValue.res + } + } + + public var flags: IOCompletion.Flags { + get { + Flags(rawValue: rawValue.flags & 0x0000FFFF) + } + } + + public var bufferIndex: UInt16? 
{ + get { + if self.flags.contains(.allocatedBuffer) { + return UInt16(rawValue.flags >> 16) + } else { + return nil + } + } + } +} diff --git a/Sources/System/IORequest.swift b/Sources/System/IORequest.swift new file mode 100644 index 00000000..1eed6edf --- /dev/null +++ b/Sources/System/IORequest.swift @@ -0,0 +1,383 @@ +@_implementationOnly import struct CSystem.io_uring_sqe + +@usableFromInline +internal enum IORequestCore { + case nop // nothing here + case openat( + atDirectory: FileDescriptor, + path: FilePath, + FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? = nil, + userData: UInt64 = 0 + ) + case openatSlot( + atDirectory: FileDescriptor, + path: FilePath, + FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? = nil, + intoSlot: IORingFileSlot, + userData: UInt64 = 0 + ) + case read( + file: FileDescriptor, + buffer: IORingBuffer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case readUnregistered( + file: FileDescriptor, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case readSlot( + file: IORingFileSlot, + buffer: IORingBuffer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case readUnregisteredSlot( + file: IORingFileSlot, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case write( + file: FileDescriptor, + buffer: IORingBuffer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case writeUnregistered( + file: FileDescriptor, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case writeSlot( + file: IORingFileSlot, + buffer: IORingBuffer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case writeUnregisteredSlot( + file: IORingFileSlot, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64 = 0, + userData: UInt64 = 0 + ) + case close( + FileDescriptor, + 
userData: UInt64 = 0 + ) + case closeSlot( + IORingFileSlot, + userData: UInt64 = 0 + ) + case unlinkAt( + atDirectory: FileDescriptor, + path: FilePath, + userData: UInt64 = 0 + ) +} + +@inline(__always) +internal func makeRawRequest_readWrite_registered( + file: FileDescriptor, + buffer: IORingBuffer, + offset: UInt64, + userData: UInt64 = 0, + request: consuming RawIORequest +) -> RawIORequest { + request.fileDescriptor = file + request.buffer = buffer.unsafeBuffer + request.rawValue.buf_index = UInt16(exactly: buffer.index)! + request.offset = offset + request.rawValue.user_data = userData + return request +} + +@inline(__always) +internal func makeRawRequest_readWrite_registered_slot( + file: IORingFileSlot, + buffer: IORingBuffer, + offset: UInt64, + userData: UInt64 = 0, + request: consuming RawIORequest +) -> RawIORequest { + request.rawValue.fd = Int32(exactly: file.index)! + request.flags = .fixedFile + request.buffer = buffer.unsafeBuffer + request.rawValue.buf_index = UInt16(exactly: buffer.index)! + request.offset = offset + request.rawValue.user_data = userData + return request +} + +@inline(__always) +internal func makeRawRequest_readWrite_unregistered( + file: FileDescriptor, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64, + userData: UInt64 = 0, + request: consuming RawIORequest +) -> RawIORequest { + request.fileDescriptor = file + request.buffer = buffer + request.offset = offset + request.rawValue.user_data = userData + return request +} + +@inline(__always) +internal func makeRawRequest_readWrite_unregistered_slot( + file: IORingFileSlot, + buffer: UnsafeMutableRawBufferPointer, + offset: UInt64, + userData: UInt64 = 0, + request: consuming RawIORequest +) -> RawIORequest { + request.rawValue.fd = Int32(exactly: file.index)! 
+ request.flags = .fixedFile + request.buffer = buffer + request.offset = offset + request.rawValue.user_data = userData + return request +} + +public struct IORequest { + @usableFromInline var core: IORequestCore + + @inlinable internal consuming func extractCore() -> IORequestCore { + return core + } +} + +extension IORequest { + public static func nop(userData: UInt64 = 0) -> IORequest { + IORequest(core: .nop) + } + + public static func reading( + _ file: IORingFileSlot, + into buffer: IORingBuffer, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .readSlot(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func reading( + _ file: FileDescriptor, + into buffer: IORingBuffer, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .read(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func reading( + _ file: IORingFileSlot, + into buffer: UnsafeMutableRawBufferPointer, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .readUnregisteredSlot( + file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func reading( + _ file: FileDescriptor, + into buffer: UnsafeMutableRawBufferPointer, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .readUnregistered(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func writing( + _ buffer: IORingBuffer, + into file: IORingFileSlot, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .writeSlot(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func writing( + _ buffer: IORingBuffer, + into file: FileDescriptor, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .write(file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public 
static func writing( + _ buffer: UnsafeMutableRawBufferPointer, + into file: IORingFileSlot, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .writeUnregisteredSlot( + file: file, buffer: buffer, offset: offset, userData: userData)) + } + + public static func writing( + _ buffer: UnsafeMutableRawBufferPointer, + into file: FileDescriptor, + at offset: UInt64 = 0, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .writeUnregistered(file: file, buffer: buffer, offset: offset, userData: userData) + ) + } + + public static func closing( + _ file: FileDescriptor, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .close(file, userData: userData)) + } + + public static func closing( + _ file: IORingFileSlot, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .closeSlot(file, userData: userData)) + } + + public static func opening( + _ path: FilePath, + in directory: FileDescriptor, + into slot: IORingFileSlot, + mode: FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? = nil, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .openatSlot( + atDirectory: directory, path: path, mode, options: options, + permissions: permissions, intoSlot: slot, userData: userData)) + } + + public static func opening( + _ path: FilePath, + in directory: FileDescriptor, + mode: FileDescriptor.AccessMode, + options: FileDescriptor.OpenOptions = FileDescriptor.OpenOptions(), + permissions: FilePermissions? 
= nil, + userData: UInt64 = 0 + ) -> IORequest { + IORequest( + core: .openat( + atDirectory: directory, path: path, mode, options: options, + permissions: permissions, userData: userData + )) + } + + public static func unlinking( + _ path: FilePath, + in directory: FileDescriptor, + userData: UInt64 = 0 + ) -> IORequest { + IORequest(core: .unlinkAt(atDirectory: directory, path: path, userData: userData)) + } + + @inline(__always) + public consuming func makeRawRequest() -> RawIORequest { + var request = RawIORequest() + switch extractCore() { + case .nop: + request.operation = .nop + case .openatSlot( + let atDirectory, let path, let mode, let options, let permissions, let fileSlot, + let userData): + // TODO: use rawValue less + request.operation = .openAt + request.fileDescriptor = atDirectory + request.rawValue.addr = UInt64( + UInt( + bitPattern: path.withPlatformString { ptr in + ptr //this is unsavory, but we keep it alive by storing path alongside it in the request + })) + request.rawValue.open_flags = UInt32(bitPattern: options.rawValue | mode.rawValue) + request.rawValue.len = permissions?.rawValue ?? 0 + request.rawValue.file_index = UInt32(fileSlot.index + 1) + request.path = path + request.rawValue.user_data = userData + case .openat( + let atDirectory, let path, let mode, let options, let permissions, let userData): + request.operation = .openAt + request.fileDescriptor = atDirectory + request.rawValue.addr = UInt64( + UInt( + bitPattern: path.withPlatformString { ptr in + ptr //this is unsavory, but we keep it alive by storing path alongside it in the request + })) + request.rawValue.open_flags = UInt32(bitPattern: options.rawValue | mode.rawValue) + request.rawValue.len = permissions?.rawValue ?? 
0 + request.path = path + request.rawValue.user_data = userData + case .write(let file, let buffer, let offset, let userData): + request.operation = .writeFixed + return makeRawRequest_readWrite_registered( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .writeSlot(let file, let buffer, let offset, let userData): + request.operation = .writeFixed + return makeRawRequest_readWrite_registered_slot( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .writeUnregistered(let file, let buffer, let offset, let userData): + request.operation = .write + return makeRawRequest_readWrite_unregistered( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .writeUnregisteredSlot(let file, let buffer, let offset, let userData): + request.operation = .write + return makeRawRequest_readWrite_unregistered_slot( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .read(let file, let buffer, let offset, let userData): + request.operation = .readFixed + return makeRawRequest_readWrite_registered( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .readSlot(let file, let buffer, let offset, let userData): + request.operation = .readFixed + return makeRawRequest_readWrite_registered_slot( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .readUnregistered(let file, let buffer, let offset, let userData): + request.operation = .read + return makeRawRequest_readWrite_unregistered( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .readUnregisteredSlot(let file, let buffer, let offset, let userData): + request.operation = .read + return makeRawRequest_readWrite_unregistered_slot( + file: file, buffer: buffer, offset: offset, userData: userData, request: request) + case .close(let file, let userData): + 
request.operation = .close + request.fileDescriptor = file + request.rawValue.user_data = userData + case .closeSlot(let file, let userData): + request.operation = .close + request.rawValue.file_index = UInt32(file.index + 1) + request.rawValue.user_data = userData + case .unlinkAt(let atDirectory, let path, let userData): + request.operation = .unlinkAt + request.fileDescriptor = atDirectory + request.rawValue.addr = UInt64( + UInt( + bitPattern: path.withPlatformString { ptr in + ptr //this is unsavory, but we keep it alive by storing path alongside it in the request + }) + ) + request.path = path + request.rawValue.user_data = userData + } + return request + } +} diff --git a/Sources/System/IORing.swift b/Sources/System/IORing.swift new file mode 100644 index 00000000..79840c1c --- /dev/null +++ b/Sources/System/IORing.swift @@ -0,0 +1,657 @@ +@_implementationOnly import CSystem +import Glibc // needed for mmap +import Synchronization + +@_implementationOnly import struct CSystem.io_uring_sqe + +// XXX: this *really* shouldn't be here. oh well. +extension UnsafeMutableRawPointer { + func advanced(by offset: UInt32) -> UnsafeMutableRawPointer { + return advanced(by: Int(offset)) + } +} + +extension UnsafeMutableRawBufferPointer { + func to_iovec() -> iovec { + iovec(iov_base: baseAddress, iov_len: count) + } +} + +// all pointers in this struct reference kernel-visible memory +@usableFromInline struct SQRing: ~Copyable { + let kernelHead: UnsafePointer> + let kernelTail: UnsafePointer> + var userTail: UInt32 + + // from liburing: the kernel should never change these + // might change in the future with resizable rings? 
+ let ringMask: UInt32 + // let ringEntries: UInt32 - absorbed into array.count + + // ring flags bitfield + // currently used by the kernel only in SQPOLL mode to indicate + // when the polling thread needs to be woken up + let flags: UnsafePointer> + + // ring array + // maps indexes between the actual ring and the submissionQueueEntries list, + // allowing the latter to be used as a kind of freelist with enough work? + // currently, just 1:1 mapping (0.. +} + +struct CQRing: ~Copyable { + let kernelHead: UnsafePointer> + let kernelTail: UnsafePointer> + + // TODO: determine if this is actually used + var userHead: UInt32 + + let ringMask: UInt32 + + let cqes: UnsafeBufferPointer +} + +public struct IOResource { + public typealias Resource = T + @usableFromInline let resource: T + @usableFromInline let index: Int + + internal init( + resource: T, + index: Int + ) { + self.resource = resource + self.index = index + } +} + +public typealias IORingFileSlot = IOResource +public typealias IORingBuffer = IOResource + +extension IORingFileSlot { + public var unsafeFileSlot: Int { + return index + } +} +extension IORingBuffer { + public var unsafeBuffer: UnsafeMutableRawBufferPointer { + return .init(start: resource.iov_base, count: resource.iov_len) + } +} + +@inline(__always) +internal func _writeRequest( + _ request: __owned RawIORequest, ring: inout SQRing, + submissionQueueEntries: UnsafeMutableBufferPointer +) + -> Bool +{ + let entry = _blockingGetSubmissionEntry( + ring: &ring, submissionQueueEntries: submissionQueueEntries) + entry.pointee = request.rawValue + return true +} + +@inline(__always) +internal func _blockingGetSubmissionEntry( + ring: inout SQRing, submissionQueueEntries: UnsafeMutableBufferPointer +) -> UnsafeMutablePointer< + io_uring_sqe +> { + while true { + if let entry = _getSubmissionEntry( + ring: &ring, + submissionQueueEntries: submissionQueueEntries + ) { + return entry + } + // TODO: actually block here instead of spinning + } + +} + 
+//TODO: omitting signal mask for now
+/// Tells the kernel that requests have been submitted and/or that the caller
+/// is waiting for completions, by calling `io_uring_enter` with no signal mask.
+///
+/// The call is retried for as long as it fails with `EAGAIN` or `EINTR`; any
+/// other failure is treated as fatal. Nothing in the body currently throws.
+///
+/// - Parameters:
+///   - ringDescriptor: Raw file descriptor of the ring.
+///   - numEvents: Passed as the `to_submit` argument.
+///   - minCompletions: Passed as the `min_complete` argument.
+///   - flags: Passed through as the enter flags.
+/// - Returns: The non-negative value returned by `io_uring_enter`.
+internal func _enter(
+    ringDescriptor: Int32,
+    numEvents: UInt32,
+    minCompletions: UInt32,
+    flags: UInt32
+) throws -> Int32 {
+    // Ring always needs enter right now;
+    // TODO: support SQPOLL here
+    while true {
+        let ret = io_uring_enter(ringDescriptor, numEvents, minCompletions, flags, nil)
+        if ret >= 0 {
+            return ret
+        }
+        // A wrapper built directly on syscall(2) reports failure as -1 with the
+        // reason in errno, while a liburing-style wrapper returns the negated
+        // code. errno is still untouched at this point, so both conventions
+        // decode to the same value (-1 is also -EPERM, and errno then agrees).
+        let code = ret == -1 ? errno : -ret
+        // error handling:
+        // EAGAIN / EINTR (try again),
+        // EBADF / EBADFD / EOPNOTSUPP / ENXIO
+        // (failure in ring lifetime management, fatal),
+        // EINVAL (bad constant flag?, fatal),
+        // EFAULT (bad address for argument from library, fatal)
+        if code == EAGAIN || code == EINTR {
+            //TODO: should we wait a bit on AGAIN?
+            continue
+        }
+        fatalError(
+            "fatal error in submitting requests: " + Errno(rawValue: code).debugDescription
+        )
+    }
+}
+
+/// Publishes the locally prepared submissions via `_flushQueue` and then calls
+/// `_enter` once, without asking to wait for any completions.
+internal func _submitRequests(ring: borrowing SQRing, ringDescriptor: Int32) throws {
+    let flushedEvents = _flushQueue(ring: ring)
+    _ = try _enter(
+        ringDescriptor: ringDescriptor, numEvents: flushedEvents, minCompletions: 0, flags: 0)
+}
+
+/// Difference between the local submission tail and the kernel's head.
+///
+/// Both values are free-running `UInt32` counters (the tail is advanced with
+/// `&+`), so the difference must use wrapping subtraction; a plain `-` traps as
+/// soon as the tail has wrapped past zero while the head has not.
+internal func _getUnconsumedSubmissionCount(ring: borrowing SQRing) -> UInt32 {
+    let head = ring.kernelHead.pointee.load(ordering: .acquiring)
+    return ring.userTail &- head
+}
+
+/// Difference between the completion ring's tail and head, computed with
+/// wrapping subtraction for the same reason as the submission count.
+internal func _getUnconsumedCompletionCount(ring: borrowing CQRing) -> UInt32 {
+    let tail = ring.kernelTail.pointee.load(ordering: .acquiring)
+    let head = ring.kernelHead.pointee.load(ordering: .acquiring)
+    return tail &- head
+}
+
+//TODO: pretty sure this is supposed to do more than it does
+/// Stores the local tail into the kernel-visible tail (release ordering) and
+/// returns the resulting unconsumed submission count.
+internal func _flushQueue(ring: borrowing SQRing) -> UInt32 {
+    ring.kernelTail.pointee.store(
+        ring.userTail, ordering: .releasing
+    )
+    return _getUnconsumedSubmissionCount(ring: ring)
+}
+
+@inline(__always)
+internal func _getSubmissionEntry(
+    ring: inout SQRing, submissionQueueEntries: UnsafeMutableBufferPointer<io_uring_sqe>
+) -> UnsafeMutablePointer<
+    io_uring_sqe
+>?
{ + let next = ring.userTail &+ 1 //this is expected to wrap + + // FEAT: smp load when SQPOLL in use (not in MVP) + let kernelHead = ring.kernelHead.pointee.load(ordering: .acquiring) + + // FEAT: 128-bit event support (not in MVP) + if next - kernelHead <= ring.array.count { + // let sqe = &sq->sqes[(sq->sqe_tail & sq->ring_mask) << shift]; + let sqeIndex = Int( + ring.userTail & ring.ringMask + ) + + let sqe = submissionQueueEntries + .baseAddress.unsafelyUnwrapped + .advanced(by: sqeIndex) + + ring.userTail = next + return sqe + } + return nil +} + +public struct IORing: ~Copyable { + let ringFlags: UInt32 + let ringDescriptor: Int32 + + @usableFromInline var submissionRing: SQRing + // FEAT: set this eventually + let submissionPolling: Bool = false + + let completionRing: CQRing + + let submissionQueueEntries: UnsafeMutableBufferPointer + + // kept around for unmap / cleanup + let ringSize: Int + let ringPtr: UnsafeMutableRawPointer + + var _registeredFiles: [UInt32]? + var _registeredBuffers: [iovec]? 
+ + public init(queueDepth: UInt32) throws { + var params = io_uring_params() + + ringDescriptor = withUnsafeMutablePointer(to: ¶ms) { + return io_uring_setup(queueDepth, $0) + } + + if params.features & IORING_FEAT_SINGLE_MMAP == 0 + || params.features & IORING_FEAT_NODROP == 0 + { + close(ringDescriptor) + // TODO: error handling + throw IORingError.missingRequiredFeatures + } + + if ringDescriptor < 0 { + // TODO: error handling + } + + let submitRingSize = + params.sq_off.array + + params.sq_entries * UInt32(MemoryLayout.size) + + let completionRingSize = + params.cq_off.cqes + + params.cq_entries * UInt32(MemoryLayout.size) + + ringSize = Int(max(submitRingSize, completionRingSize)) + + ringPtr = mmap( + /* addr: */ nil, + /* len: */ ringSize, + /* prot: */ PROT_READ | PROT_WRITE, + /* flags: */ MAP_SHARED | MAP_POPULATE, + /* fd: */ ringDescriptor, + /* offset: */ __off_t(IORING_OFF_SQ_RING) + ) + + if ringPtr == MAP_FAILED { + perror("mmap") + // TODO: error handling + fatalError("mmap failed in ring setup") + } + + let submissionRing = SQRing( + kernelHead: UnsafePointer>( + ringPtr.advanced(by: params.sq_off.head) + .assumingMemoryBound(to: Atomic.self) + ), + kernelTail: UnsafePointer>( + ringPtr.advanced(by: params.sq_off.tail) + .assumingMemoryBound(to: Atomic.self) + ), + userTail: 0, // no requests yet + ringMask: ringPtr.advanced(by: params.sq_off.ring_mask) + .assumingMemoryBound(to: UInt32.self).pointee, + flags: UnsafePointer>( + ringPtr.advanced(by: params.sq_off.flags) + .assumingMemoryBound(to: Atomic.self) + ), + array: UnsafeMutableBufferPointer( + start: ringPtr.advanced(by: params.sq_off.array) + .assumingMemoryBound(to: UInt32.self), + count: Int( + ringPtr.advanced(by: params.sq_off.ring_entries) + .assumingMemoryBound(to: UInt32.self).pointee) + ) + ) + + // fill submission ring array with 1:1 map to underlying SQEs + for i in 0...size, + /* prot: */ PROT_READ | PROT_WRITE, + /* flags: */ MAP_SHARED | MAP_POPULATE, + /* fd: */ 
ringDescriptor, + /* offset: */ __off_t(IORING_OFF_SQES) + ) + + if sqes == MAP_FAILED { + perror("mmap") + // TODO: error handling + fatalError("sqe mmap failed in ring setup") + } + + submissionQueueEntries = UnsafeMutableBufferPointer( + start: sqes!.assumingMemoryBound(to: io_uring_sqe.self), + count: Int(params.sq_entries) + ) + + let completionRing = CQRing( + kernelHead: UnsafePointer>( + ringPtr.advanced(by: params.cq_off.head) + .assumingMemoryBound(to: Atomic.self) + ), + kernelTail: UnsafePointer>( + ringPtr.advanced(by: params.cq_off.tail) + .assumingMemoryBound(to: Atomic.self) + ), + userHead: 0, // no completions yet + ringMask: ringPtr.advanced(by: params.cq_off.ring_mask) + .assumingMemoryBound(to: UInt32.self).pointee, + cqes: UnsafeBufferPointer( + start: ringPtr.advanced(by: params.cq_off.cqes) + .assumingMemoryBound(to: io_uring_cqe.self), + count: Int( + ringPtr.advanced(by: params.cq_off.ring_entries) + .assumingMemoryBound(to: UInt32.self).pointee) + ) + ) + + self.submissionRing = submissionRing + self.completionRing = completionRing + + self.ringFlags = params.flags + } + + private func _blockingConsumeCompletionGuts( + minimumCount: UInt32, + maximumCount: UInt32, + extraArgs: UnsafeMutablePointer? 
= nil, + consumer: (consuming IOCompletion?, IORingError?, Bool) throws -> Void + ) rethrows { + var count = 0 + while let completion = _tryConsumeCompletion(ring: completionRing) { + count += 1 + if completion.result < 0 { + try consumer(nil, IORingError(completionResult: completion.result), false) + } else { + try consumer(completion, nil, false) + } + if count == maximumCount { + try consumer(nil, nil, true) + return + } + } + + if count < minimumCount { + while count < minimumCount { + var sz = 0 + if extraArgs != nil { + sz = MemoryLayout.size + } + let res = io_uring_enter2( + ringDescriptor, + 0, + minimumCount, + IORING_ENTER_GETEVENTS, + extraArgs, + sz + ) + // error handling: + // EAGAIN / EINTR (try again), + // EBADF / EBADFD / EOPNOTSUPP / ENXIO + // (failure in ring lifetime management, fatal), + // EINVAL (bad constant flag?, fatal), + // EFAULT (bad address for argument from library, fatal) + // EBUSY (not enough space for events; implies events filled + // by kernel between kernelTail load and now) + if res >= 0 || res == -EBUSY { + break + } else if res == -EAGAIN || res == -EINTR { + continue + } + fatalError( + "fatal error in receiving requests: " + + Errno(rawValue: -res).debugDescription + ) + } + var count = 0 + while let completion = _tryConsumeCompletion(ring: completionRing) { + count += 1 + if completion.result < 0 { + try consumer(nil, IORingError(completionResult: completion.result), false) + } else { + try consumer(completion, nil, false) + } + if count == maximumCount { + break + } + } + try consumer(nil, nil, true) + } + } + + internal func _blockingConsumeOneCompletion( + extraArgs: UnsafeMutablePointer? = nil + ) throws -> IOCompletion { + var result: IOCompletion? 
= nil + try _blockingConsumeCompletionGuts(minimumCount: 1, maximumCount: 1, extraArgs: extraArgs) { + (completion: consuming IOCompletion?, error, done) in + if let error { + throw error + } + if let completion { + result = consume completion + } + } + return result.take()! + } + + public func blockingConsumeCompletion( + timeout: Duration? = nil + ) throws -> IOCompletion { + if let timeout { + var ts = __kernel_timespec( + tv_sec: timeout.components.seconds, + tv_nsec: timeout.components.attoseconds / 1_000_000_000 + ) + return try withUnsafePointer(to: &ts) { tsPtr in + var args = io_uring_getevents_arg( + sigmask: 0, + sigmask_sz: 0, + pad: 0, + ts: UInt64(UInt(bitPattern: tsPtr)) + ) + return try _blockingConsumeOneCompletion(extraArgs: &args) + } + } else { + return try _blockingConsumeOneCompletion() + } + } + + public func blockingConsumeCompletions( + minimumCount: UInt32 = 1, + timeout: Duration? = nil, + consumer: (consuming IOCompletion?, IORingError?, Bool) throws -> Void + ) throws { + if let timeout { + var ts = __kernel_timespec( + tv_sec: timeout.components.seconds, + tv_nsec: timeout.components.attoseconds / 1_000_000_000 + ) + return try withUnsafePointer(to: &ts) { tsPtr in + var args = io_uring_getevents_arg( + sigmask: 0, + sigmask_sz: 0, + pad: 0, + ts: UInt64(UInt(bitPattern: tsPtr)) + ) + try _blockingConsumeCompletionGuts( + minimumCount: minimumCount, maximumCount: UInt32.max, extraArgs: &args, + consumer: consumer) + } + } else { + try _blockingConsumeCompletionGuts( + minimumCount: minimumCount, maximumCount: UInt32.max, consumer: consumer) + } + } + + // public func peekNextCompletion() -> IOCompletion { + + // } + + public func tryConsumeCompletion() -> IOCompletion? { + return _tryConsumeCompletion(ring: completionRing) + } + + func _tryConsumeCompletion(ring: borrowing CQRing) -> IOCompletion? 
{ + let tail = ring.kernelTail.pointee.load(ordering: .acquiring) + let head = ring.kernelHead.pointee.load(ordering: .acquiring) + + if tail != head { + // 32 byte copy - oh well + let res = ring.cqes[Int(head & ring.ringMask)] + ring.kernelHead.pointee.store(head &+ 1, ordering: .releasing) + return IOCompletion(rawValue: res) + } + + return nil + } + + internal func handleRegistrationResult(_ result: Int32) throws { + //TODO: error handling + } + + public mutating func registerEventFD(_ descriptor: FileDescriptor) throws { + var rawfd = descriptor.rawValue + let result = withUnsafePointer(to: &rawfd) { fdptr in + return io_uring_register( + ringDescriptor, + IORING_REGISTER_EVENTFD, + UnsafeMutableRawPointer(mutating: fdptr), + 1 + ) + } + try handleRegistrationResult(result) + } + + public mutating func unregisterEventFD() throws { + let result = io_uring_register( + ringDescriptor, + IORING_UNREGISTER_EVENTFD, + nil, + 0 + ) + try handleRegistrationResult(result) + } + + public mutating func registerFileSlots(count: Int) -> RegisteredResources< + IORingFileSlot.Resource + > { + precondition(_registeredFiles == nil) + precondition(count < UInt32.max) + let files = [UInt32](repeating: UInt32.max, count: count) + + let regResult = files.withUnsafeBufferPointer { bPtr in + io_uring_register( + self.ringDescriptor, + IORING_REGISTER_FILES, + UnsafeMutableRawPointer(mutating: bPtr.baseAddress!), + UInt32(truncatingIfNeeded: count) + ) + } + + // TODO: error handling + _registeredFiles = files + return registeredFileSlots + } + + public func unregisterFiles() { + fatalError("failed to unregister files") + } + + public var registeredFileSlots: RegisteredResources { + RegisteredResources(resources: _registeredFiles ?? 
[]) + } + + public mutating func registerBuffers(_ buffers: some Collection) + -> RegisteredResources + { + precondition(buffers.count < UInt32.max) + precondition(_registeredBuffers == nil) + //TODO: check if io_uring has preconditions it needs for the buffers (e.g. alignment) + let iovecs = buffers.map { $0.to_iovec() } + let regResult = iovecs.withUnsafeBufferPointer { bPtr in + io_uring_register( + self.ringDescriptor, + IORING_REGISTER_BUFFERS, + UnsafeMutableRawPointer(mutating: bPtr.baseAddress!), + UInt32(truncatingIfNeeded: buffers.count) + ) + } + + // TODO: error handling + _registeredBuffers = iovecs + return registeredBuffers + } + + public mutating func registerBuffers(_ buffers: UnsafeMutableRawBufferPointer...) + -> RegisteredResources + { + registerBuffers(buffers) + } + + public struct RegisteredResources: RandomAccessCollection { + let resources: [T] + + public var startIndex: Int { 0 } + public var endIndex: Int { resources.endIndex } + init(resources: [T]) { + self.resources = resources + } + public subscript(position: Int) -> IOResource { + IOResource(resource: resources[position], index: position) + } + public subscript(position: UInt16) -> IOResource { + IOResource(resource: resources[Int(position)], index: Int(position)) + } + } + + public var registeredBuffers: RegisteredResources { + RegisteredResources(resources: _registeredBuffers ?? []) + } + + public func unregisterBuffers() { + fatalError("failed to unregister buffers: TODO") + } + + public func submitPreparedRequests() throws { + try _submitRequests(ring: submissionRing, ringDescriptor: ringDescriptor) + } + + public func submitPreparedRequestsAndConsumeCompletions( + minimumCount: UInt32 = 1, + timeout: Duration? 
= nil, + consumer: (consuming IOCompletion?, IORingError?, Bool) throws -> Void + ) throws { + //TODO: optimize this to one uring_enter + try submitPreparedRequests() + try blockingConsumeCompletions( + minimumCount: minimumCount, + timeout: timeout, + consumer: consumer + ) + } + + public mutating func prepare(request: __owned IORequest) -> Bool { + var raw: RawIORequest? = request.makeRawRequest() + return _writeRequest( + raw.take()!, ring: &submissionRing, submissionQueueEntries: submissionQueueEntries) + } + + mutating func prepare(linkedRequests: some BidirectionalCollection) { + guard linkedRequests.count > 0 else { + return + } + let last = linkedRequests.last! + for req in linkedRequests.dropLast() { + var raw = req.makeRawRequest() + raw.linkToNextRequest() + _writeRequest( + raw, ring: &submissionRing, submissionQueueEntries: submissionQueueEntries) + } + _writeRequest( + last.makeRawRequest(), ring: &submissionRing, + submissionQueueEntries: submissionQueueEntries) + } + + //@inlinable //TODO: make sure the array allocation gets optimized out... + public mutating func prepare(linkedRequests: IORequest...) { + prepare(linkedRequests: linkedRequests) + } + + public mutating func submit(linkedRequests: IORequest...) 
throws { + prepare(linkedRequests: linkedRequests) + try submitPreparedRequests() + } + + deinit { + munmap(ringPtr, ringSize) + munmap( + UnsafeMutableRawPointer(submissionQueueEntries.baseAddress!), + submissionQueueEntries.count * MemoryLayout.size + ) + close(ringDescriptor) + } +} diff --git a/Sources/System/IORingError.swift b/Sources/System/IORingError.swift new file mode 100644 index 00000000..fbd70bce --- /dev/null +++ b/Sources/System/IORingError.swift @@ -0,0 +1,10 @@ +//TODO: make this not an enum +public enum IORingError: Error, Equatable { + case missingRequiredFeatures + case operationCanceled + case unknown(errorCode: Int) + + internal init(completionResult: Int32) { + self = .unknown(errorCode: Int(completionResult)) //TODO, flesh this out + } +} diff --git a/Sources/System/Internals/WindowsSyscallAdapters.swift b/Sources/System/Internals/WindowsSyscallAdapters.swift index d56d33e4..706881ec 100644 --- a/Sources/System/Internals/WindowsSyscallAdapters.swift +++ b/Sources/System/Internals/WindowsSyscallAdapters.swift @@ -187,7 +187,7 @@ internal func pwrite( internal func pipe( _ fds: UnsafeMutablePointer, bytesReserved: UInt32 = 4096 ) -> CInt { -  return _pipe(fds, bytesReserved, _O_BINARY | _O_NOINHERIT); + return _pipe(fds, bytesReserved, _O_BINARY | _O_NOINHERIT); } @inline(__always) diff --git a/Sources/System/RawIORequest.swift b/Sources/System/RawIORequest.swift new file mode 100644 index 00000000..50c97c61 --- /dev/null +++ b/Sources/System/RawIORequest.swift @@ -0,0 +1,188 @@ +// TODO: investigate @usableFromInline / @_implementationOnly dichotomy +@_implementationOnly import CSystem +@_implementationOnly import struct CSystem.io_uring_sqe + +//TODO: make this internal +public struct RawIORequest: ~Copyable { + var rawValue: io_uring_sqe + var path: FilePath? 
//buffer owner for the path pointer that the sqe may have
+
+    /// Creates a request backed by a default-initialized `io_uring_sqe`.
+    public init() {
+        self.rawValue = io_uring_sqe()
+    }
+}
+
+extension RawIORequest {
+    /// Operation codes stored in the entry's `opcode` field.
+    /// Only a subset is listed; gaps are marked with `// ...`.
+    enum Operation: UInt8 {
+        case nop = 0
+        case readv = 1
+        case writev = 2
+        case fsync = 3
+        case readFixed = 4
+        case writeFixed = 5
+        case pollAdd = 6
+        case pollRemove = 7
+        case syncFileRange = 8
+        case sendMessage = 9
+        case receiveMessage = 10
+        // ...
+        case link_timeout = 15
+        // ...
+        case openAt = 18
+        case close = 19
+        case filesUpdate = 20
+        case statx = 21
+        case read = 22
+        case write = 23
+        // ...
+        case openAt2 = 28
+        // ...
+        case unlinkAt = 36
+    }
+
+    /// Per-request flag bits stored in the entry's `flags` field.
+    public struct Flags: OptionSet, Hashable, Codable {
+        public let rawValue: UInt8
+
+        public init(rawValue: UInt8) {
+            self.rawValue = rawValue
+        }
+
+        public static let fixedFile = Flags(rawValue: 1 << 0)
+        public static let drainQueue = Flags(rawValue: 1 << 1)
+        public static let linkRequest = Flags(rawValue: 1 << 2)
+        public static let hardlinkRequest = Flags(rawValue: 1 << 3)
+        public static let asynchronous = Flags(rawValue: 1 << 4)
+        public static let selectBuffer = Flags(rawValue: 1 << 5)
+        public static let skipSuccess = Flags(rawValue: 1 << 6)
+    }
+
+    /// The operation encoded in the entry's `opcode` field.
+    ///
+    /// The getter traps if the stored opcode is not one of the cases
+    /// declared in `Operation`.
+    var operation: Operation {
+        get { Operation(rawValue: rawValue.opcode)! }
+        set { rawValue.opcode = newValue.rawValue }
+    }
+
+    /// The flag bits stored in the entry's `flags` field.
+    public var flags: Flags {
+        get { Flags(rawValue: rawValue.flags) }
+        set { rawValue.flags = newValue.rawValue }
+    }
+
+    /// Sets the `linkRequest` flag on this request, leaving all other flags
+    /// untouched.
+    public mutating func linkToNextRequest() {
+        flags.insert(.linkRequest)
+    }
+
+    /// The file descriptor stored in the entry's `fd` field.
+    public var fileDescriptor: FileDescriptor {
+        get { FileDescriptor(rawValue: rawValue.fd) }
+        set { rawValue.fd = newValue.rawValue }
+    }
+
+    public var offset: UInt64?
{
+        // `UInt64.max` in the entry's `off` field is the sentinel for
+        // "no offset" and is surfaced as `nil`.
+        get {
+            rawValue.off == UInt64.max ? nil : rawValue.off
+        }
+        set {
+            rawValue.off = newValue ?? UInt64.max
+        }
+    }
+
+    /// The memory region described by the entry's `addr` and `len` fields.
+    ///
+    /// Setting an empty buffer whose base address is `nil` stores an address
+    /// of 0 and a length of 0. The setter traps if the buffer's byte count does
+    /// not fit in `UInt32`.
+    public var buffer: UnsafeMutableRawBufferPointer {
+        get {
+            let ptr = UnsafeMutableRawPointer(bitPattern: UInt(exactly: rawValue.addr)!)
+            return UnsafeMutableRawBufferPointer(start: ptr, count: Int(rawValue.len))
+        }
+
+        set {
+            // Go through `UInt` (not `Int`): a signed bit pattern is negative for
+            // addresses with the top bit set, and `UInt64(_:)` traps on negative
+            // input. `UInt(bitPattern:)` also maps a nil base address to 0.
+            rawValue.addr = UInt64(UInt(bitPattern: newValue.baseAddress))
+            rawValue.len = UInt32(exactly: newValue.count)!
+        }
+    }
+
+    /// Operation-specific flag payloads. Not-yet-modelled variants are listed
+    /// as comments.
+    public enum RequestFlags {
+        case readWriteFlags(ReadWriteFlags)
+        // case fsyncFlags(FsyncFlags?)
+        // poll_events
+        // poll32_events
+        // sync_range_flags
+        // msg_flags
+        case timeoutFlags(TimeOutFlags)
+        // accept_flags
+        // cancel_flags
+        case openFlags(FileDescriptor.OpenOptions)
+        // statx_flags
+        // fadvise_advice
+        // splice_flags
+    }
+
+    /// Flag bits for read/write requests.
+    public struct ReadWriteFlags: OptionSet, Hashable, Codable {
+        public var rawValue: UInt32
+        public init(rawValue: UInt32) {
+            self.rawValue = rawValue
+        }
+
+        public static let highPriority = ReadWriteFlags(rawValue: 1 << 0)
+
+        // sync with only data integrity
+        public static let dataSync = ReadWriteFlags(rawValue: 1 << 1)
+
+        // sync with full data + file integrity
+        public static let fileSync = ReadWriteFlags(rawValue: 1 << 2)
+
+        // return -EAGAIN if operation blocks
+        public static let noWait = ReadWriteFlags(rawValue: 1 << 3)
+
+        // append to end of the file
+        public static let append = ReadWriteFlags(rawValue: 1 << 4)
+    }
+
+    /// Flag bits for timeout requests; `relativeTime` is the empty set.
+    public struct TimeOutFlags: OptionSet, Hashable, Codable {
+        public var rawValue: UInt32
+
+        public init(rawValue: UInt32) {
+            self.rawValue = rawValue
+        }
+
+        public static let relativeTime: RawIORequest.TimeOutFlags = TimeOutFlags(rawValue: 0)
+        public static let absoluteTime: RawIORequest.TimeOutFlags = TimeOutFlags(rawValue: 1 << 0)
+    }
+}
+
+extension RawIORequest {
+    static func nop() -> RawIORequest {
+        var
req: RawIORequest = RawIORequest()
+        req.operation = .nop
+        return req
+    }
+
+    //TODO: typed errors
+    /// Marks `opEntry` as linked, fills `timeoutEntry` with a `link_timeout`
+    /// request for `duration`, and runs `work` while the timespec that the
+    /// timeout entry points at is still alive.
+    ///
+    /// The timeout entry stores the address of a stack-local
+    /// `__kernel_timespec`; that address is only valid for the duration of
+    /// `work`, so anything that reads the entry must happen inside `work`.
+    ///
+    /// - Parameters:
+    ///   - opEntry: Entry of the operation to be linked to the timeout.
+    ///   - timeoutEntry: Entry that is overwritten with the timeout request.
+    ///   - duration: Timeout value; sub-nanosecond precision is truncated.
+    ///   - flags: Written to the timeout entry's `timeout_flags` field.
+    ///   - work: Closure executed after both entries have been set up.
+    /// - Returns: Whatever `work` returns.
+    /// - Throws: Rethrows any error thrown by `work`.
+    static func withTimeoutRequest<R>(
+        linkedTo opEntry: UnsafeMutablePointer<io_uring_sqe>,
+        in timeoutEntry: UnsafeMutablePointer<io_uring_sqe>,
+        duration: Duration,
+        flags: TimeOutFlags,
+        work: () throws -> R) rethrows -> R {
+
+        opEntry.pointee.flags |= Flags.linkRequest.rawValue
+        // NOTE(review): this writes `off` on the operation's entry rather than
+        // on the timeout entry -- confirm this is intended.
+        opEntry.pointee.off = 1
+        var ts = __kernel_timespec(
+            tv_sec: duration.components.seconds,
+            // 1 ns == 1_000_000_000 attoseconds
+            tv_nsec: duration.components.attoseconds / 1_000_000_000
+        )
+        return try withUnsafePointer(to: &ts) { tsPtr in
+            var req: RawIORequest = RawIORequest()
+            req.operation = .link_timeout
+            req.rawValue.timeout_flags = flags.rawValue
+            req.rawValue.len = 1
+            req.rawValue.addr = UInt64(UInt(bitPattern: tsPtr))
+            timeoutEntry.pointee = req.rawValue
+            return try work()
+        }
+    }
+}
\ No newline at end of file
diff --git a/Tests/SystemTests/IORequestTests.swift b/Tests/SystemTests/IORequestTests.swift
new file mode 100644
index 00000000..4aaf7543
--- /dev/null
+++ b/Tests/SystemTests/IORequestTests.swift
@@ -0,0 +1,63 @@
+import XCTest
+
+#if SYSTEM_PACKAGE
+@testable import SystemPackage
+#else
+import System
+#endif
+
+/// Returns the in-memory bytes of `request` as an array.
+// NOTE(review): this copies the bytes of the whole `RawIORequest` value, which
+// also stores a `FilePath?` next to the sqe, while the tests compare against
+// 64-byte arrays -- confirm the sizes agree.
+func requestBytes(_ request: consuming RawIORequest) -> [UInt8] {
+    return withUnsafePointer(to: request) {
+        let requestBuf = UnsafeBufferPointer(start: $0, count: 1)
+        let rawBytes = UnsafeRawBufferPointer(requestBuf)
+        return .init(rawBytes)
+    }
+}
+
+// This test suite compares various IORequests bit-for-bit to IORequests
+// that were generated with liburing or manually written out,
+// which are known to work correctly.
+final class IORequestTests: XCTestCase {
+    func testNop() {
+        let req = IORequest.nop().makeRawRequest()
+        let sourceBytes = requestBytes(req)
+        // convenient property of nop: it's all zeros!
+        // for some unknown reason, liburing sets the fd field to -1.
+        // we're not trying to be bug-compatible with it, so 0 *should* work.
+        XCTAssertEqual(sourceBytes, .init(repeating: 0, count: 64))
+    }
+
+    /// Builds an open-into-fixed-slot request and compares its raw bytes with a
+    /// hand-assembled 64-byte expectation.
+    func testOpenatFixedFile() throws {
+        // Recognizable fake address; only its bit pattern is compared below.
+        // NOTE(review): `FilePath(platformString:)` is handed this pointer --
+        // confirm it does not read through it.
+        let pathPtr = try XCTUnwrap(UnsafePointer<CChar>(bitPattern: 0x414141410badf00d))
+        let fileSlot: IORingFileSlot = IORingFileSlot(resource: UInt32.max, index: 0)
+        let req = IORequest.opening(FilePath(platformString: pathPtr),
+            in: FileDescriptor(rawValue: -100),
+            into: fileSlot,
+            mode: .readOnly,
+            options: [],
+            permissions: nil
+        )
+
+        let expectedRequest: [UInt8] = {
+            var bin = [UInt8].init(repeating: 0, count: 64)
+            bin[0] = 0x12 // opcode for the request
+            // bin[1] = 0 - no request flags
+            // bin[2...3] = 0 - padding
+            bin[4...7] = [0x9c, 0xff, 0xff, 0xff] // -100 in UInt32 - dirfd
+            // bin[8...15] = 0 - zeroes
+            withUnsafeBytes(of: pathPtr) {
+                // path pointer
+                bin[16...23] = ArraySlice($0)
+            }
+            // bin[24...43] = 0 - zeroes
+            withUnsafeBytes(of: UInt32(fileSlot.index + 1)) {
+                // file index + 1 - yes, unfortunately
+                bin[44...47] = ArraySlice($0)
+            }
+            return bin
+        }()
+
+        let actualRequest = requestBytes(req.makeRawRequest())
+        XCTAssertEqual(expectedRequest, actualRequest)
+    }
+}
diff --git a/Tests/SystemTests/IORingTests.swift b/Tests/SystemTests/IORingTests.swift
new file mode 100644
index 00000000..306516be
--- /dev/null
+++ b/Tests/SystemTests/IORingTests.swift
@@ -0,0 +1,20 @@
+import XCTest
+
+#if SYSTEM_PACKAGE
+import SystemPackage
+#else
+import System
+#endif
+
+/// Smoke tests that create a real `IORing` and run requests through it.
+final class IORingTests: XCTestCase {
+    /// Creating a ring with a queue depth of 32 must not throw.
+    func testInit() throws {
+        _ = try IORing(queueDepth: 32)
+    }
+
+    /// Submits a single no-op request and expects its completion result to be 0.
+    func testNop() throws {
+        var ring = try IORing(queueDepth: 32)
+        try ring.submit(linkedRequests: IORequest.nop())
+        let completion = try ring.blockingConsumeCompletion()
+        XCTAssertEqual(completion.result, 0)
+    }
+}