This question is related to a former question, from which I will copy the most relevant parts (note the change in File 2):
Problem Description:
I've written a multithreaded C program using POSIX to process two large TSV input files. My goal is to generate output files based on these inputs, but I'm facing performance issues.
File 1: Contains country-key pairs (e.g., African 0\nAmerindian 1\n...).
File 2: A large n x m matrix with single digit values corresponding to keys from File 1.
Example:
0\t3\t4\t1\t2...
0\t3\t4\t1\t2...
File 3: Another large matrix (n x (m+1)) whose data columns contain 0, 1, or 9, preceded by an index column of integer values that can vary in length. Example:
111111\t0\t1\t0\t0\t1...
22222233\t0\t1\t1\t1\t1...
Objective: For each country, I need to create an output file (country.txt) with the same structure as File 3. The output logic depends on matching keys from File 2 with values in File 3.
Output example:
For Amerindian.txt (key 1), we copy the first value of each File 3 row (the index) and then compare each value in the corresponding File 2 row with the key of that country. Every position is a mismatch and is written as 9, except position 4, where the File 2 value equals the key; at that position we copy the value from File 3. This leads to the following output:
111111\t9\t9\t9\t0\t9...
22222233\t9\t9\t9\t1\t9...
In the former question I was pointed to mmap in C, which was suggested as the fastest approach. I implemented this idea in my program and reworked the main and processing functions, which now look like the following:
main:
//library loading, function,variable and input parameter declaration...
//process origins file
FILE *originsFile = fopen(argv[optind], "r");
originCount = readOrigins(originsFile, &origins);
printf("Number of origins parsed: %d\n", originCount);
//create mmap for
int tsv_fd = open(argv[optind + 1], O_RDONLY);
int txt_fd = open(argv[optind + 2], O_RDONLY);
struct stat tsv_stat, txt_stat;
fstat(tsv_fd, &tsv_stat);
fstat(txt_fd, &txt_stat);
char *tsv_mapped = mmap(NULL, tsv_stat.st_size, PROT_READ, MAP_SHARED, tsv_fd, 0);
char *txt_mapped = mmap(NULL, txt_stat.st_size, PROT_READ, MAP_SHARED, txt_fd, 0);
close(txt_fd);
close(tsv_fd);
// Check for mmap errors
if (tsv_mapped == MAP_FAILED || txt_mapped == MAP_FAILED) {
perror("Error mapping files");
// Handle error: close file descriptors, possibly exit
}
// Initialize txtLineCount and tsvLineCount appropriately
txtLineCount = countLinesInFile(txt_mapped, txt_stat.st_size);
// Never start more threads than there are lines: every thread must own at least one line.
if (txtLineCount < numThreads) {
    numThreads = txtLineCount;
}
// An empty input (or a non-positive thread count) leaves nothing to distribute and
// would otherwise divide by zero below and create a zero-length thread array.
if (numThreads < 1) {
    fprintf(stderr, "No lines to process or invalid thread count\n");
    return 1;
}
// Calculate the workload only once numThreads is final. These are the variables the
// thread-setup loop reads, so they must not be re-declared in a nested scope.
int linesPerThread = txtLineCount / numThreads;
int remainingLines = txtLineCount % numThreads;
pthread_t threads[numThreads];
ThreadArg *threadArgs[numThreads];
for (int originIndex = 0; originIndex < originCount; ++originIndex) {
printf("Processing origin: %s\n", origins[originIndex].name);
int currentLine = 0;
for (int i = 0; i < numThreads; ++i) {
printf("Debug: Allocating ThreadArg for thread %d\n", i);
threadArgs[i] = (ThreadArg *)malloc(sizeof(ThreadArg));
threadArgs[i]->startLine = currentLine;
threadArgs[i]->endLine = currentLine + linesPerThread;
// Assign the memory-mapped file pointers
if (i < remainingLines) threadArgs[i]->endLine++;
currentLine = threadArgs[i]->endLine;
threadArgs[i]->tsvMapped = tsv_mapped;
threadArgs[i]->txtMapped = txt_mapped;
threadArgs[i]->specificOriginIndex = origins[originIndex].code;
threadArgs[i]->txt_stat = txt_stat.st_size;
threadArgs[i]->tsv_stat = tsv_stat.st_size;
threadArgs[i]->threadnum = i;
printf("Debug: Thread %d - startLine: %d, endLine: %d, specificOriginIndex: %d\n", i, threadArgs[i]->startLine, threadArgs[i]->endLine, threadArgs[i]->specificOriginIndex);
if (pthread_create(&threads[i], NULL, processFile, threadArgs[i])) {
fprintf(stderr, "Failed to allocate memory for thread arguments\n");
//cleanup and return..
processFile and getLinesFromMappedFile
/* Returns the number of bytes in p[0..len) that precede the first '\n',
 * or len when the range contains no newline. Never reads past p + len. */
static size_t lineLength(const char *p, size_t len) {
    size_t n = 0;
    while (n < len && p[n] != '\n') {
        n++;
    }
    return n;
}

/* Tells whether a File 2 cell selects the given origin.
 * NOTE(review): the declaration of specificOriginIndex is not visible here, so the
 * key is accepted both as the digit character itself (e.g. '1') and as the numeric
 * value of a digit cell (e.g. 1) — confirm which encoding readOrigins produces. */
static int cellMatchesOrigin(char cell, int originCode) {
    if (cell == originCode) {
        return 1;
    }
    return cell >= '0' && cell <= '9' && cell - '0' == originCode;
}

/* Writes one output row: the index column of the File 3 row, then for every column
 * either the File 3 value (File 2 cell matches the origin) or '9' (mismatch).
 * Cells are separated by single tabs and the row ends with '\n'.
 * Assumes every data cell in both rows is exactly one character wide. */
static void writeMaskedLine(FILE *out, const char *tsvLine, size_t tsvLen,
                            const char *txtLine, size_t txtLen, int originCode) {
    // The index column is everything up to the first tab of the File 3 row.
    size_t txtPos = 0;
    while (txtPos < txtLen && txtLine[txtPos] != '\t') {
        txtPos++;
    }
    fwrite(txtLine, 1, txtPos, out);

    // txtLine[txtPos] is the tab in front of the next File 3 cell, while tsvPos sits
    // directly on a File 2 cell (File 2 has no index column). Both advance by one
    // cell plus one tab per iteration.
    for (size_t tsvPos = 0; tsvPos < tsvLen && txtPos + 1 < txtLen; tsvPos += 2, txtPos += 2) {
        fputc('\t', out);
        fputc(cellMatchesOrigin(tsvLine[tsvPos], originCode) ? txtLine[txtPos + 1] : '9', out);
    }
    fputc('\n', out);
}

/* Thread entry point: produces the output rows of one origin for one line range.
 *
 * arg is a ThreadArg* carrying the two mapped files with their sizes, the line range
 * [startLine, endLine), the origin key and the thread number. Rows are appended to
 * "temp_output_<origin>_<thread>.tmp". Always returns NULL; problems are reported on
 * stderr. The ThreadArg itself is not freed here. */
void *processFile(void *arg) {
    ThreadArg *threadArg = (ThreadArg *)arg;
    // Local prototype: the helper is defined after this function.
    char* getLinesFromMappedFile(const char* mappedFile, int startLine, int endLine, size_t fileSize, size_t* byteRange);

    char tempFileName[1024];
    snprintf(tempFileName, sizeof tempFileName, "temp_output_%d_%d.tmp", threadArg->specificOriginIndex, threadArg->threadnum);
    FILE *outputFile = fopen(tempFileName, "a"); // Append mode
    if (!outputFile) {
        fprintf(stderr, "Failed to open output file.\n");
        return NULL;
    }

    // The caller hands out half-open ranges (one thread's endLine is the next thread's
    // startLine) whereas getLinesFromMappedFile includes endLine, so the last owned
    // line is endLine - 1; passing endLine would emit the boundary line twice.
    // NOTE(review): assumes the caller's line count covers every line of the file —
    // confirm for files without a trailing newline.
    int lastLine = threadArg->endLine - 1;
    if (lastLine < threadArg->startLine) {
        fclose(outputFile); // empty range: nothing to write
        return NULL;
    }

    size_t byteRangeTsv;
    const char* segmentTsv = getLinesFromMappedFile(threadArg->tsvMapped,
                                                    threadArg->startLine,
                                                    lastLine,
                                                    threadArg->tsv_stat,
                                                    &byteRangeTsv);
    size_t byteRangeTxt;
    const char* segmentTxt = getLinesFromMappedFile(threadArg->txtMapped,
                                                    threadArg->startLine,
                                                    lastLine,
                                                    threadArg->txt_stat,
                                                    &byteRangeTxt);
    if (!segmentTsv || !segmentTxt) {
        fprintf(stderr, "Lines not found in one of the files.\n");
        fclose(outputFile);
        return NULL;
    }
    printf("%zu,%zu", byteRangeTsv, byteRangeTxt);

    // Walk both segments one line at a time. Each line length is bounded by the bytes
    // left in its segment, so nothing outside the segment is ever read (the mapping
    // is not NUL-terminated).
    size_t tsvPos = 0;
    size_t txtPos = 0;
    while (tsvPos < byteRangeTsv && txtPos < byteRangeTxt) {
        const char *tsvLine = segmentTsv + tsvPos;
        const char *txtLine = segmentTxt + txtPos;
        size_t tsvLen = lineLength(tsvLine, byteRangeTsv - tsvPos);
        size_t txtLen = lineLength(txtLine, byteRangeTxt - txtPos);

        writeMaskedLine(outputFile, tsvLine, tsvLen, txtLine, txtLen, threadArg->specificOriginIndex);

        // Step over the line and its terminating newline.
        tsvPos += tsvLen + 1;
        txtPos += txtLen + 1;
    }

    // Buffered write errors only surface on the stream state or at close time.
    int writeFailed = ferror(outputFile);
    if (fclose(outputFile) != 0 || writeFailed) {
        fprintf(stderr, "Failed to write output file.\n");
    }
    return NULL;
}
/* Locates the lines startLine..endLine (0-based, both inclusive) inside a mapped file.
 *
 * mappedFile  start of the mapped data (fileSize bytes, not NUL-terminated)
 * byteRange   receives the distance from the first byte of startLine to the '\n'
 *             that ends endLine, or to the end of the file when that newline is
 *             never reached; left untouched when NULL is returned
 *
 * Returns a pointer to the first byte of startLine, or NULL when the arguments are
 * invalid or the file ends before startLine begins. */
char* getLinesFromMappedFile(const char* mappedFile, int startLine, int endLine, size_t fileSize, size_t* byteRange) {
    if (!mappedFile || startLine < 0 || endLine < startLine) {
        return NULL;
    }

    const char *const fileEnd = mappedFile + fileSize;
    const char *first = NULL;
    const char *last = fileEnd; // used when the closing newline is not found
    int lineNo = 0;

    for (const char *cursor = mappedFile; cursor != fileEnd; ++cursor) {
        // The first byte seen while on startLine marks the start of the segment.
        if (first == NULL && lineNo == startLine) {
            first = cursor;
        }
        if (*cursor != '\n') {
            continue;
        }
        // A newline either closes the requested range or advances to the next line.
        if (lineNo == endLine) {
            last = cursor;
            break;
        }
        ++lineNo;
    }

    if (first == NULL) {
        return NULL; // the file has no line with index startLine
    }

    *byteRange = last - first;
    return (char *)first; // const is dropped only to satisfy the declared return type
}
Question: What is the most efficient approach, in terms of producing the desired output in the lowest CPU time, to split and access the mmapped data for each thread?