Go concurrency idioms don’t work for duplex sockets

2022-01-30 ed.: I originally claimed that concurrent method calls on net.Conn weren’t safe, but actually the docs explicitly do guarantee that they are. However, the concern still applies to the io.ReadWriter interface.

The I/O routines in Go’s standard library have an inherently blocking interface: for example, the net.Conn implementation of Read will park the goroutine until the read succeeds or a deadline expires.

Usually that’s okay, because you can wrap that synchronous call in a function that communicates asynchronously over a channel once it’s done:

type readResult struct{
  buf []byte
  err error
}

func ReadAsync(
    r io.Reader,
    ch chan readResult) {
  buf := make([]byte, 512)
  n, err := r.Read(buf)
  if n > 0 {
    ch <- readResult{buf[:n], err}
  } else {
    ch <- readResult{nil, err}
  }
}

ch := make(chan readResult)
rw := dialSomething()
go ReadAsync(rw, ch)

for {
  select {
  case recv := <-ch:
    // do something with recv...
  // multiplex other I/O...
  }
}

This is the idiomatic approach. There's no busy-waiting: underneath the hood Go is actually using asynchronous syscalls (epoll/kqueue) to multiplex all the goroutines that are blocked on I/O. It scales to reading from multiple files/sockets at once (at least until the memory pressure from goroutine and read buffer overheads adds up).

However, this idiom does not work for duplex sockets with an io.ReadWriter interface where we want to multiplex reads and writes asynchronously on the same socket. Wanting a duplex socket is thoroughly reasonable for network sockets or UNIX domain sockets. Take the example below, where we want to copy messages from the channel srcCh to the rw socket, and also print any messages received from rw:

var rw io.ReadWriter
// Initialize rw

ch := make(chan readResult)
for {
  go ReadAsync(rw, ch)

  select {
  // If we have bytes for sending,
  // send them over the socket.
  case sendBytes := <-srcCh:
    // UNSAFE!  Concurrent access to rw :(
    if _, err := rw.Write(sendBytes); err != nil {
      // ...
    }
  // If the socket received a
  // message, print it.
  case recv := <-ch:
    fmt.Println("%v", recv.buf)
    // ...
  }
}

Some error handling has been skipped for brevity here, but there's a more serious concurrency problem. The socket rw is potentially being mutated by two goroutines concurrently: one calling Read() from ReadAsync(), and the other calling Write() from the select block.

For a io.ReadWriter, this isn't safe, because Read and Write potentially mutate opaque internal state, and there are no guarantees in the io docs that this doesn't happen. The compiler (or even the runtime) isn't smart enough to warn about it either, so this unsafe code is extra nefarious because it looks idiomatic. In general there isn't a way to safely multiplex a read from a io.ReadWriter with another goroutine that might call Write.

There's one pragmatic solution, which comes at the cost of generality. There are also three workarounds I've considered, none of which are very appealing.

Solution: use an actual net.Conn

Go's io package API design usually makes things simple: the APIs are blocking, and you can wrap things in a goroutine and a channel if you want to multiplex. The APIs feel familiar if you're used to the equivalent C APIs.

Duplex sockets appear to be an edge case where these design decisions actually work against simplicity, because the idiomatic pattern (that you'd use if you were reading and writing to separate files) is unsafe, and the high-level APIs (io.Reader and such) aren't flexible enough to provide an efficient alternative.

Others have called for the Reader to expose a "ready" API using channels, which I think would be nice. But I also think there's a less intrusive design, whereby io.ReadWriter implementations internally multiplex reads and writes using Go's existing epoll/kqueue-based runtime, and the documentation provides a guarantee that Read and Write can be called concurrently.

This is actually the case for net.Conn! According to the docs:

Multiple goroutines may invoke methods on a Conn simultaneously.

In the examples I gave (network and UNIX domain sockets), you do have a net.Conn interface, so using that type is safe. The downside is that your functions (which need nothing more than Read and Write) require the less general net.Conn interface as a parameter, or must rely on fragile documentation concerning the concurrency guarantees of the io.ReadWriter.

Alternative 1: no multiplexing

We can also use the net.Conn API to handle duplex events by setting a deadline and alternating between reading and writing:

conn.SetReadDeadline(timeout)
for {
  // First check if there is
  // anything to read.
  buf := make([]byte, 512)
  n, err := conn.Read(buf)
  switch {
  case errors.Is(err, os.ErrDeadlineExceeded):
    // Nothing to read; this is fine.
  case err != nil:
    // Actual error handling...
  default:
    // Do something with buf...
  }
  // conn is safe to use:
  // there's no other goroutine calling
  // functions on it.

  // Now check if there's anything
  // ready for sending.
  select {
  case sendBytes := <-srcCh:
    if _, err := conn.Write(sendBytes);
        err != nil {
      // ...
    }
  default:
    // Nothing to send; this is fine.
  }
}

This is now correct, but introduces a horrible trade-off between stutter (high timeouts adding send latency) vs high CPU usage polling for I/O.

Alterative 2: syscall APIs

We can use syscall.RawConn to get access to a raw file descriptor, and EpollWait or Select (from the deprecated syscall or newer golang.org/x/sys/unix packages) to multiplex I/O. This is gross because:

  1. You need to rewrite all your I/O to pass around file descriptors, instead of channels
  2. It's much less portable (BSD and Darwin have kqueue instead of Linux's epoll)
  3. There's much more boilerplate to write - and most of it would be re-implementing the multiplexing already inside Go's runtime/standard library

But at least you get true multiplexing/asynchronous I/O.

Alternative 3: use a third-party library

If you're dealing with I/O that's supported by lesismal/nbio, you can use that for real non-blocking I/O. However, its callback-based design doesn't let you multiplex events arriving over arbitrary channels.

There might be other third-party packages that do something similar?