How to implement a functional parallel map in C?

72 Views Asked by At

A functional map is a function that applies a callback function to each element of an array and returns a list of the callback's return values. For example, in pseudocode, map(["hello", "world"], fn(x) => x + " meow") would return ["hello meow", "world meow"].

Since function pointers can be passed as parameters in C, it is possible to implement a functional map like below:

/* Serial map: returns a newly malloc'd array whose element i is
 * execute(array[i]), for i in [0, len). The caller owns the returned array
 * (and whatever the callback allocated) and must free() it. On allocation
 * failure the process exits via err(42, ...). For len == 0 the callback is
 * never invoked and a freeable, empty result array is returned. */
void** fp_map(void** array, size_t len, void* (*execute)(void*))
{
    // Allocate memory for return items. Lengths whose byte count would
    // overflow size_t are treated as an allocation failure. At least one
    // slot is requested because malloc(0) may legally return NULL, which
    // would otherwise be misreported as out-of-memory.
    void** returns = NULL;
    if (len <= (size_t)-1 / sizeof *returns)
        returns = malloc(sizeof *returns * (len ? len : 1));
    if (returns == NULL) err(42, "Malloc failed, buy more ram");

    // Map values. The index is size_t to match len: an int index would be
    // compared as unsigned and overflow (UB) once len exceeds INT_MAX.
    for (size_t i = 0; i < len; ++i)
        returns[i] = execute(array[i]);

    return returns;
}

If I write the following "anonymous function" in my main function (using GCC's nested-function and statement-expression extensions), it maps ["hello", "world"] to ["hello meow", "world meow"]:

int main() {
    char* arr[] = {"hello", "world"};
    // Derive the element count from the array instead of hard-coding it,
    // so the print loop below can never index past the mapped results.
    const size_t n = sizeof arr / sizeof arr[0];

    // GCC-only trick: a nested function inside a statement expression
    // yields a function pointer that stands in for an anonymous function.
    // This is not standard C (clang, for example, rejects it).
    // fp_map returns void**, so the result needs an explicit cast.
    char** arr2 = (char**) fp_map((void**) arr, n, ({ void* _func_ (void* x) {
        char* buf = malloc(sizeof(char) * (strlen(x) + 7));
        strcpy(buf, x);
        strcat(buf, " meow");
        return buf;
    }; _func_; }));

    // Print each result and release it; the result array itself goes last.
    for (size_t i = 0; i < n; ++i) {
        printf("%s\n", arr2[i]);
        free(arr2[i]);
    }
    free(arr2);
    return 0;
}

Now, I want to implement a parallel map to speed things up. Since this is purely functional, calls to the callback function with the same parameters would return the same return values. How can I use multithreading so that each call to execute() runs on a different thread, but still have the results return in an ordered array?

1

There is 1 best solution below

0
KamilCuk On BEST ANSWER

I have written the following code: it creates a context for each element, spawns a separate thread for every calculation, then joins all the threads and returns the array of results.

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <threads.h>

/* Fail-fast check: when `expr` evaluates to true (non-zero), print
 * "ERROR: <source text of expr>" to stderr and terminate with exit(1).
 * `expr` is evaluated exactly once. */
#define ERRORON(expr)                          \
    do {                                       \
        if (!(expr))                           \
            break;                             \
        fputs("ERROR: " #expr "\n", stderr);   \
        exit(1);                               \
    } while (0)

/* Number of elements of an actual array object. The result is meaningless
 * for a pointer (including an array function parameter, which decays to a
 * pointer). The argument is fully parenthesised per macro hygiene. */
#define ARRLEN(x) (sizeof(x) / sizeof((x)[0]))

/* Argument passed to mythread(): describes the single element one thread
 * computes. In fp_map() every context of a call shares the same `returns`,
 * `execute` and `array`; only `i` differs. */
struct mythread_context {
    void **returns;           // shared output array; mythread() writes only returns[i]
    void *(*execute)(void *); // callback applied to the input element
    void **array;             // shared input array; mythread() only reads array[i]
    size_t i;                 // index of the element this thread processes
};

/* Thread entry point (thrd_start_t signature): applies the context's
 * callback to input element `i` and stores the result in output slot `i`.
 * fp_map() gives each thread a distinct `i`, so threads write disjoint
 * slots. Always returns 0. */
int mythread(void *arg) {
    const struct mythread_context *const job = arg;
    const size_t slot = job->i;
    void *const result = job->execute(job->array[slot]);
    job->returns[slot] = result;
    return 0;
}

void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
    // Allocate memory for return items
    void **returns = malloc(sizeof(*returns) * len);
    ERRORON(!returns);
    // Allocate memory for threads and contextes.
    thrd_t *threads = malloc(sizeof(*threads) * len);
    ERRORON(!threads);
    struct mythread_context *ctxs = malloc(sizeof(*ctxs) * len);
    ERRORON(!ctxs);
    for (size_t i = 0; i < len; ++i) {
        const struct mythread_context thisctx = {
            .returns = returns,
            .execute = execute,
            .array = array,
            .i = i,
        };
        ctxs[i] = thisctx;
        // Start a thread for every returns, execute and array index.
        int ret = thrd_create(&threads[i], mythread, &ctxs[i]);
        ERRORON(ret != thrd_success);
    }
    for (size_t i = 0; i < len; ++i) {
        // Join all threads. They will assing to returns separately concurrently.
        int ret = thrd_join(threads[i], NULL);
        ERRORON(ret != thrd_success);
    }
    free(threads);
    free(ctxs);
    return returns;
}

/* Map callback: returns a newly malloc'd copy of the NUL-terminated string
 * `x` with " meow" appended (e.g. "hello" -> "hello meow"). The caller owns
 * the result and must free() it. `x` is only read. On allocation failure it
 * prints an error to stderr and exits with status 1 (the same fail-fast
 * behaviour as ERRORON) instead of writing through a NULL pointer. */
void *appnend_to_char(void *x) {
    static const char suffix[] = " meow";
    const char *src = x;
    const size_t src_len = strlen(src);
    // sizeof suffix already counts the terminating NUL.
    char *buf = malloc(src_len + sizeof suffix);
    if (buf == NULL) {
        fputs("ERROR: malloc failed in appnend_to_char\n", stderr);
        exit(1);
    }
    memcpy(buf, src, src_len);
    memcpy(buf + src_len, suffix, sizeof suffix);
    return buf;
}

int main() {
    const char *words[] = {"hello", "world"};
    const size_t count = ARRLEN(words);
    // fp_map's interface is untyped (void **), so cast on the way in and out.
    char **meowed = (char **)fp_map((void **)words, count, appnend_to_char);
    // Print every mapped string, releasing each one once it has been shown;
    // the result array itself was malloc'd by fp_map and is freed last.
    size_t k = 0;
    while (k < count) {
        printf("%s\n", meowed[k]);
        free(meowed[k]);
        ++k;
    }
    free(meowed);
    return 0;
}

Alternatively, you can let OpenMP do the work with a single pragma (compile with -fopenmp; without OpenMP support the pragma is ignored and the loop runs serially):

/* Parallel map using OpenMP: returns a newly malloc'd array whose element i
 * is execute(array[i]); the caller must free() it. When compiled with
 * OpenMP support (e.g. -fopenmp) loop iterations are spread over OpenMP
 * threads, so `execute` must be safe to call concurrently; without OpenMP
 * the pragma is ignored and the loop runs serially. Exits via ERRORON on
 * allocation failure. For len == 0 a freeable, empty array is returned. */
void **fp_map(void **array, size_t len, void *(*execute)(void *)) {
    // Reject lengths whose byte count would overflow size_t.
    ERRORON(len > (size_t)-1 / sizeof(void *));
    // Request at least one slot so a NULL from malloc(0) is not misreported
    // as out-of-memory.
    void **returns = malloc(sizeof(*returns) * (len ? len : 1));
    ERRORON(!returns);
    // Each iteration writes only its own slot, so no locking is needed; the
    // implicit barrier at the end of the parallel loop guarantees all
    // results are stored before returning. (An unsigned loop variable
    // requires OpenMP 3.0 or later.)
    #pragma omp parallel for
    for (size_t i = 0; i < len; ++i) {
        returns[i] = execute(array[i]);
    }
    return returns;
}

Notes:

  • ({ ... }) statement expressions and nested functions are GCC extensions, not part of the C language. C has no lambdas or anonymous functions.
  • I am not sure whether C11 threads.h or POSIX pthreads should be preferred; the interfaces are very similar.
  • The context is rather big and could be optimized. The number of malloc calls could also be reduced.