Shared memory condition variable block after process crash

60 Views Asked by At

I'm running some process with POSIX cond var on shared memory, orig question: boost-ipc-condition-variable-blocks.

I run 2, 3 or 4 processes and if one of them stops, the other two get block on the cond var or on the mutex, it get solved only after the shared memory is deleted and recreated, so I understand that the cond var / mutex on the shared memory is corrupted.

OS: Ubuntu 18.04.5 LTS

Arch: aarch64

Found this question: process-shared-condition-variable-how-to-recover-after-one-process-dies, and implemented the solution using pthread_mutex_consistent, but it didn't helped,I put pthread_mutex_consistent on several places in the code (after error / after mutex lock / after pthread_cond_timedwait).
Code:

#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>

/***************************************************************************************************/

/* Simple ticket lock queue with pthreads
 * https://stackoverflow.com/questions/48011696/process-shared-condition-variable-how-to-recover-after-one-process-dies
 */

typedef struct ticket_lock {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int queue_head, queue_tail;
} ticket_lock_t;

static void
ticket_init(ticket_lock_t *t)
{
    pthread_mutexattr_t mattr;
    pthread_mutexattr_init(&mattr);
    pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
    pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
    pthread_mutex_init(&t->mutex, &mattr);
    pthread_mutexattr_destroy(&mattr);

    pthread_condattr_t cattr;
    pthread_condattr_init(&cattr);
    pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(&t->cond, &cattr);
    pthread_condattr_destroy(&cattr);

    t->queue_head = t->queue_tail = 0;
}

static void
ticket_broadcast(ticket_lock_t *ticket)
{
    printf("####### pthread_cond_broadcast\n");
    pthread_cond_broadcast(&ticket->cond);
}

static int
ticket_lock(ticket_lock_t *ticket)
{
    printf("####### pthread_mutex_lock\n");
    pthread_mutex_lock(&ticket->mutex);
    int queue_me = ticket->queue_tail++;

    while (queue_me > ticket->queue_head) {
        time_t sec = time(NULL) + 2;  /* 5s timeout */
        struct timespec ts = { .tv_sec = sec, .tv_nsec = 0 };

        printf("%i: waiting, current: %i  me: %i\n", getpid(), ticket->queue_head, queue_me);

        printf("####### pthread_cond_timedwait\n");
        int ret_wait = pthread_cond_timedwait(&ticket->cond, &ticket->mutex, &ts);
        int err_wait = errno;
        printf("retval_wait: %i\n", ret_wait);
        printf("error_wait: %s\n", strerror(err_wait));
        if (ret_wait == 0)
          continue;

        int ret_consis = pthread_mutex_consistent(&ticket->mutex);
        int err_consis = errno;
        printf("retval_consis: %i\n", ret_consis);
        printf("error_consis: %s\n", strerror(err_consis));
        if (ret_wait == EOWNERDEAD){
                perror("EOWNERDEAD");
        } else if (ret_wait == ETIMEDOUT){
                perror("ETIMEDOUT");
        } else {
              perror("pthread_cond_timedwait");
              exit(1);
        }

        /* Timeout, kick current user... */
        printf("kicking stale ticket %i\n", ticket->queue_head);
        pthread_mutex_consistent(&ticket->mutex);
        ticket->queue_head++;
        ticket_broadcast(ticket);
    }

    pthread_mutex_unlock(&ticket->mutex);
    return queue_me;
}

static void
ticket_unlock(ticket_lock_t *ticket, int me)
{
    pthread_mutex_lock(&ticket->mutex);
    if (ticket->queue_head == me) {  /* Normal case: we haven't timed out. */
        ticket->queue_head++;
        ticket_broadcast(ticket);
    }
    pthread_mutex_unlock(&ticket->mutex);
}


/***************************************************************************************************/
/* Shared memory */

#define SHM_NAME    "fifo_sched"
#define SHM_MAGIC   0xdeadbeef

struct sched_shm {
    int  size;
    int  magic;
    int  ready;

    /* sched stuff */
    ticket_lock_t queue;
};

static unsigned int shm_size = 256;
static struct sched_shm *shm = 0;

/* Create new shared memory segment */
static void
create_shm()
{
       int fd = shm_open(SHM_NAME, O_RDWR | O_CREAT | O_TRUNC, 0644);
       assert(fd != -1);
       int r = ftruncate(fd, shm_size);  assert(r == 0);
       void *pt = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
       assert(pt != MAP_FAILED);
       fprintf(stderr, "Created shared memory.\n");

       shm = pt;
       memset(shm, 0, sizeof(*shm));
       shm->size = shm_size;
       shm->magic = SHM_MAGIC;
       shm->ready = 0;

       ticket_init(&shm->queue);

       shm->ready = 1;
}

/* Attach existing shared memory segment */
static int
attach_shm()
{
    int fd = shm_open(SHM_NAME, O_RDWR, 0);
    if (fd == -1)  return 0;  /* Doesn't exist yet... */

    shm = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    int err = errno;
    if (shm == MAP_FAILED){
                perror("mmap");
                printf("error: %i", err);
                exit(1);
          }
    fprintf(stderr, "Mapped shared memory.\n");

    assert(shm->magic == SHM_MAGIC);
    assert(shm->ready);
    return 1;
}

static void
shm_init()
{
    fprintf(stderr, "shm_init()\n");
    assert(shm_size >= sizeof(struct sched_shm));
    if (!attach_shm())
        create_shm();
}


/***************************************************************************************************/

int main()
{
    shm_init();

    while (1) {
        int ticket = ticket_lock(&shm->queue);
        printf("%i: start %i\n", getpid(), ticket);
        printf("%i: done  %i\n", getpid(), ticket);
        ticket_unlock(&shm->queue, ticket);
    }
    return 0;
}

Running 3 processes.
Logs of the first process (before one process crash and blocking):

####### pthread_mutex_lock
13254: waiting, current: 484123  me: 484125
####### pthread_cond_timedwait
retval_wait: 0
error_wait: No such file or directory
13254: waiting, current: 484124  me: 484125
####### pthread_cond_timedwait
retval_wait: 0
error_wait: No such file or directory
13254: start 484125
13254: done  484125
####### pthread_cond_broadcast

Logs for the Second / Third processes (before one process crash and blocking):

####### pthread_mutex_lock
23427: waiting, current: 695284  me: 695286
####### pthread_cond_timedwait
retval_wait: 0
error_wait: Success
23427: waiting, current: 695285  me: 695286
####### pthread_cond_timedwait
retval_wait: 0
error_wait: Success
23427: start 695286
23427: done  695286
####### pthread_cond_broadcast

After one process stop the other two are block on the mutex lock, the timedwait or on the broadcast.

Am I using the pthread_mutex_consistent wrong? are the pthread_cond_t and pthread_mutex_t not robust and safe for shared memory multiprocess usage?

0

There are 0 best solutions below