C mutex getting blocked infinitely

144 Views Asked by At

So I gotta implement a queue in C with a system to protect it and make it accessible through multiple threads. To do so, I've used the producers-consumers logic:

#include <stdlib.h>
#include <threads.h>
#include <stdbool.h>
#include "queue.h"
#include <stdio.h>

int metidos=1;
int quitados=1;


// circular array
typedef struct _queue {
    int size;
    int used;
    int first;
    void **data;
    mtx_t * mutex;
    cnd_t * full;
    cnd_t * empty;
    bool terminado;
} _queue;

void q_terminar(queue q){
    printf("Entra en q_terminar\n");
    mtx_lock(q->mutex);
    q->terminado=true;
    mtx_unlock(q->mutex);
    cnd_broadcast(q->empty);
}

queue q_create(int size) {
    queue q = malloc(sizeof(_queue));

    q->size  = size;
    q->used  = 0;
    q->first = 0;
    q->data  = malloc(size * sizeof(void *));
    q->mutex = malloc(sizeof (mtx_t));
    q->full = malloc(sizeof(cnd_t));
    q->empty = malloc(sizeof(cnd_t));
    q->terminado=false;
    mtx_init(q->mutex, mtx_plain);
    cnd_init(q->full);
    cnd_init(q->empty);

    return q;
}

int q_elements(queue q) {
    mtx_lock(q->mutex);
    int res= q->used;
    mtx_unlock(q->mutex);
    return res;
}

int q_insert(queue q, void *elem) {
    if(q->terminado==true){
        return 1;
    }
    printf("Entra en insert\n");
    mtx_lock(q->mutex);
    while(q->used == q->size){
        printf("ESperando para insertar\n");
        cnd_wait(q->full, q->mutex);
        printf("Recibiendo señal para insertar\n");
    }
    //if(q->size == q->used) return -1;
    q->data[(q->first + q->used) % q->size] = elem;
    q->used++;

    printf("Insertado, este es el elemento %d en ser insertado\n",metidos);
    printf("En la cola hay %d elementos\n",q->used);
    metidos++;
    if(q->used == 1){
        cnd_broadcast(q->empty);
        printf("Enviando señal para despertar a los que borran\n");
    }

    mtx_unlock(q->mutex);

    return 0;
}

void *q_remove(queue q) {
    printf("Entra en remove\n");
    void *res;
    mtx_lock(q->mutex);
    if(q->terminado == true){
        mtx_unlock(q->mutex);
        return NULL;
    }
    while(q->used ==0 && q->terminado==false){
        printf("Esperando para quitar\n");
        cnd_wait(q->empty, q->mutex);
        printf("Recibiendo señal para quitar\n");
    }

    if(q->used == 0) {
        mtx_unlock(q->mutex);
        return NULL;
    }
    res = q->data[q->first];
    q->first = (q->first + 1) % q->size;
    q->used--;
    cnd_signal(q->full);
    printf("Quitado, este es el elemento %d en ser quitado\n", quitados);
    printf("En la cola hay %d elementos\n",q->used);
    quitados++;
    if(q->used == q->size-1){
        cnd_broadcast(q->full);
        printf("Enviando señal para despertar a los que insertan\n");
    }

    mtx_unlock(q->mutex);
    return res;
}

void q_destroy(queue q) {
    mtx_destroy(q->mutex);
    cnd_destroy(q->full);
    cnd_destroy(q->empty);
    free(q->full);
    free(q->empty);
    free(q->mutex);
    free(q->data);
    free(q);
}

Now, in the main file, I must separate the call the function "sum" does to get_entries in a separate thread, so I've made a function called get_entries_thread

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <stdlib.h>
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <openssl/evp.h>
#include <threads.h>

#include "options.h"
#include "queue.h"


#define MAX_PATH 1024
#define BLOCK_SIZE (10*1024*1024)
#define MAX_LINE_LENGTH (MAX_PATH * 2)


struct file_md5 {
    char *file;
    unsigned char *hash;
    unsigned int hash_size;
};

struct thread_get_entries_args{
    int id;
    char *dir;
    queue q;
};

struct thread_get_entries_info {
    thrd_t id;
    struct thread_get_entries_args * entries_args;
    cnd_t condicion;
};

thrd_t global;


void get_entries(char *dir, queue q);


void print_hash(struct file_md5 *md5) {
    for(int i = 0; i < md5->hash_size; i++) {
        printf("%02hhx", md5->hash[i]);
    }
}


void read_hash_file(char *file, char *dir, queue q) {
    FILE *fp;
    char line[MAX_LINE_LENGTH];
    char *file_name, *hash;
    int hash_len;

    if((fp = fopen(file, "r")) == NULL) {
        printf("Could not open %s : %s\n", file, strerror(errno));
        exit(0);
    }

    while(fgets(line, MAX_LINE_LENGTH, fp) != NULL) {
        char *field_break;
        struct file_md5 *md5 = malloc(sizeof(struct file_md5));

        if((field_break = strstr(line, ": ")) == NULL) {
            printf("Malformed md5 file\n");
            exit(0);
        }
        *field_break = '\0';

        file_name = line;
        hash      = field_break + 2;
        hash_len  = strlen(hash);

        md5->file      = malloc(strlen(file_name) + strlen(dir) + 2);
        sprintf(md5->file, "%s/%s", dir, file_name);
        md5->hash      = malloc(hash_len / 2);
        md5->hash_size = hash_len / 2;


        for(int i = 0; i < hash_len; i+=2)
            sscanf(hash + i, "%02hhx", &md5->hash[i / 2]);
        printf("Se llama a q_insert\n");
        q_insert(q, md5);
    }

    fclose(fp);
}


void sum_file(struct file_md5 *md5) {
    EVP_MD_CTX *mdctx;
    int nbytes;
    FILE *fp;
    char *buf;

    if((fp = fopen(md5->file, "r")) == NULL) {
        printf("Could not open %s\n", md5->file);
        return;
    }

    buf = malloc(BLOCK_SIZE);
    const EVP_MD *md = EVP_get_digestbyname("md5");

    mdctx = EVP_MD_CTX_create();
    EVP_DigestInit_ex(mdctx, md, NULL);

    while((nbytes = fread(buf, 1, BLOCK_SIZE, fp)) >0)
        EVP_DigestUpdate(mdctx, buf, nbytes);

    md5->hash = malloc(EVP_MAX_MD_SIZE);
    EVP_DigestFinal_ex(mdctx, md5->hash, &md5->hash_size);

    EVP_MD_CTX_destroy(mdctx);
    free(buf);
    fclose(fp);
}


void recurse(char *entry, void *arg) {
    queue q = * (queue *) arg;
    struct stat st;

    stat(entry, &st);

    if(S_ISDIR(st.st_mode))/////

        get_entries(entry, q);
}


void add_files(char *entry, void *arg) {
    queue q = * (queue *) arg;
    struct stat st;

    stat(entry, &st);

    if(S_ISREG(st.st_mode)) {
        printf("Se llama a q_insert\n");
        q_insert(q, strdup(entry));

    }
}


void walk_dir(char *dir, void (*action)(char *entry, void *arg), void *arg) {
    DIR *d;
    struct dirent *ent;
    char full_path[MAX_PATH];

    if((d = opendir(dir)) == NULL) {
        printf("Could not open dir %s\n", dir);
        return;
    }

    while((ent = readdir(d)) != NULL) {
        if(strcmp(ent->d_name, ".") == 0 || strcmp(ent->d_name, "..") ==0)
            continue;

        snprintf(full_path, MAX_PATH, "%s/%s", dir, ent->d_name);

        action(full_path, arg);
    }

    closedir(d);
}


void get_entries(char *dir, queue q) {
    walk_dir(dir, add_files, &q);
    walk_dir(dir, recurse, &q);
}


void check(struct options opt) {
    queue in_q;
    struct file_md5 *md5_in, md5_file;

    in_q  = q_create(opt.queue_size);

    read_hash_file(opt.file, opt.dir, in_q);

    while((md5_in = q_remove(in_q))) {
        printf("Se llama a q_remove in en check\n");
        md5_file.file = md5_in->file;

        sum_file(&md5_file);

        if(memcmp(md5_file.hash, md5_in->hash, md5_file.hash_size)!=0) {
            printf("File %s doesn't match.\nFound:    ", md5_file.file);
            print_hash(&md5_file);
            printf("\nExpected: ");
            print_hash(md5_in);
            printf("\n");
        }

        free(md5_file.hash);

        free(md5_in->file);
        free(md5_in->hash);
        free(md5_in);
    }

    q_destroy(in_q);
}

int get_entries_cast(void*ptr){
    struct thread_get_entries_args * entries_args = ptr;
    get_entries(entries_args->dir, entries_args->q);
    printf("Llamada a q_terminar\n");
    q_terminar(entries_args->q);
    return 0;
}

void start_get_entries_thread(char *dir, queue in_q){
    struct thread_get_entries_info* thread;
    thread = malloc(sizeof(struct thread_get_entries_info));
    if(thread == NULL){
        printf("Not enough memory available.\n");
        exit(1);
    }

    thread->entries_args = malloc(sizeof(struct thread_get_entries_args));
    thread->entries_args->dir=dir;
    thread->entries_args->q=in_q;
    thread->entries_args->id=0;
    if(0!= thrd_create(&thread->id, get_entries_cast, thread->entries_args)){
        printf("FALLO AL CREAR\n");
    }
    global = thread->id;
}


void sum(struct options opt) {
    queue in_q, out_q;
    char *ent;
    FILE *out;
    struct file_md5 *md5;
    int dirname_len;


    in_q  = q_create(opt.queue_size);
    out_q = q_create(opt.queue_size);
    start_get_entries_thread(opt.dir, in_q); //Use thread here instead of calling get_entries
    printf("Va a entrar en remove\n");
    while((ent = q_remove(in_q)) != NULL) {
        md5 = malloc(sizeof(struct file_md5));

        md5->file = ent;
        sum_file(md5);
        printf("Se llama a q_insert\n");
        q_insert(out_q, md5);
    }
    printf("Llamada a q_terminar\n");
    q_terminar(out_q);

    if((out = fopen(opt.file, "w")) == NULL) {
        printf("Could not open output file\n");
        exit(0);
    }

    dirname_len = strlen(opt.dir) + 1; // length of dir + /

    while((md5 = q_remove(out_q)) != NULL) {
        printf("Se llama a q_remove out\n");
        fprintf(out, "%s: ", md5->file + dirname_len);

        for(int i = 0; i < md5->hash_size; i++)
            fprintf(out, "%02hhx", md5->hash[i]);
        fprintf(out, "\n");

        free(md5->file);
        free(md5->hash);
        free(md5);
    }
    //if(thrd_join(thread->id, NULL)){
      //printf("FALLO AL UNIR\n");
    //}
    fclose(out);
    q_destroy(in_q);
    q_destroy(out_q);
}


int main(int argc, char *argv[]) {
    struct options opt;

    opt.num_threads = 5;
    opt.queue_size  = 1;
    opt.check       = true;
    opt.file        = NULL;
    opt.dir         = NULL;

    read_options (argc, argv, &opt);

    if(opt.check)
        check(opt);
    else
        sum(opt);

}

So the thing is, at first it works but then it gets infinitely stuck trying to insert. And I also don't now whether to use thrd_join or not. Thanks in advance.

1

There are 1 best solutions below

2
John Bollinger On

There are several weaknesses in the Queue code's use of synchonization objects. Among them:

  • q_terminar() wakes only the threads waiting on the q->empty CV, not those waiting on the q->full CV. Perhaps you anticipate that this function will be called only when the queue is empty, but it would be safer and very cheap for that function to additionally broadcast to q->full.

  • q_insert() reads q->terminado without first acquiring the queue's mutex. This will typically create a data race. No member of the queue structure other than the condition variables and the mutex itself should be accessed by any thread without holding the mutex locked, or relying on some other effective synchronization mechanism.

  • q->insert() checks q->terminado only once, at function entry. It should check again each time it wakes from a CV wait, and take appropriate action if it finds that member to be true.

  • q_insert() broadcasts to the q->empty CV only when, after an item is inserted, the queue size is exactly 1. This is probably sufficient , but I recommend performing that broadcast unconditionally, because it's easier to reason about that and be confident it's correct in all cases.

  • In q_remove(), I suggest moving the if(q->terminado == true) block after the while and removing the if (q->used == 0) block (the only way control can reach that point with q->used == 0 is if q->termindo == true). The current code is not erroneous in this regard, but it is redundant. If you like, add an assert(q->used == 0) in the if(q->terminado == true) block (I would).

  • It is redundant for q_remove() to both signal the q->full CV and and broadcast to it. The broadcast is probably the one to retain. However, it would be better to perform that broadcast unconditionally, instead of only when the queue size after item removal is exactly q->size - 1.

Additionally,

  • sum() tears down the queues without first joining the second thread. It is safe to destroy a mutex only if no thread holds it locked or can attempt to lock it after its destruction. It is safe to destroy a CV only if no thread is waiting on it, or can attempt to wait on it, signal it, or broadcast to it after its destruction. I don't see how you can be confident that your queue destruction is safe in those regards until the other thread(s) is confirmed to have terminated, and joining that thread appears to be the best way available to get such confirmation.

  • As a more general rule of thumb, you should either join or detach every thread but the initial one.

  • And as @StephanSchlecht oberved in comments, the initial thread enqueues items on out_q, but no other thread dequeues those items. The initial thread attempts to dequeue them only after dequeuing all the items from in_q. Thus, if there are more items than out_q can accommodate at once then the initial thread will eventually fill that queue and block trying to add another item. After that point it will not dequeue any more items from in_q, likely causing the other thread to fill that queue to capacity and block, too. Possible solutions include:

    • another thread could be tasked with handling out_q concurrently with the existing two, or
    • the initial thread could process output items immediately instead of enqueuing them for later processing, or
    • the initial thread could use a different data structure for output items, such as a linked list, that does not have a fixed capacity.