How to synchronize threads with io_uring?

I’m new to systems programming and am toying with io_uring. I have the start of a design for a networked program and have identified some CPU-bound work that I think should be offloaded to a thread pool. I’m unsure, though, of the ways to synchronize that work with the ring thread and of the associated tradeoffs.

The first solution that came to mind was a structure (e.g. a queue) locked with pthread_mutex and pthread_cond. This seems inappropriate because io_uring_enter and pthread_cond_wait both block, and I doubt that either returns often enough, and under the right conditions, for the two to live in the same loop. Even if they did, it seems clumsy to introduce another syscall into this hot loop.
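
To make concrete what I mean (a minimal sketch; the names and sizes are my own): the pool side parks in pthread_cond_wait while the ring thread parks in io_uring_enter, and every push means taking a lock plus a potential futex wake from the hot loop.

#include <pthread.h>

// hypothetical work queue guarded by a mutex/condvar pair
typedef struct work_queue {
  pthread_mutex_t lock;
  pthread_cond_t  nonempty;
  void *          items[ 64 ];
  int             head, tail, count;
} work_queue;

// pool side: blocks until an item arrives
void * wq_pop ( work_queue * q ) {
  pthread_mutex_lock( &q->lock );
  while ( q->count == 0 )
    pthread_cond_wait( &q->nonempty, &q->lock ); // parks here, not in io_uring_enter
  void * item = q->items[ q->head ];
  q->head = ( q->head + 1 ) % 64;
  q->count--;
  pthread_mutex_unlock( &q->lock );
  return item;
}

// ring side: called from the hot loop (overflow handling omitted)
void wq_push ( work_queue * q, void * item ) {
  pthread_mutex_lock( &q->lock );
  q->items[ q->tail ] = item;
  q->tail = ( q->tail + 1 ) % 64;
  q->count++;
  pthread_cond_signal( &q->nonempty ); // potential futex syscall per job
  pthread_mutex_unlock( &q->lock );
}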

My current design involves a pair of file descriptors shared between the ring and the thread pool, one for each direction. Since there’s only one process, pointers are suitable as the messages passed over the descriptors. If reads and writes are atomic, then the descriptors themselves provide the synchronization: plain read/write calls on the pool side, regular io_uring operations on the other, and no need to lock either these operations or the memory behind the pointers.
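
To make the design concrete, here’s a minimal sketch of the pool side (names are my own; error handling is minimal). Each message is a single pointer, and a write of sizeof(void *) bytes is far below PIPE_BUF, so writes from different threads shouldn’t interleave:

#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>

typedef struct job_unit { int input, output; } job_unit;

static int to_pool[ 2 ]; // ring -> pool: jobs
static int to_ring[ 2 ]; // pool -> ring: results

void * pool_worker ( void * arg ) {
  ( void ) arg;
  job_unit * job;
  // plain blocking read on the pool side; the ring thread would
  // instead queue an IORING_OP_READ on to_ring[0] and reap the
  // result as a regular completion in its loop
  while ( read( to_pool[ 0 ], &job, sizeof job ) == sizeof job ) {
    if ( !job ) break;            // NULL pointer as a stop signal
    job->output = job->input * 2; // the CPU-bound "work"
    assert( write( to_ring[ 1 ], &job, sizeof job ) == sizeof job );
  }
  return NULL;
}

int main () {
  pthread_t t;
  job_unit job = { .input = 21 };
  job_unit * msg = &job, * done;

  assert( 0 == pipe( to_pool ) && 0 == pipe( to_ring ) );
  assert( 0 == pthread_create( &t, NULL, pool_worker, NULL ) );

  // the real program would issue these as io_uring write/read SQEs
  assert( write( to_pool[ 1 ], &msg, sizeof msg ) == sizeof msg );
  assert( read( to_ring[ 0 ], &done, sizeof done ) == sizeof done );
  printf( "%d\n", done->output ); // 42

  msg = NULL; // stop signal
  assert( write( to_pool[ 1 ], &msg, sizeof msg ) == sizeof msg );
  return pthread_join( t, NULL );
}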

I’m also aware of IORING_OP_MSG_RING but since the CPU-bound work can be cleanly separated, I’d rather schedule any subsequent IO on the (single) ring right after.

  • Are there any obvious problems with this approach?
  • Am I right to think that an anonymous pipe is the most appropriate type of descriptor to use here?
  • Is the PIPE_BUF limit the only condition for atomicity?
  • Are there other approaches I’m not considering? Is this among the most performant?

1 Answer

Max Vu

I found two excerpts from Jens Axboe that name IORING_OP_MSG_RING as a solution to this problem specifically.

From the slide deck (slide 31) of a 2022 Kernel Recipes talk:

[IORING_OP_MSG_RING] Useful for passing eg a work item pointer between threads that each have their own ring

And from the (currently only) wiki article, io_uring and networking in 2023, on the liburing GitHub page:

One use case might be a backend handling new connections and separate threads dealing with said connections, providing a way to pass a connection from one ring to another. io_uring_prep_msg_ring() is a way to set up such an SQE. Or it may be used directly from the thread handling a given connection, to offload expensive work to another thread.

Here’s a demonstration of this pattern, as I understand it:

#define _GNU_SOURCE // for gettid()
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <liburing.h>

typedef struct io_uring        io_uring;
typedef struct io_uring_params io_uring_params;
typedef struct io_uring_sqe    io_uring_sqe;
typedef struct io_uring_cqe    io_uring_cqe;

#define MQSIZE    64 // size of main-thread (work-sending) queue
#define TQSIZE    32 // size of work-thread (work-receiving) queue
#define NJOBS     32 // number of jobs
#define NTHREADS  4  // number of threads

// represents some unit of work
typedef struct job_unit {
  int input;
  int output;
  int done;
} job_unit;

void * work ( void * arg ) {
  io_uring ring;
  io_uring_sqe * sqe;
  io_uring_cqe * cqe;
  int root_fd = *( ( int * ) arg );
  int tid = gettid();

  io_uring_params params = {};

  // since each thread has a ring, only this thread will touch this ring
  params.flags |= IORING_SETUP_SINGLE_ISSUER;

  // the work-thread should process each incoming job sequentially --
  // there's probably little use in having queued jobs available, e.g.
  // between context switches
  // I don't feel confident in my understanding of these switches
  // (it is noted that the computation here is negligible)
  params.flags |= IORING_SETUP_COOP_TASKRUN;
  params.flags |= IORING_SETUP_DEFER_TASKRUN;

  // "share async backend", but not SQ/CQ
  params.flags |= IORING_SETUP_ATTACH_WQ;
  params.wq_fd = root_fd;

  assert( 0 == io_uring_queue_init_params( TQSIZE, &ring, &params ) );
  
  // tell main-thread ring about this one by sending its fd
  // IOSQE_CQE_SKIP_SUCCESS reduces bookkeeping by eliding the sender-side completion
  assert( ( sqe = io_uring_get_sqe( &ring ) ) );
  io_uring_prep_msg_ring( sqe, root_fd, 0, ring.ring_fd, 0 );
  io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
  io_uring_submit( &ring );

  while ( 1 ) {
    assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );

    // stop signal: terminate thread
    if ( !cqe->user_data ) {
      io_uring_cqe_seen( &ring, cqe );
      break;
    }

    // mutate the struct in some interesting way
    job_unit * job = ( job_unit * ) cqe->user_data;
    printf( "thread #%d assigned job #%d\n", tid, job->input );
    job->output = job->input * 2; // the "work"
    job->done = 1;

    // send the pointer back
    assert( ( sqe = io_uring_get_sqe( &ring ) ) );
    io_uring_prep_msg_ring( sqe, root_fd, 0, ( uint64_t ) job, 0 );
    io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
    io_uring_cqe_seen( &ring, cqe );
    io_uring_submit( &ring );
  }

  printf( "thread #%d exit\n", tid );
  io_uring_queue_exit( &ring );
  return NULL;
}

int main () {
  io_uring       ring;
  io_uring_sqe * sqe = NULL;
  io_uring_cqe * cqe = NULL;
  io_uring_params params = {};
  pthread_t      threads[ NTHREADS ] = {};
  job_unit       jobs[ NJOBS ] = {};        // would usually be from heap
  int            work_fds[ NTHREADS ] = {}; // work-thread ring FDs
  int            work_fd_num = 0;           // dual iter/counter for work_fds
  int i;                                    // loop var

  params.flags |= IORING_SETUP_SINGLE_ISSUER;

  // see similar lines above in work()
  params.flags |= IORING_SETUP_COOP_TASKRUN;
  params.flags |= IORING_SETUP_DEFER_TASKRUN;

  // we don't test for message failure, but I think the style of this example
  // lends itself to the way libuv handles things, with callbacks that
  // manage their own failure handling
  params.flags |= IORING_SETUP_SUBMIT_ALL;

  assert( 0 == io_uring_queue_init_params( MQSIZE, &ring, &params ) );

  // start threads, passing in our ring's fd
  for ( i = 0 ; i < NTHREADS ; i++ )
    assert( 0 == pthread_create( &threads[ i ], NULL, work, &ring.ring_fd ) );

  // fill out the table of work-thread ring FDs
  for ( i = 0 ; i < NTHREADS ; i++ ) {
    assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );
    work_fds[ work_fd_num++ ] = ( int ) cqe->user_data;
    io_uring_cqe_seen( &ring, cqe );
  }

  // dispatch jobs
  for ( i = 0 ; i < NJOBS ; i++ ) {
    work_fd_num = ( work_fd_num + 1 ) % NTHREADS;
    jobs[ i ].input = i;
    jobs[ i ].done = 0;
    assert( ( sqe = io_uring_get_sqe( &ring ) ) );
    io_uring_prep_msg_ring( sqe, work_fds[ work_fd_num ], 0,
      ( uint64_t ) &jobs[ i ], 0 );
    io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
  }
  io_uring_submit( &ring );

  // collect results
  for ( i = 0; i < NJOBS ; i++ ) {
    assert( 0 == io_uring_wait_cqe( &ring, &cqe ) );
    job_unit * job = ( job_unit * ) cqe->user_data;
    printf( "job %d done\n", job->input );
    io_uring_cqe_seen( &ring, cqe );
  }

  // broadcast shutdown to threads
  for ( i = 0 ; i < NTHREADS ; i++ ) {
    work_fd_num = ( work_fd_num + 1 ) % NTHREADS;
    assert( ( sqe = io_uring_get_sqe( &ring ) ) );
    io_uring_prep_msg_ring( sqe, work_fds[ work_fd_num ], 0, 0, 0 );
    io_uring_sqe_set_flags( sqe, IOSQE_CQE_SKIP_SUCCESS );
  }
  io_uring_submit( &ring );

  // join and report results
  for ( i = 0 ; i < NTHREADS ; i++ )
    assert( 0 == pthread_join( threads[ i ], NULL ) );
  for ( i = 0 ; i < NJOBS ; i++ )
    printf( "%-2d ", jobs[ i ].output );
  printf( "\n" );

  io_uring_queue_exit( &ring );
  return 0;
}
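
For anyone trying this out: the demo builds against liburing with something like gcc demo.c -o demo -luring. Several of the flags used are recent additions (to my knowledge, IORING_SETUP_SINGLE_ISSUER needs kernel 6.0 and IORING_SETUP_DEFER_TASKRUN needs 6.1), so on older kernels io_uring_queue_init_params will fail and the asserts will fire.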

A few interesting points:

  • On reflection, the pointers-over-pipe idea is a particularly bad one with io_uring, because the values written need to live until completion. Using the above example as a model, sending pointers to the stack array would require allocating a separate array of pointers to write from: essentially a queue to manage writes to the queue (see the fragment after this list).
  • IORING_OP_MSG_RING generates two events: the intended message on the receiver’s ring, but also a completion on the sender’s. The example swallows the sender-side ones with IOSQE_CQE_SKIP_SUCCESS.
  • Using one ring per thread makes IORING_SETUP_SINGLE_ISSUER and IORING_SETUP_ATTACH_WQ obvious optimizations. The documentation lays out a compelling case for IORING_SETUP_COOP_TASKRUN, but I’m still unsure whether IORING_SETUP_DEFER_TASKRUN is beneficial here.
  • ThreadSanitizer does not like this. It cites a data race on every access to the struct. Maybe I’ve done something wrong, but I’m wondering whether programs using this pattern are forever prevented from using the tool.
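
To illustrate that first point with a fragment (reusing names from the demo above; pool_fd stands in for a hypothetical pipe descriptor): a blocking write(2) copies the pointer before returning, but an io_uring write SQE reads its buffer asynchronously, so the variable holding the pointer must outlive the submission.

  // blocking write: a stack temporary is fine -- the kernel copies
  // the bytes before write() returns
  job_unit * msg = &jobs[ i ];
  write( pool_fd, &msg, sizeof msg );

  // io_uring write: the buffer is read some time between submit and
  // completion, so each in-flight message needs a stable slot --
  // hence a parallel array of pointers, a queue to feed the queue
  static job_unit * slots[ NJOBS ];
  slots[ i ] = &jobs[ i ];
  assert( ( sqe = io_uring_get_sqe( &ring ) ) );
  io_uring_prep_write( sqe, pool_fd, &slots[ i ], sizeof slots[ i ], 0 );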

As I’ve indicated above, I’m a bit out of my depth here, so I’m writing this answer only provisionally, to record my progress so far. If anyone coming after knows better, I’d be happy to accept another answer or amend this one.