I am currently studying operating systems and came across "Solving concurrency problems using Condition Variables".
The task is to simulate a multi-threaded web server with a queue of size 10 that stores the clients' requests, using very minimal C code. There are 2 producer threads (clients) and 2 consumer threads (workers).
I have a few questions about this:
- Which of the two approaches below should be used to store the clients' requests (given that I must not use any asynchronous approach and have to keep the code very simple)?
Option 1: Overwriting slots
Example:
Queue size = 5
Indexes: [0] [1] [2] [3] [4]
Client 1 puts request at [0] Queue: [R1] [ ] [ ] [ ] [ ]
Client 2 puts request at [1] Queue: [R1] [R2] [ ] [ ] [ ]
Worker takes [0] Queue: [ ] [R2] [ ] [ ] [ ]
Client 1 puts new request at [0] (reuses slot) Queue: [R3] [R2] [ ] [ ] [ ]
Option 2: Appending to end
Example:
Client 1 puts request at [0] Queue: [R1] [ ] [ ] [ ] [ ] Write = 1
Client 2 puts request at [1] Queue: [R1] [R2] [ ] [ ] [ ] Write = 2
Worker takes [0] Read = 1
Client 3 puts request at [2] Queue: [ ] [R2] [R3] [ ] [ ] Write = 3
- For the same task, in which cases is a mutex required for the queue, and in which cases is it not?
- What needs to be corrected in the code below to make it work without any flaws?
C code for the above scenario:
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

#define MAX_REQUESTS 10

// Shared resource
int requestQueue[MAX_REQUESTS];
int rear = -1;
int front = 0;
pthread_mutex_t queueMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t queueNotEmpty = PTHREAD_COND_INITIALIZER;
pthread_cond_t queueNotFull = PTHREAD_COND_INITIALIZER;

// Producer (Client) function
void* client(void* arg) {
    int clientId = *((int*)arg);
    while (1) {
        // Simulating request generation
        sleep(rand() % 1 + 1);
        printf("Client %d online...\n", clientId);
        pthread_mutex_lock(&queueMutex);
        while (rear == MAX_REQUESTS-1) {
            // Queue is full, waiting for a signal that the queue is not full
            printf("Waiting for queue not full signal...\n");
            pthread_cond_wait(&queueNotFull, &queueMutex);
            printf("Client %d received signal, checking condition again...\n", clientId);
        }
        requestQueue[++rear] = clientId;
        printf("Client %d generated a request. Total requests: %d\n", clientId, rear+1);
        // Signaling worker threads that the queue is not empty
        pthread_cond_broadcast(&queueNotEmpty);
        pthread_mutex_unlock(&queueMutex);
    }
    return NULL;
}

// Consumer (Worker) function
void* worker(void* arg) {
    int workerId = *((int*)arg);
    while (1) {
        printf("Worker %d ready...\n", workerId);
        pthread_mutex_lock(&queueMutex);
        while (front > rear) {
            // Queue is empty, waiting for a signal that the queue is not empty
            printf("Waiting for queue not empty signal...\n");
            pthread_cond_wait(&queueNotEmpty, &queueMutex);
            printf("Worker %d received signal, checking condition again...\n", workerId);
        }
        int clientId = requestQueue[front++];
        printf("Worker %d processed a request from Client %d. Total requests: %d\n", workerId, clientId, rear+1);
        rear--;
        // Signaling clients that the queue is not full
        pthread_cond_broadcast(&queueNotFull);
        pthread_mutex_unlock(&queueMutex);
        // Simulating request processing
        sleep(rand() % 2 + 1);
    }
    return NULL;
}

int main() {
    srand(time(NULL));
    pthread_t clients[2];
    pthread_t workers[2];
    int clientIds[2] = {1, 2};
    int workerIds[2] = {1,2};
    for (int i = 0; i < 2; ++i) {
        pthread_create(&clients[i], NULL, client, &clientIds[i]);
        pthread_create(&workers[i], NULL, worker, &workerIds[i]);
    }
    for (int i = 0; i < 2; ++i) {
        pthread_join(clients[i], NULL);
        pthread_join(workers[i], NULL);
    }
    return 0;
}
Code output:
[Code output 1](https://i.stack.imgur.com/F7p74.jpg)
[Code output 2](https://i.stack.imgur.com/CGjig.jpg)
[Code output 3](https://i.stack.imgur.com/Tlqeq.jpg)
If there is any logical pairing of Client1 with Worker1 and Client2 with Worker2 then allocate 5 Queue elements to the Client1 <-> Worker1 pair and allocate the remaining 5 Queue elements to the Client2 <-> Worker2 pair. The condition variable is needed to control when a Client writes to a memory location and when a Worker reads from the memory location.
If there is no logical pairing of Clients and Workers then the Queue should be treated as a circular Queue with Clients writing to "empty" Queue elements and Workers only reading from "non-empty" Queue elements. The determination of "empty" and "non-empty" falls upon the condition variables used in the program.
It is common for the non-paired example to implement a Client write index variable shared by the clients as well as a Worker read index variable to be shared by the Workers. It can also be useful for a "full" indication to be shared by the Clients and an "empty" indication to be shared by the Workers.
The Clients cannot write to a full Queue. The Workers cannot read from an empty Queue.
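To make the circular-queue idea concrete in C, here is a minimal sketch of the index bookkeeping only. It corresponds to your Option 2 with wrap-around, and it keeps a separate count so that "full" and "empty" are never confused. Note that your posted worker both increments `front` and decrements `rear` on every read, so the two indices drift apart; a count plus modulo indices avoids that. The names `ring_t`, `ring_put` and `ring_get` are made up for this illustration, and locking is deliberately left out: in a threaded program every call would have to be made while holding the queue mutex.

```c
#include <assert.h>
#include <stdbool.h>

#define CAPACITY 5   /* illustrative size, matching the 5-slot example in the question */

/* Hypothetical type and function names, not taken from the original code. */
typedef struct {
    int slots[CAPACITY];
    int write_index;  /* next slot a client fills  */
    int read_index;   /* next slot a worker drains */
    int count;        /* number of occupied slots  */
} ring_t;

static void ring_init(ring_t *r) {
    r->write_index = 0;
    r->read_index = 0;
    r->count = 0;
}

static bool ring_is_full(const ring_t *r)  { return r->count == CAPACITY; }
static bool ring_is_empty(const ring_t *r) { return r->count == 0; }

/* Returns false instead of overwriting when the queue is full. */
static bool ring_put(ring_t *r, int request) {
    if (ring_is_full(r))
        return false;
    r->slots[r->write_index] = request;
    r->write_index = (r->write_index + 1) % CAPACITY;  /* wrap around */
    r->count++;
    assert(r->count <= CAPACITY);
    return true;
}

/* Returns false when there is nothing to read. */
static bool ring_get(ring_t *r, int *request) {
    if (ring_is_empty(r))
        return false;
    *request = r->slots[r->read_index];
    r->read_index = (r->read_index + 1) % CAPACITY;    /* wrap around */
    r->count--;
    assert(r->count >= 0);
    return true;
}
```

Requests come out in the order they went in, and a freed slot is reused only after the write index has wrapped around to it.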
EDIT: The problem you are trying to solve is called the producer-consumer problem. The simplest version of the producer-consumer problem has one producer, which you call a Client, and one consumer, which you call a Worker. A simple semaphore may be used to control access to the shared buffer when there is only one producer and one consumer. The more general problem of having N producers and M consumers requires a more sophisticated access control scheme.
The URL referenced above speaks of Monitors, which work very well for the general problem of N producers and M consumers. A detailed description of the use of semaphores and monitors explains some of the disadvantages of semaphores compared to monitors.
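As a rough illustration of the one-producer, one-consumer case, the textbook approach uses two counting semaphores, one counting free slots and one counting filled slots. The sketch below assumes a Linux-like system with POSIX unnamed semaphores; `spsc_t`, `spsc_put` and `spsc_get` are hypothetical names. It is only safe with exactly one producer and one consumer: with two producers, both could pass `sem_wait` together and race on `write_index`, which is why the general case needs a mutex or a monitor.

```c
#include <assert.h>
#include <semaphore.h>

#define SLOTS 10  /* queue size from the question */

/* Hypothetical names; valid for exactly one producer and one consumer. */
typedef struct {
    int   slots[SLOTS];
    int   write_index;   /* touched only by the producer        */
    int   read_index;    /* touched only by the consumer        */
    sem_t free_slots;    /* counts empty slots, starts at SLOTS */
    sem_t used_slots;    /* counts filled slots, starts at 0    */
} spsc_t;

static void spsc_init(spsc_t *q) {
    q->write_index = 0;
    q->read_index = 0;
    int rc = sem_init(&q->free_slots, 0, SLOTS);
    assert(rc == 0);
    rc = sem_init(&q->used_slots, 0, 0);
    assert(rc == 0);
    (void)rc;
}

/* A more robust version would retry sem_wait when it fails with EINTR. */
static void spsc_put(spsc_t *q, int request) {
    sem_wait(&q->free_slots);              /* blocks while the queue is full  */
    q->slots[q->write_index] = request;
    q->write_index = (q->write_index + 1) % SLOTS;
    sem_post(&q->used_slots);              /* one more item for the consumer  */
}

static int spsc_get(spsc_t *q) {
    sem_wait(&q->used_slots);              /* blocks while the queue is empty */
    int request = q->slots[q->read_index];
    q->read_index = (q->read_index + 1) % SLOTS;
    sem_post(&q->free_slots);              /* one more slot for the producer  */
    return request;
}
```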
All this complexity is simplified when using a programming language that supports monitors. I use the Ada programming language, which has monitors built into the core language. Following is an example of a generic producer-consumer implementation, written as an Ada package.
Ada packages have two parts. The package specification defines the interface to the package and serves roughly the same purpose as a C header file. The package body contains the implementation of the producer-consumer library and is similar to a .c file in its purpose.
The Ada package specification is:
The generic parameter named Capacity allows the programmer to implement this package with a custom buffer size. If you want to create a 10 element buffer then pass the value 10 to this parameter when creating an instance of this package.
This package specification declares two task types. Tasks are Ada's name for threads. A task type is a pattern for a task. Multiple instances of a task type may be used in a single program.
The interfaces to the two task types are remarkably similar. The Producer task type has an entry named Set_Id, which allows the programmer to assign an Id number to a particular instance of the task type. The second entry is named Stop and takes no parameters. The Consumer task type has identical entries to the Producer task type.
Task entries in Ada are specific calls that can be made from one task to another to communicate directly between tasks.
I am sure you have already noticed that there is no information in the package specification about the buffer shared between the Producer tasks and the Consumer tasks. The buffer is defined in the package body where it is programmatically visible only to the Producer and Consumer tasks.
The package body contains all the details needed to implement the Producer-Consumer pattern.
Inside the package body you will find implementations for the Producer and Consumer task types. You will also find an implementation of an Ada protected object. The protected object implements the shared buffer, which implicitly uses monitors to control access to the buffer.
The protected object named Buffer, like a package, is described in two parts. The protected object specification identifies the interfaces to the object as well as the private data members of the object. The protected body contains the implementation of the interface.
The protected object named Buffer provides three operations. The Write entry allows a Producer to write an Integer value to the Buffer. The Read entry allows a Consumer to read an Integer value from the Buffer. The Is_Empty function reports whether or not the buffer is empty; it is used to simplify some of the internal logic.
The private data members are:
The Write protected entry accepts an Integer argument when Count < Capacity. This is the Ada implementation of a condition variable. The call to Write will only complete when the condition evaluates to True. All calls to Write when the condition evaluates to False cause the calling task to be suspended in an entry queue implicitly handled by the protected object. When the Write entry is executed it copies the value of the parameter Item to the location in the Buf array indexed by Write_Index. Write_Index is then incremented modulo Capacity so that the array is treated as a circular buffer. Finally Count is incremented and the Write entry completes.
The Read entry passes an Integer value out to the calling task when Count > 0. The Read entry has its own implicit entry queue dealing with all Consumer tasks waiting for Count to exceed 0. When Read executes it copies the value of Buf indexed by Read_Index to the parameter Item. It then increments Read_Index modulo Capacity and finally decrements Count.
The Is_Empty function simply returns the Boolean value resulting from the comparison of Count with the value 0. In other words, it returns True when Count is 0 and False when it is not.
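For readers more at home in C, the three operations described above map fairly directly onto one mutex plus two condition variables. The following is a rough C counterpart, not a translation of the Ada source: each entry guard becomes a `while` loop around `pthread_cond_wait`, and the implicit entry queues become the condition variables' wait queues. The names `buffer_t`, `buffer_write`, `buffer_read` and `buffer_is_empty` are invented for this sketch.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>

#define CAPACITY 10

/* Hypothetical C stand-in for the protected object. */
typedef struct {
    int buf[CAPACITY];
    int write_index, read_index, count;
    pthread_mutex_t lock;
    pthread_cond_t  not_full;    /* waited on while count == CAPACITY */
    pthread_cond_t  not_empty;   /* waited on while count == 0        */
} buffer_t;

static void buffer_init(buffer_t *b) {
    b->write_index = b->read_index = b->count = 0;
    pthread_mutex_init(&b->lock, NULL);
    pthread_cond_init(&b->not_full, NULL);
    pthread_cond_init(&b->not_empty, NULL);
}

/* Counterpart of the Write entry, guarded by Count < Capacity. */
static void buffer_write(buffer_t *b, int item) {
    pthread_mutex_lock(&b->lock);
    while (b->count == CAPACITY)               /* guard false: sleep, lock released */
        pthread_cond_wait(&b->not_full, &b->lock);
    b->buf[b->write_index] = item;
    b->write_index = (b->write_index + 1) % CAPACITY;
    b->count++;
    assert(b->count <= CAPACITY);
    pthread_cond_signal(&b->not_empty);        /* one new item: wake one reader */
    pthread_mutex_unlock(&b->lock);
}

/* Counterpart of the Read entry, guarded by Count > 0. */
static int buffer_read(buffer_t *b) {
    pthread_mutex_lock(&b->lock);
    while (b->count == 0)
        pthread_cond_wait(&b->not_empty, &b->lock);
    int item = b->buf[b->read_index];
    b->read_index = (b->read_index + 1) % CAPACITY;
    b->count--;
    pthread_cond_signal(&b->not_full);         /* one free slot: wake one writer */
    pthread_mutex_unlock(&b->lock);
    return item;
}

/* Counterpart of Is_Empty; the answer may be stale as soon as the lock is dropped. */
static bool buffer_is_empty(buffer_t *b) {
    pthread_mutex_lock(&b->lock);
    bool empty = (b->count == 0);
    pthread_mutex_unlock(&b->lock);
    return empty;
}
```

The `while` (rather than `if`) around each wait matters in C because a woken thread has to re-check the condition after it reacquires the mutex.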
The Producer task body contains two local variables named Value and Me. Upon starting, the Producer task instance waits for another task, in this example the main task, to call its Set_Id entry. When called, it copies the value passed through Set_Id to the local Me variable. The task then iterates through a loop until another task calls its Stop entry. The exit reserved word causes the loop to terminate. The Stop entry is accepted selectively, which works much like a conditional call. If the Stop entry has not been called, the logic follows the "else" branch, where the Buffer.Write entry is called. If the buffer is not full, the task completes the Write call, outputs a message and increments Value. If the Buffer.Write entry is blocked because the buffer is full, the task delays (waits) for 0.001 seconds and outputs a message. The loop continues to execute until the Stop entry call is encountered.
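A comparable producer loop in C could use `pthread_cond_timedwait` in place of the timed Write call and an atomic flag in place of the Stop entry. This is only a sketch under those assumptions: `buffer_write_timed` and `produce_until_stopped` are hypothetical names, only the write side of the buffer is shown, and the 1 ms timeout mirrors the 0.001 second delay described above.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

#define CAPACITY 10

/* Write side only; all names here are hypothetical. */
typedef struct {
    int buf[CAPACITY];
    int write_index, count;
    pthread_mutex_t lock;
    pthread_cond_t  not_full;   /* would be signalled by readers (not shown) */
} buffer_t;

#define BUFFER_INIT { .lock = PTHREAD_MUTEX_INITIALIZER, \
                      .not_full = PTHREAD_COND_INITIALIZER }

/* Absolute deadline `ms` milliseconds from now on CLOCK_REALTIME,
   the default clock for pthread_cond_timedwait. */
static struct timespec deadline_after_ms(long ms) {
    struct timespec t;
    clock_gettime(CLOCK_REALTIME, &t);
    t.tv_sec  += ms / 1000;
    t.tv_nsec += (ms % 1000) * 1000000L;
    if (t.tv_nsec >= 1000000000L) {
        t.tv_sec++;
        t.tv_nsec -= 1000000000L;
    }
    return t;
}

/* Returns true if the item was stored, false if the buffer stayed full
   until the timeout expired. */
static bool buffer_write_timed(buffer_t *b, int item, long timeout_ms) {
    struct timespec deadline = deadline_after_ms(timeout_ms);
    bool stored = false;
    int rc = 0;
    pthread_mutex_lock(&b->lock);
    while (b->count == CAPACITY && rc == 0)    /* stop waiting on timeout or error */
        rc = pthread_cond_timedwait(&b->not_full, &b->lock, &deadline);
    if (b->count < CAPACITY) {
        b->buf[b->write_index] = item;
        b->write_index = (b->write_index + 1) % CAPACITY;
        b->count++;
        assert(b->count <= CAPACITY);
        stored = true;                         /* a full version would signal readers here */
    }
    pthread_mutex_unlock(&b->lock);
    return stored;
}

/* Producer loop: returns how many values were written before stop was set. */
static int produce_until_stopped(buffer_t *b, atomic_bool *stop) {
    int value = 0;
    while (!atomic_load(stop)) {
        if (buffer_write_timed(b, value, 1))
            value++;
        /* otherwise the buffer was full for 1 ms: loop and re-check stop */
    }
    return value;
}
```

Because the wait is bounded, a producer stuck on a full buffer still notices the stop request within roughly one timeout period.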
The Consumer is designed to read all the values in Buffer, even after Consumer is told to stop.
The Consumer task has three local variables. Value is the value read from Buffer. Me is the Id number of the instance of Consumer. Done is a Boolean variable used to identify when Stop has been called.
Consumer selectively calls the Read protected entry. If Read completes, the Consumer outputs the value read. If Read must wait for a value to be written to Buffer, the Consumer delays (waits) 0.001 seconds and then checks the value of Done and the value returned by Is_Empty to determine whether the buffer is empty and the Consumer task should terminate.
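The "keep reading until stopped and empty" rule can be sketched in C as follows. This version swaps the timed entry call for a simpler technique, a non-blocking read followed by a 1 ms sleep, so it polls rather than waits on a condition; `buffer_try_read` and `consume_until_drained` are hypothetical names, and only the read side of the buffer is shown.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <time.h>

#define CAPACITY 10

/* Read side only; all names here are hypothetical. */
typedef struct {
    int buf[CAPACITY];
    int read_index, count;
    pthread_mutex_t lock;
} buffer_t;

/* Non-blocking read: returns true and sets *item if a value was available. */
static bool buffer_try_read(buffer_t *b, int *item) {
    bool got = false;
    pthread_mutex_lock(&b->lock);
    if (b->count > 0) {
        *item = b->buf[b->read_index];
        b->read_index = (b->read_index + 1) % CAPACITY;
        b->count--;
        assert(b->count >= 0);
        got = true;
    }
    pthread_mutex_unlock(&b->lock);
    return got;
}

/* Consumer loop: keeps reading after stop is requested and returns only
   once stop is set AND the buffer has been found empty.
   Returns the number of values consumed. */
static int consume_until_drained(buffer_t *b, atomic_bool *stop) {
    const struct timespec one_ms = { .tv_sec = 0, .tv_nsec = 1000000L };
    int consumed = 0;
    int value;
    for (;;) {
        if (buffer_try_read(b, &value)) {
            consumed++;                  /* a real worker would process value here */
        } else if (atomic_load(stop)) {
            break;                       /* empty and told to stop */
        } else {
            nanosleep(&one_ms, NULL);    /* empty but still running: wait a little */
        }
    }
    return consumed;
}
```

This only drains everything if the producers have already stopped by the time the consumers are told to stop, which is the order the main procedure uses.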
The main procedure for this program is
The main procedure creates an instance of Bounded_PC with a 10 element buffer. Two instances of Producer are created named P1 and P2. Two instances of Consumer are created named C1 and C2. All four tasks begin executing immediately. All four of the tasks immediately suspend at the Set_Id entry, waiting for an ID number.
The main procedure then passes ID numbers to each task, delays for 0.003 seconds, and calls P1.Stop and P2.Stop. After another delay of 0.01 seconds, main calls C1.Stop and C2.Stop.
The output of an example run of the program is
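Separately from the Ada program, and for anyone who wants to stay in C, the same overall sequence (two producers, two consumers, stop the producers first, then let the consumers drain the buffer) might look like the sketch below. It is not a line-by-line fix of the code in the question: it replaces the `front`/`rear` bookkeeping with a count and modulo indices, puts the stop flags under the same mutex as the queue, and drops the `printf` and `sleep` calls for brevity. All names, including `run_demo`, are hypothetical, and the 3 ms run time is borrowed from the delay described above.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <time.h>

#define CAPACITY 10

/* All of this state is guarded by `lock`; names are hypothetical. */
static int buf[CAPACITY];
static int write_index, read_index, count;
static int produced, consumed;
static bool producers_stop, consumers_stop;
static pthread_mutex_t lock      = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  not_full  = PTHREAD_COND_INITIALIZER;
static pthread_cond_t  not_empty = PTHREAD_COND_INITIALIZER;

static void *producer(void *arg) {
    int id = *(int *)arg;
    for (;;) {
        pthread_mutex_lock(&lock);
        while (count == CAPACITY && !producers_stop)
            pthread_cond_wait(&not_full, &lock);
        if (producers_stop) {
            pthread_mutex_unlock(&lock);
            return NULL;
        }
        buf[write_index] = id;                 /* the "request" is just the client id */
        write_index = (write_index + 1) % CAPACITY;
        count++;
        produced++;
        assert(count <= CAPACITY);
        pthread_cond_signal(&not_empty);
        pthread_mutex_unlock(&lock);
    }
}

static void *consumer(void *arg) {
    (void)arg;
    for (;;) {
        pthread_mutex_lock(&lock);
        while (count == 0 && !consumers_stop)
            pthread_cond_wait(&not_empty, &lock);
        if (count == 0) {                      /* stopped and fully drained */
            pthread_mutex_unlock(&lock);
            return NULL;
        }
        read_index = (read_index + 1) % CAPACITY;
        count--;
        consumed++;
        pthread_cond_signal(&not_full);
        pthread_mutex_unlock(&lock);
    }
}

/* Set a stop flag under the lock and wake every thread waiting on cv. */
static void request_stop(bool *flag, pthread_cond_t *cv) {
    pthread_mutex_lock(&lock);
    *flag = true;
    pthread_cond_broadcast(cv);
    pthread_mutex_unlock(&lock);
}

/* Runs 2 producers and 2 consumers for about 3 ms.
   Returns true if every produced request was consumed. */
static bool run_demo(void) {
    pthread_t p[2], c[2];
    int ids[2] = {1, 2};
    const struct timespec run_time = { .tv_sec = 0, .tv_nsec = 3000000L };
    for (int i = 0; i < 2; i++) {
        pthread_create(&p[i], NULL, producer, &ids[i]);
        pthread_create(&c[i], NULL, consumer, &ids[i]);
    }
    nanosleep(&run_time, NULL);
    request_stop(&producers_stop, &not_full);   /* producers first */
    for (int i = 0; i < 2; i++) pthread_join(p[i], NULL);
    request_stop(&consumers_stop, &not_empty);  /* then consumers, which drain */
    for (int i = 0; i < 2; i++) pthread_join(c[i], NULL);
    return produced == consumed && count == 0;
}
```

Setting each stop flag while holding the mutex and then broadcasting is what keeps a thread that is about to wait from missing the shutdown request.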