How to ensure consistency in process synchronization

22 Views Asked by At

I want to implement a message queue between processes. I use an mmap-ed file for persistence, and process-shared pthread_mutex_t and pthread_cond_t objects to synchronize the processes. However, I found that if one of the processes crashes after calling pthread_cond_wait (i.e. while it is blocked waiting on the condition variable), the other processes block permanently when they call pthread_cond_broadcast.

I know that pthread_mutex_t has a robust attribute, and that consistency can be restored through the pthread_mutex_consistent interface. How can I ensure the consistency of a pthread_cond_t?

#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <unistd.h>
#include "message_block.h"

#define MAX_MEM_SIZE (1 * 1024* 1024)

/* Header that precedes every buffer in the shared payload area.
 *   npos: offset (relative to Msg_Blck.base) of the next node on whichever
 *         list this node is linked into; 0 terminates the list.
 *   clen: total size of the node in bytes, this header included.
 *   ptr:  first byte of the caller-visible payload. */
typedef struct
{
    int npos;
    int clen;
    char ptr[];     /* C99 flexible array member (was GNU-only ptr[0]); same layout */
} Msg_Node;

/* Shared header stored at the very start of the mmap-ed file: MB_Init maps
 * the file MAP_SHARED and points Msg_Blck.entry at the mapping start, so every
 * process that maps the same file sees this same instance. The field layout
 * is therefore part of the file format; changing it invalidates existing
 * files.
 *
 * Both lists are singly linked through Msg_Node.npos and hold byte offsets
 * relative to Msg_Blck.base; offset 0 means "no node". */
typedef struct
{
    unsigned char use;          /* switched from 0 to 1 by the first MB_Init on this file */
    unsigned int idle[2];       /* free-node list: [0] = head offset, [1] = tail offset */
    unsigned int busy[2];       /* filled-message FIFO: [0] = head, [1] = tail; head 0 = empty */
    pthread_mutex_t idle_mtx;   /* robust, process-shared; held while idle[] / free nodes change */
    pthread_mutex_t busy_mtx;   /* robust, process-shared; held while busy[] changes */
    pthread_cond_t idle_cond;   /* process-shared; waited on in MB_Get_Empty, broadcast by MB_Put_Empty */
    pthread_cond_t busy_cond;   /* process-shared; waited on in MB_Get_Full, broadcast by MB_Put_Full */
} Msg_Entry;

/* Per-process handle that MB_Init allocates with calloc and returns as void *.
 * Only `entry` and the memory behind `base` live in the shared mapping. */
typedef struct {
    int fd;             /* descriptor of the backing file */
    char *addr;         /* start of the MAP_SHARED mapping */
    char *base;         /* first byte after the Msg_Entry header; list offsets are relative to it */
    Msg_Entry *entry;   /* shared header, located at addr */
} Msg_Blck;

/* Set up the shared header of a message block that has never been
 * initialised: one free node spanning the whole payload area, an empty busy
 * list, robust process-shared mutexes and process-shared condition variables.
 * The caller must hold the exclusive file lock taken in MB_Init.
 * Returns 0 on success or the first non-zero pthread error code. */
static int MB_Init_Shared(Msg_Blck *msgB)
{
    Msg_Entry *entry = msgB->entry;
    /* Offset 0 doubles as the "null" link of the idle/busy lists, so the
     * first free node starts at offset 1.
     * NOTE(review): this puts Msg_Node headers at odd addresses (misaligned
     * int fields); fixing it would change the on-disk layout. */
    Msg_Node *msgN = (Msg_Node*)(msgB->base + 1);
    pthread_mutexattr_t mtxattr;
    pthread_condattr_t condattr;
    int rc;

    msgN->npos = 0;
    msgN->clen = MAX_MEM_SIZE - sizeof(Msg_Entry) - 1;

    entry->idle[0] = 1;
    entry->idle[1] = 1;

    entry->busy[0] = 0;
    entry->busy[1] = 0;

    rc = pthread_mutexattr_init(&mtxattr);
    if (0 != rc)
    {
        return rc;
    }
    rc = pthread_mutexattr_setpshared(&mtxattr, PTHREAD_PROCESS_SHARED);
    if (0 == rc)
    {
        rc = pthread_mutexattr_setrobust(&mtxattr, PTHREAD_MUTEX_ROBUST);
    }
    if (0 == rc)
    {
        rc = pthread_mutex_init(&(entry->idle_mtx), &mtxattr);
    }
    if (0 == rc)
    {
        rc = pthread_mutex_init(&(entry->busy_mtx), &mtxattr);
    }
    pthread_mutexattr_destroy(&mtxattr);
    if (0 != rc)
    {
        return rc;
    }

    rc = pthread_condattr_init(&condattr);
    if (0 != rc)
    {
        return rc;
    }
    rc = pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED);
    if (0 == rc)
    {
        rc = pthread_cond_init(&(entry->idle_cond), &condattr);
    }
    if (0 == rc)
    {
        rc = pthread_cond_init(&(entry->busy_cond), &condattr);
    }
    pthread_condattr_destroy(&condattr);

    return rc;
}

/* Open (creating if needed) the backing file `filename`, map MAX_MEM_SIZE
 * bytes of it MAP_SHARED, and initialise the shared header the first time the
 * file is used.
 *
 * First-time initialisation runs under an exclusive flock() on the file, and
 * `use` is set to 1 only after every field, mutex and condvar is ready. A
 * concurrent opener therefore never sees a half-initialised header. If the
 * initialising process dies, the kernel drops the lock and `use` is still 0,
 * so the next opener redoes the initialisation.
 *
 * Returns an opaque handle (Msg_Blck *) on success, or NULL on failure after
 * printing a diagnostic and releasing everything acquired so far. */
void* MB_Init(char* filename)
{
    Msg_Blck *msgB = NULL;
    char *addr = NULL;
    int rc = 0;

    msgB = calloc(1, sizeof(*msgB));
    if (NULL == msgB)
    {
        return NULL;
    }

    /* Plain data file: no execute bits (the process umask still applies). */
    msgB->fd = open(filename, O_CREAT | O_RDWR, 0666);
    if (-1 == msgB->fd)
    {
        perror("open failed");
        goto ERR;
    }

    /* Make sure the file is at least MAX_MEM_SIZE bytes long, so every
     * page of the mapping is backed by the file. */
    if (-1 == lseek(msgB->fd, (off_t)MAX_MEM_SIZE, SEEK_SET))
    {
        perror("lseek failed");
        goto ERR;
    }
    if (1 != write(msgB->fd, "", 1))
    {
        perror("write failed");
        goto ERR;
    }

    addr = mmap(NULL, MAX_MEM_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, msgB->fd, 0);
    if (MAP_FAILED == addr)
    {
        /* msgB->addr stays NULL so the cleanup below skips munmap. */
        perror("mmap failed");
        goto ERR;
    }
    msgB->addr = addr;
    msgB->entry = (Msg_Entry*)addr;
    msgB->base = addr + sizeof(Msg_Entry);

    if (-1 == flock(msgB->fd, LOCK_EX))
    {
        perror("flock failed");
        goto ERR;
    }
    if (0 == msgB->entry->use)
    {
        rc = MB_Init_Shared(msgB);
        if (0 == rc)
        {
            /* Publish only after the header is fully initialised. */
            msgB->entry->use = 1;
        }
    }
    flock(msgB->fd, LOCK_UN);

    if (0 != rc)
    {
        errno = rc;
        perror("pthread init failed");
        goto ERR;
    }

    return msgB;

ERR:

    if (msgB->addr)
    {
        munmap(msgB->addr, MAX_MEM_SIZE);
        msgB->addr = NULL;
    }

    if (0 <= msgB->fd)
    {
        close(msgB->fd);
        msgB->fd = -1;
    }

    free(msgB);

    return NULL;
}

char* MB_Get_Empty(void *blk, int size)
{
    assert(blk && (0 < size));
    Msg_Blck *msgB = (Msg_Blck*)blk;
    
    Msg_Node *msgC = NULL;
    Msg_Node *msgH = NULL;
    Msg_Node *msgT = NULL;

    int length = sizeof(Msg_Node) + size; 

    if (EOWNERDEAD == pthread_mutex_lock(&(msgB->entry->idle_mtx)))
    {
        pthread_mutex_consistent(&(msgB->entry->idle_mtx));
    }

    while (msgH = (Msg_Node*)(msgB->base + msgB->entry->idle[0]), msgH->clen < length + sizeof(Msg_Node))
    {
        if (0 < msgH->npos)
        {
            msgC = (Msg_Node*)(msgB->base + msgB->entry->idle[0]);
            msgT = (Msg_Node*)(msgB->base + msgB->entry->idle[1]);
            msgH = (Msg_Node*)(msgB->base + msgC->npos);
            
            msgT->npos = (int)((char*)msgC - msgB->base);
            msgT = msgC;
            msgT->npos = 0;
            
            msgB->entry->idle[0] = (int)((char*)msgH - msgB->base);
            
            msgC = msgH;
            while (0 < msgC->npos)
            {
                Msg_Node *msgN = (Msg_Node*)(msgB->base + msgC->npos);
            
                if (msgB->entry->idle[0] + msgH->clen == msgC->npos)
                {
                    msgC->npos = msgN->npos;
                    msgH->clen += msgN->clen;
                    msgC = msgH;
            
                    continue;
                }
                else if (msgC->npos + msgN->clen == msgB->entry->idle[0])
                {
                    msgB->entry->idle[0] = (int)((char*)msgN - msgB->base);
                
                    msgC->npos = msgN->npos;
                    msgN->npos = msgH->npos;
                    msgN->clen += msgH->clen;
                    msgH = msgN;
                    msgC = msgH;

                    continue;
                }
            
                msgC = (Msg_Node*)(msgB->base + msgC->npos);
            }

            msgB->entry->idle[1] = (int)((char*)msgC - msgB->base);
        
            continue;
        }
    
        pthread_cond_wait(&(msgB->entry->idle_cond), &(msgB->entry->idle_mtx));
    }

    msgC = msgH;
    msgH = (Msg_Node*)((char*)msgC + length);
    msgH->npos = msgC->npos;
    msgH->clen = msgC->clen - length;
    
    msgB->entry->idle[0] += length;
    if (0 == msgH->npos)
    {
        msgB->entry->idle[1] += length;
    }
    
    msgC->npos = 0;
    msgC->clen = length;
    
    pthread_mutex_unlock(&(msgB->entry->idle_mtx));
    
    return msgC->ptr;
}

void MB_Put_Empty(void *blk, char *ptr)
{
    assert(blk && ptr);

    Msg_Blck *msgB = (Msg_Blck*)blk;
    Msg_Node *msgT = NULL;
    Msg_Node *msgN = NULL;

    if (EOWNERDEAD == pthread_mutex_lock(&(msgB->entry->idle_mtx)))
    {
        pthread_mutex_consistent(&(msgB->entry->idle_mtx));
    }

    msgT = (Msg_Node*)(msgB->base + msgB->entry->idle[1]);
    msgN = (Msg_Node*)(ptr - sizeof(Msg_Node)); 

    msgB->entry->idle[1] = (int)((char*)msgN - msgB->base);

    if (msgB->base == (char*)msgT)
    {
        msgB->entry->idle[0] = msgB->entry->idle[1];
    }
    else
    {
        msgT->npos = msgB->entry->idle[1];
    }
    
    pthread_mutex_unlock(&(msgB->entry->idle_mtx));
    pthread_cond_broadcast(&(msgB->entry->idle_cond));
}

/* Remove and return the oldest message from the shared busy FIFO, blocking
 * on busy_cond while the FIFO is empty.
 *
 * blk: handle returned by MB_Init.
 * Returns the payload pointer, or NULL with errno set when pthread_mutex_lock
 * or pthread_cond_wait fails with anything other than EOWNERDEAD.
 *
 * EOWNERDEAD can come from pthread_cond_wait as well as from
 * pthread_mutex_lock. In both cases the mutex is made consistent, otherwise
 * it would become permanently unusable after the unlock.
 * NOTE(review): POSIX has no robust condition variable. A process that dies
 * while blocked here can leave busy_cond unusable for pthread_cond_broadcast
 * (observed with glibc). Making the mutex consistent does not fix that. */
char* MB_Get_Full(void *blk)
{
    assert(blk);

    Msg_Blck *msgB = (Msg_Blck*)blk;
    Msg_Node *msgH = NULL;
    int rc = pthread_mutex_lock(&(msgB->entry->busy_mtx));

    if (EOWNERDEAD == rc)
    {
        pthread_mutex_consistent(&(msgB->entry->busy_mtx));
    }
    else if (0 != rc)
    {
        errno = rc;
        return NULL;
    }

    /* busy[0] == 0 (head at base) means the FIFO is empty. */
    while (msgH = (Msg_Node*)(msgB->base + msgB->entry->busy[0]), msgB->base == (char*)msgH)
    {
        rc = pthread_cond_wait(&(msgB->entry->busy_cond), &(msgB->entry->busy_mtx));
        if (EOWNERDEAD == rc)
        {
            pthread_mutex_consistent(&(msgB->entry->busy_mtx));
        }
        else if (0 != rc)
        {
            /* Mutex not (re)acquired (e.g. ENOTRECOVERABLE): do not unlock. */
            errno = rc;
            return NULL;
        }
    }

    msgB->entry->busy[0] = msgH->npos;
    if (0 == msgH->npos)
    {
        msgB->entry->busy[1] = 0;
    }

    /* Detach the node so no stale busy link travels with the buffer. */
    msgH->npos = 0;

    pthread_mutex_unlock(&(msgB->entry->busy_mtx));

    return msgH->ptr;
}

void MB_Put_Full(void *blk, char *ptr)
{
    assert(blk && ptr);

    Msg_Blck *msgB = (Msg_Blck*)blk;
    Msg_Node *msgT = NULL;
    Msg_Node *msgN = NULL;

    if (EOWNERDEAD == pthread_mutex_lock(&(msgB->entry->busy_mtx)))
    {
        pthread_mutex_consistent(&(msgB->entry->busy_mtx));
    }

    msgT = (Msg_Node*)(msgB->base + msgB->entry->busy[1]);
    msgN = (Msg_Node*)(ptr - sizeof(Msg_Node)); 

    msgB->entry->busy[1] = (int)((char*)msgN - msgB->base);

    if (msgB->base == (char*)msgT)
    {
        msgB->entry->busy[0] = msgB->entry->busy[1];
    }
    else
    {
        msgT->npos = msgB->entry->busy[1];
    }
    
    pthread_mutex_unlock(&(msgB->entry->busy_mtx));
    pthread_cond_broadcast(&(msgB->entry->busy_cond));
}
0

There are 0 best solutions below