Fastest approach to split and access mmaped data for each thread

51 Views Asked by At

This question is related to a former question, from which I will copy the most relevant parts (note the change in File 2):

Problem Description:

I've written a multithreaded C program using POSIX to process two large TSV input files. My goal is to generate output files based on these inputs, but I'm facing performance issues.

  • File 1: Contains country-key pairs (e.g., African 0\nAmerindian 1\n...).

  • File 2: A large n x m matrix with single digit values corresponding to keys from File 1.

    Example:

    0\t3\t4\t1\t2...
    
    0\t3\t4\t1\t2...
    
  • File 3: Another large matrix (n x m+1) whose data columns contain the values 0, 1 or 9, preceded by an index column of integer values that can vary in length. Example:

    111111\t0\t1\t0\t0\t1...
    
    22222233\t0\t1\t1\t1\t1...
    

Objective: For each country, I need to create an output file (country.txt) with the same structure as File 3. The output logic depends on matching keys from File 2 with values in File 3.

Output example:

For Amerindian.txt with key 1, we first copy the index (the first value of each row in File 3). Then, for every remaining position, we compare the value in File 2 with the key of the specific country: on a mismatch we write a 9, and on a match we copy the value from File 3. In the example, only position 4 of the rows in File 2 holds the key 1, so every position shows a 9 except position 4, where the value from File 3 is copied. This leads to the following output:

  111111\t9\t9\t9\t0\t9...

  22222233\t9\t9\t9\t1\t9...

In the former question I was pointed to mmap in C, which was suggested as the fastest approach. I implemented this idea in my program and reworked the main and processing functions, which now look like the following:

main:

//library loading, function,variable and input parameter declaration...
//process origins file
FILE *originsFile = fopen(argv[optind], "r");
if (originsFile == NULL) {
    perror("Error opening origins file");
    return 1;
}
originCount = readOrigins(originsFile, &origins);
printf("Number of origins parsed: %d\n", originCount);

// Memory-map File 2 (key matrix) and File 3 (value matrix) read-only
int tsv_fd = open(argv[optind + 1], O_RDONLY);
int txt_fd = open(argv[optind + 2], O_RDONLY);
if (tsv_fd < 0 || txt_fd < 0) {
    perror("Error opening input files");
    return 1;
}
struct stat tsv_stat, txt_stat;
if (fstat(tsv_fd, &tsv_stat) != 0 || fstat(txt_fd, &txt_stat) != 0) {
    perror("Error reading input file sizes");
    return 1;
}

char *tsv_mapped = mmap(NULL, tsv_stat.st_size, PROT_READ, MAP_SHARED, tsv_fd, 0);
char *txt_mapped = mmap(NULL, txt_stat.st_size, PROT_READ, MAP_SHARED, txt_fd, 0);

// Check for mmap errors before anything can overwrite errno, and stop:
// continuing with MAP_FAILED would dereference an invalid pointer later on.
if (tsv_mapped == MAP_FAILED || txt_mapped == MAP_FAILED) {
    perror("Error mapping files");
    return 1;
}

// The mappings remain valid after their descriptors are closed
close(txt_fd);
close(tsv_fd);

// Number of lines in File 3; the work is split by line number
txtLineCount = countLinesInFile(txt_mapped, txt_stat.st_size);
if (txtLineCount <= 0) {
    // Nothing to split; also avoids a zero-length VLA and a division by zero below
    fprintf(stderr, "Input file contains no lines\n");
    return 1;
}

// Use at least one thread, and never more threads than there are lines
if (numThreads < 1) {
    numThreads = 1;
}
if (txtLineCount < numThreads) {
    numThreads = txtLineCount;
}

// Workload per thread, computed once after numThreads is final:
// every thread gets linesPerThread lines, the first remainingLines threads get one more
    int linesPerThread = txtLineCount / numThreads;
    int remainingLines = txtLineCount % numThreads;

    pthread_t threads[numThreads];
    ThreadArg *threadArgs[numThreads];
    
    for (int originIndex = 0; originIndex < originCount; ++originIndex) {
        printf("Processing origin: %s\n", origins[originIndex].name);
    int currentLine = 0;
    for (int i = 0; i < numThreads; ++i) {

printf("Debug: Allocating ThreadArg for thread %d\n", i);

   threadArgs[i] = malloc(sizeof *threadArgs[i]);
   if (threadArgs[i] == NULL) {
        fprintf(stderr, "Failed to allocate memory for thread arguments\n");
        return 1;
   }
        // Half-open line range [startLine, endLine) handled by this thread
        threadArgs[i]->startLine = currentLine;
        threadArgs[i]->endLine = currentLine + linesPerThread;
        if (i < remainingLines) threadArgs[i]->endLine++;
        currentLine = threadArgs[i]->endLine;
        
        // Assign the memory-mapped file pointers and their sizes in bytes
        threadArgs[i]->tsvMapped = tsv_mapped;
        threadArgs[i]->txtMapped = txt_mapped;
        threadArgs[i]->specificOriginIndex = origins[originIndex].code;
        threadArgs[i]->txt_stat = txt_stat.st_size;
        threadArgs[i]->tsv_stat = tsv_stat.st_size;
        threadArgs[i]->threadnum = i;

        
printf("Debug: Thread %d - startLine: %d, endLine: %d, specificOriginIndex: %d\n", i, threadArgs[i]->startLine, threadArgs[i]->endLine, threadArgs[i]->specificOriginIndex);
   if (pthread_create(&threads[i], NULL, processFile, threadArgs[i])) {
        fprintf(stderr, "Failed to create thread %d\n", i);
//cleanup and return..

processFile and getLinesFromMappedFile

/* Returns the number of bytes before the first '\n' in text, scanning at most limit bytes. */
static size_t lineLength(const char *text, size_t limit) {
    size_t len = 0;
    while (len < limit && text[len] != '\n') {
        len++;
    }
    return len;
}

/*
 * Writes one output line built from one File 2 line (keys) and the matching
 * File 3 line (values).
 *
 * The first tab-separated field of values (the index column) is copied as is.
 * After that, field j of keys is paired with field j+1 of values: when the key
 * field is exactly the single character key, the value field is copied,
 * otherwise a '9' is written. Fields are separated by a single tab and the
 * line ends with '\n'. Pairing stops when either line runs out of fields.
 */
static void writeMaskedLine(FILE *out,
                            const char *keys, size_t keysLen,
                            const char *values, size_t valuesLen,
                            char key) {
    size_t k = 0; /* cursor in keys */
    size_t v = 0; /* cursor in values */

    /* Index column: File 2 has no counterpart, so only v advances. */
    while (v < valuesLen && values[v] != '\t') {
        v++;
    }
    fwrite(values, 1, v, out);

    /* Here v sits on the tab that precedes the next value field. */
    while (v < valuesLen && k < keysLen) {
        v++; /* skip that tab */
        fputc('\t', out);

        size_t valueStart = v;
        while (v < valuesLen && values[v] != '\t') {
            v++;
        }

        size_t keyStart = k;
        while (k < keysLen && keys[k] != '\t') {
            k++;
        }

        if (k - keyStart == 1 && keys[keyStart] == key) {
            fwrite(values + valueStart, 1, v - valueStart, out);
        } else {
            fputc('9', out);
        }

        if (k < keysLen) {
            k++; /* skip the tab after the key field */
        }
    }

    fputc('\n', out);
}

/*
 * Thread entry point: produces the output rows for one origin and one slice
 * of lines.
 *
 * arg is a ThreadArg*. The lines [startLine, endLine) of both mapped files are
 * located with getLinesFromMappedFile and combined line by line; the result is
 * appended to temp_output_<specificOriginIndex>_<threadnum>.tmp.
 *
 * Always returns NULL; failures are reported on stderr.
 */
void *processFile(void *arg) {
    ThreadArg *threadArg = (ThreadArg *)arg;
    char* getLinesFromMappedFile(const char* mappedFile, int startLine, int endLine, size_t fileSize, size_t* byteRange);
    char tempFileName[1024];
    snprintf(tempFileName, sizeof tempFileName, "temp_output_%d_%d.tmp", threadArg->specificOriginIndex, threadArg->threadnum);
    FILE *outputFile = fopen(tempFileName, "a"); // Append mode

    if (!outputFile) {
        fprintf(stderr, "Failed to open output file.\n");
        return NULL;
    }

    /* An empty slice has nothing to write. */
    if (threadArg->endLine <= threadArg->startLine) {
        fclose(outputFile);
        return NULL;
    }

    /*
     * The caller hands out half-open ranges (one thread's endLine is the next
     * thread's startLine), while getLinesFromMappedFile treats its endLine as
     * inclusive. Passing endLine unchanged would make two neighbouring threads
     * both process the boundary line.
     */
    int lastLine = threadArg->endLine - 1;

    size_t byteRangeTsv = 0;
    const char* segmentTsv = getLinesFromMappedFile(threadArg->tsvMapped,
                                                    threadArg->startLine,
                                                    lastLine,
                                                    threadArg->tsv_stat,
                                                    &byteRangeTsv);

    size_t byteRangeTxt = 0;
    const char* segmentTxt = getLinesFromMappedFile(threadArg->txtMapped,
                                                    threadArg->startLine,
                                                    lastLine,
                                                    threadArg->txt_stat,
                                                    &byteRangeTxt);

    if (!segmentTsv || !segmentTxt) {
        fprintf(stderr, "Lines not found in one of the files.\n");
        fclose(outputFile);
        return NULL;
    }

    printf("%zu,%zu", byteRangeTsv, byteRangeTxt);

    /*
     * NOTE(review): how the origin code is stored is not visible here. The
     * matrix holds digit characters, so a numeric code 0..9 is mapped to
     * '0'..'9'; any other value is compared as a raw character. Confirm
     * against readOrigins.
     */
    int code = threadArg->specificOriginIndex;
    char key = (code >= 0 && code <= 9) ? (char)('0' + code) : (char)code;

    /* Walk both segments one line at a time; all reads stay inside the byte ranges. */
    size_t tsvPos = 0;
    size_t txtPos = 0;
    while (tsvPos < byteRangeTsv && txtPos < byteRangeTxt) {
        const char *tsvLine = segmentTsv + tsvPos;
        const char *txtLine = segmentTxt + txtPos;
        size_t tsvLen = lineLength(tsvLine, byteRangeTsv - tsvPos);
        size_t txtLen = lineLength(txtLine, byteRangeTxt - txtPos);

        writeMaskedLine(outputFile, tsvLine, tsvLen, txtLine, txtLen, key);

        /* Step over the line and its '\n' (if any); the loop condition absorbs the overshoot. */
        tsvPos += tsvLen + 1;
        txtPos += txtLen + 1;
    }

    if (fclose(outputFile) != 0) {
        fprintf(stderr, "Failed to write output file.\n");
    }
    return NULL;
}


/*
 * Finds the byte span covering lines startLine..endLine of a text buffer.
 *
 * Line numbers are 0-based and endLine is inclusive. The span starts at the
 * first byte of startLine and ends just before the '\n' that terminates
 * endLine; if the buffer ends before that newline is seen, the span runs to
 * the end of the buffer.
 *
 * mappedFile  start of the buffer (need not be NUL-terminated)
 * fileSize    number of readable bytes in mappedFile
 * byteRange   if not NULL, receives the span length in bytes; it is set to 0
 *             whenever NULL is returned
 *
 * Returns a pointer into mappedFile at the start of startLine, or NULL when
 * the arguments are invalid or startLine does not exist. No memory is
 * allocated; the result must not be freed.
 */
char* getLinesFromMappedFile(const char* mappedFile, int startLine, int endLine, size_t fileSize, size_t* byteRange) {
    /* Give the out-parameter a defined value on every failure path. */
    if (byteRange != NULL) {
        *byteRange = 0;
    }

    if (mappedFile == NULL || startLine < 0 || endLine < startLine) {
        return NULL; // Invalid parameters
    }

    const char* startPtr = NULL;
    const char* endPtr = NULL;
    int currentLine = 0;

    for (size_t i = 0; i < fileSize; ++i) {
        if (currentLine == startLine && startPtr == NULL) {
            startPtr = mappedFile + i; // First byte of startLine
        }

        if (mappedFile[i] == '\n') {
            if (currentLine == endLine) {
                endPtr = mappedFile + i; // Newline terminating endLine (excluded from the span)
                break;
            }
            currentLine++;
        }
    }

    if (startPtr == NULL) {
        return NULL; // startLine not found
    }

    // endLine's newline was never reached: the span extends to the end of the buffer
    if (endPtr == NULL) {
        endPtr = mappedFile + fileSize;
    }

    if (byteRange != NULL) {
        *byteRange = (size_t)(endPtr - startPtr);
    }

    return (char*)startPtr; // Cast to non-const to match function signature
}

Question : What is the most efficient approach, in terms of creating the desired output in the lowest cpu time, to split and access the mmapped-data for each thread?

0

There are 0 best solutions below