src/sys/ioqueue

Source   Edit  

A per-thread eventqueue for dispatching over I/O

This is not meant to be a full-fledged eventqueue but rather a supplementary for other queues implementation.

Types

Event {.pure.} = enum
  Read,                     ## The resource is ready to be read from.
  Write,                    ## The resource is ready to be written to.
  PriorityRead,             ## The resource has high-priority data available to be read.
  Error,                    ## There is an error associated with the resource.
  Hangup                     ## The resource has been hung up, usually this means
                             ## a peer has closed its end of the channel.
Events that the operating system can signal. Source   Edit  
PrematureCloseDefect = object of Defect
  id*: int ## The unique ID of the resource, typically the resource handle,
           ## but is dependant on the target operating system.

A defect raised when a resource was invalidated while there is a waiter for it in the queue.

This is considered a programming error due to the fact that some operating system might keep reporting events for the "closed" resource since it might be kept alive by other hidden references (ie. dup on a fd will cause epoll to keep reporting event for the original even if the original is closed).

To avoid this, unregister the resource from the queue before invalidating it.

Source   Edit  
ReadyEvent = range[Read .. PriorityRead]
Events that can be registered to be waited for Source   Edit  

Procs

proc hash(fd: FD): Hash {.borrow, ...raises: [], tags: [], forbids: [].}
Source   Edit  
proc newPrematureCloseDefect(id: int): ref PrematureCloseDefect {....raises: [],
    tags: [], forbids: [].}
Creates a PrematureCloseDefect Source   Edit  
proc persist(fd: AnyFD)

Mark fd as a long-term event producer.

This allows the queue to skip registration of the fd with the OS in subsequent waits and might provide a sizable speed up.

However, this means poll() will always return when an event occurs on fd even if it is not being waited on and might degrade performance.

Deassociation can be done via unregister() <#unregister,AnyFD>_.

Note: Any FD marked as persistent must be unregistered before closing, even on Windows. Failure to do so will raise a Defect. This error checking will only happen on non-release builds.

Currently this is only implemented for Windows.

** Platform specific details **

  • On Windows, fd is permanently bound to the queue for the duration of its lifetime and cannot be unbound via unregister(), which also prevents it from being bound to any other queue.
  • On Windows, fd is set to skip posting a packet to IOCP if the operation is finished synchronously.
Source   Edit  
proc poll(runnable: var seq[Continuation]; timeout = none(Duration)) {.
    ...raises: [OSError, KeyError], tags: [], forbids: [].}

Poll the operating system for events and add continuations of which the resources they are waiting for are ready to runnable.

If timeout is none(Duration), wait indefinitely until the operating system signals an event. If timeout is DurationZero, returns immediately iff there aren't any continuations ready to be run.

timeout is not precise, and the actual wait time depends on the target operating system.

If the queue is empty, returns immediately.

Source   Edit  
proc run() {....raises: [OSError, KeyError, Exception, IOError],
             tags: [RootEffect, WriteIOEffect], forbids: [].}
Wait for events and run all pending continuations in the queue. Stops when the queue is empty. Source   Edit  
proc running(): bool {....raises: [], tags: [], forbids: [].}
Whether there are any continuations within the queue Source   Edit  
proc unregister(fd: AnyFD)

If fd was registered in the queue, remove it alongside its continuation.

Does nothing if the fd is not in the queue.

Platform specific details

  • On Windows, unregister will abort all ongoing IO in fd and its resources will only be collected in the next poll() iff the queue is still running.
  • On Windows, poll() might still be interrupted by activities on fd even after unregistration since fd will only be detached from IOCP after it and its duplicates are closed.
Source   Edit  
proc unregister(handle: Handle[AnyFD])
An overload of unregister for Handle Source   Edit  
proc wait(c: Continuation; fd: AnyFD; event: ReadyEvent): Continuation

Wait for the specified fd to be ready for the given event.

For higher efficiency, only wait for ready state when the fd signalled that it is not ready.

Only one continuation can be queued for any given fd per thread. If more than one is queued, ValueError will be raised.

Note: Any fd registered into the queue (via this procedure) should be unregistered before it is closed as the semantics differs between operating system for when an FD is closed while in the queue. If such scenario is detected, PrematureCloseDefect will be raised.

Platform specific details

  • This interface is not implemented on Windows since IOCP can be used to cover every use cases of this interface.
Source   Edit  
proc wait(c: Continuation; fd: Handle[AnyFD]; event: ReadyEvent): Continuation
An overload of wait for Handle. Source   Edit  
proc wait(fd: AnyFD; event: ReadyEvent) {.cpsMustJump, cpsMagicCall.}
Source   Edit  
proc wait(fd: Handle[AnyFD]; event: ReadyEvent) {.cpsMustJump, cpsMagicCall.}
Source   Edit  

Templates

template asyncio(prc: typed): untyped
Convenience alias to {.cps: Continuation.} for procedures wishing to use ioqueue. Source   Edit