I'm running some process with POSIX cond var on shared memory, orig question: boost-ipc-condition-variable-blocks.
I run 2, 3 or 4 processes and if one of them stops, the other two get block on the cond var or on the mutex, it get solved only after the shared memory is deleted and recreated, so I understand that the cond var / mutex on the shared memory is corrupted.
OS: Ubuntu 18.04.5 LTS
Arch: aarch64
Found this question: process-shared-condition-variable-how-to-recover-after-one-process-dies, and implemented the solution using pthread_mutex_consistent, but it didn't helped,I put pthread_mutex_consistent on several places in the code (after error / after mutex lock / after pthread_cond_timedwait).
Code:
#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>
/***************************************************************************************************/
/* Simple ticket lock queue with pthreads
* https://stackoverflow.com/questions/48011696/process-shared-condition-variable-how-to-recover-after-one-process-dies
*/
typedef struct ticket_lock {
pthread_mutex_t mutex;
pthread_cond_t cond;
int queue_head, queue_tail;
} ticket_lock_t;
static void
ticket_init(ticket_lock_t *t)
{
pthread_mutexattr_t mattr;
pthread_mutexattr_init(&mattr);
pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
pthread_mutex_init(&t->mutex, &mattr);
pthread_mutexattr_destroy(&mattr);
pthread_condattr_t cattr;
pthread_condattr_init(&cattr);
pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
pthread_cond_init(&t->cond, &cattr);
pthread_condattr_destroy(&cattr);
t->queue_head = t->queue_tail = 0;
}
static void
ticket_broadcast(ticket_lock_t *ticket)
{
printf("####### pthread_cond_broadcast\n");
pthread_cond_broadcast(&ticket->cond);
}
static int
ticket_lock(ticket_lock_t *ticket)
{
printf("####### pthread_mutex_lock\n");
pthread_mutex_lock(&ticket->mutex);
int queue_me = ticket->queue_tail++;
while (queue_me > ticket->queue_head) {
time_t sec = time(NULL) + 2; /* 5s timeout */
struct timespec ts = { .tv_sec = sec, .tv_nsec = 0 };
printf("%i: waiting, current: %i me: %i\n", getpid(), ticket->queue_head, queue_me);
printf("####### pthread_cond_timedwait\n");
int ret_wait = pthread_cond_timedwait(&ticket->cond, &ticket->mutex, &ts);
int err_wait = errno;
printf("retval_wait: %i\n", ret_wait);
printf("error_wait: %s\n", strerror(err_wait));
if (ret_wait == 0)
continue;
int ret_consis = pthread_mutex_consistent(&ticket->mutex);
int err_consis = errno;
printf("retval_consis: %i\n", ret_consis);
printf("error_consis: %s\n", strerror(err_consis));
if (ret_wait == EOWNERDEAD){
perror("EOWNERDEAD");
} else if (ret_wait == ETIMEDOUT){
perror("ETIMEDOUT");
} else {
perror("pthread_cond_timedwait");
exit(1);
}
/* Timeout, kick current user... */
printf("kicking stale ticket %i\n", ticket->queue_head);
pthread_mutex_consistent(&ticket->mutex);
ticket->queue_head++;
ticket_broadcast(ticket);
}
pthread_mutex_unlock(&ticket->mutex);
return queue_me;
}
static void
ticket_unlock(ticket_lock_t *ticket, int me)
{
pthread_mutex_lock(&ticket->mutex);
if (ticket->queue_head == me) { /* Normal case: we haven't timed out. */
ticket->queue_head++;
ticket_broadcast(ticket);
}
pthread_mutex_unlock(&ticket->mutex);
}
/***************************************************************************************************/
/* Shared memory */
#define SHM_NAME "fifo_sched"
#define SHM_MAGIC 0xdeadbeef
struct sched_shm {
int size;
int magic;
int ready;
/* sched stuff */
ticket_lock_t queue;
};
static unsigned int shm_size = 256;
static struct sched_shm *shm = 0;
/* Create new shared memory segment */
static void
create_shm()
{
int fd = shm_open(SHM_NAME, O_RDWR | O_CREAT | O_TRUNC, 0644);
assert(fd != -1);
int r = ftruncate(fd, shm_size); assert(r == 0);
void *pt = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
assert(pt != MAP_FAILED);
fprintf(stderr, "Created shared memory.\n");
shm = pt;
memset(shm, 0, sizeof(*shm));
shm->size = shm_size;
shm->magic = SHM_MAGIC;
shm->ready = 0;
ticket_init(&shm->queue);
shm->ready = 1;
}
/* Attach existing shared memory segment */
static int
attach_shm()
{
int fd = shm_open(SHM_NAME, O_RDWR, 0);
if (fd == -1) return 0; /* Doesn't exist yet... */
shm = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
int err = errno;
if (shm == MAP_FAILED){
perror("mmap");
printf("error: %i", err);
exit(1);
}
fprintf(stderr, "Mapped shared memory.\n");
assert(shm->magic == SHM_MAGIC);
assert(shm->ready);
return 1;
}
static void
shm_init()
{
fprintf(stderr, "shm_init()\n");
assert(shm_size >= sizeof(struct sched_shm));
if (!attach_shm())
create_shm();
}
/***************************************************************************************************/
int main()
{
shm_init();
while (1) {
int ticket = ticket_lock(&shm->queue);
printf("%i: start %i\n", getpid(), ticket);
printf("%i: done %i\n", getpid(), ticket);
ticket_unlock(&shm->queue, ticket);
}
return 0;
}
Running 3 processes.
Logs of the first process (before one process crash and blocking):
####### pthread_mutex_lock
13254: waiting, current: 484123 me: 484125
####### pthread_cond_timedwait
retval_wait: 0
error_wait: No such file or directory
13254: waiting, current: 484124 me: 484125
####### pthread_cond_timedwait
retval_wait: 0
error_wait: No such file or directory
13254: start 484125
13254: done 484125
####### pthread_cond_broadcast
Logs for the Second / Third processes (before one process crash and blocking):
####### pthread_mutex_lock
23427: waiting, current: 695284 me: 695286
####### pthread_cond_timedwait
retval_wait: 0
error_wait: Success
23427: waiting, current: 695285 me: 695286
####### pthread_cond_timedwait
retval_wait: 0
error_wait: Success
23427: start 695286
23427: done 695286
####### pthread_cond_broadcast
After one process stop the other two are block on the mutex lock, the timedwait or on the broadcast.
Am I using the pthread_mutex_consistent wrong? are the pthread_cond_t and pthread_mutex_t not robust and safe for shared memory multiprocess usage?