Condition Variable to solve the Multi Producer and Multi Consumer problem


I am currently studying Operating Systems and came across "Solving concurrency problems using Condition Variables".

The task is to simulate a multi-threaded web server with a queue of size 10 that stores the clients' requests, using very minimal C code. There are 2 producer threads (clients) and 2 consumer threads (workers).

I have a few questions about this:

  1. Of the two approaches below, which one should be used to store the clients' requests (given that I do not have to use any asynchronous approach and have to keep the code very simple)?

Option 1: Overwriting slots

    Example:
    Queue size = 5
    Indexes: [0] [1] [2] [3] [4]

    Client 1 puts request at [0] Queue: [R1] [ ] [ ] [ ] [ ]
    Client 2 puts request at [1] Queue: [R1] [R2] [ ] [ ] [ ]
    Worker takes [0] Queue: [ ] [R2] [ ] [ ] [ ]
    Client 1 puts new request at [0] (reuses slot) Queue: [R3] [R2] [ ] [ ] [ ]

Option 2: Appending to end

    Example:
    Client 1 puts request at [0] Queue: [R1] [ ] [ ] [ ] [ ] Write = 1
    Client 2 puts request at [1] Queue: [R1] [R2] [ ] [ ] [ ] Write = 2
    Worker takes [0] Read = 1
    Client 3 puts request at [2] Queue: [ ] [R2] [R3] [ ] [ ] Write = 3

  2. Considering the same task, in which cases is a mutex required for the queue, and in which cases is it not?

  3. What should be corrected in the code below to make it work correctly?

C code for the above scenario:

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

#define MAX_REQUESTS 10

// Shared resource
int requestQueue[MAX_REQUESTS];
int rear = -1;
int front = 0;

pthread_mutex_t queueMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t queueNotEmpty = PTHREAD_COND_INITIALIZER;
pthread_cond_t queueNotFull = PTHREAD_COND_INITIALIZER;

// Producer (Client) function
void* client(void* arg) {

    int clientId = *((int*)arg);

    while (1) {
        // Simulating request generation
        sleep(rand() % 1 + 1);
        printf("Client %d online...\n", clientId);
        pthread_mutex_lock(&queueMutex);
        while (rear == MAX_REQUESTS-1) {
            // Queue is full, waiting for a signal that the queue is not full
            printf("Waiting for queue not full signal...\n");
            pthread_cond_wait(&queueNotFull, &queueMutex);
            printf("Client %d received signal, checking condition again...\n", clientId);
        }

        requestQueue[++rear] = clientId;
        printf("Client %d generated a request. Total requests: %d\n", clientId, rear+1);
        // Signaling worker threads that the queue is not empty
        pthread_cond_broadcast(&queueNotEmpty);
        pthread_mutex_unlock(&queueMutex);
    }
    return NULL;
}

// Consumer (Worker) function
void* worker(void* arg) {

    int workerId = *((int*)arg);

    while (1) {
        printf("Worker %d ready...\n", workerId);
        pthread_mutex_lock(&queueMutex);
        while (front > rear) {
            // Queue is empty, waiting for a signal that the queue is not empty
            printf("Waiting for queue not empty signal...\n");
            pthread_cond_wait(&queueNotEmpty, &queueMutex);
            printf("Worker %d received signal, checking condition again...\n", workerId);
        }

        int clientId = requestQueue[front++];
        printf("Worker %d processed a request from Client %d. Total requests: %d\n", workerId, clientId, rear+1);
        rear--;
        // Signaling clients that the queue is not full
        pthread_cond_broadcast(&queueNotFull);
        pthread_mutex_unlock(&queueMutex);
        // Simulating request processing
        sleep(rand() % 2 + 1);
    }
    return NULL;
}

int main() {

    srand(time(NULL));
    pthread_t clients[2];
    pthread_t workers[2];
    int clientIds[2] = {1, 2};
    int workerIds[2] = {1,2};

    for (int i = 0; i < 2; ++i) {
        pthread_create(&clients[i], NULL, client, &clientIds[i]);
        pthread_create(&workers[i], NULL, worker, &workerIds[i]);
    }

    for (int i = 0; i < 2; ++i) {
        pthread_join(clients[i], NULL);
        pthread_join(workers[i], NULL);
    }
    return 0;
}

Code output:

[Code output 1](https://i.stack.imgur.com/F7p74.jpg)
[Code output 2](https://i.stack.imgur.com/CGjig.jpg)
[Code output 3](https://i.stack.imgur.com/Tlqeq.jpg)

Answer by Jim Rogers:

If there is any logical pairing of Client1 with Worker1 and Client2 with Worker2 then allocate 5 Queue elements to the Client1 <-> Worker1 pair and allocate the remaining 5 Queue elements to the Client2 <-> Worker2 pair. The condition variable is needed to control when a Client writes to a memory location and when a Worker reads from the memory location.

If there is no logical pairing of Clients and Workers then the Queue should be treated as a circular Queue with Clients writing to "empty" Queue elements and Workers only reading from "non-empty" Queue elements. The determination of "empty" and "non-empty" falls upon the condition variables used in the program.

It is common for the non-paired example to implement a Client write index variable shared by the clients as well as a Worker read index variable to be shared by the Workers. It can also be useful for a "full" indication to be shared by the Clients and an "empty" indication to be shared by the Workers.

The Clients cannot write to a full Queue. The Workers cannot read from an empty Queue.
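
For readers working in C, the circular queue described above might look like the following minimal sketch. The names BoundedQueue, queue_init, queue_put and queue_get are hypothetical and do not come from the question. The question's code advances front and also decrements rear on every dequeue, so its empty test (front > rear) can fire while requests are still stored. This sketch instead keeps separate read and write indexes plus a count, all guarded by one mutex.

```c
#include <pthread.h>

#define QUEUE_CAPACITY 10

/* Bounded circular queue shared by all clients and workers (hypothetical type). */
typedef struct {
    int items[QUEUE_CAPACITY];
    int write_index;            /* next slot a client fills */
    int read_index;             /* next slot a worker empties */
    int count;                  /* number of occupied slots */
    pthread_mutex_t lock;
    pthread_cond_t not_empty;   /* signaled after every put */
    pthread_cond_t not_full;    /* signaled after every get */
} BoundedQueue;

void queue_init(BoundedQueue *q) {
    q->write_index = 0;
    q->read_index = 0;
    q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_empty, NULL);
    pthread_cond_init(&q->not_full, NULL);
}

/* Blocks while the queue is full, then stores one request. */
void queue_put(BoundedQueue *q, int request) {
    pthread_mutex_lock(&q->lock);
    while (q->count == QUEUE_CAPACITY) {
        /* Releases the mutex while waiting, reacquires it before returning. */
        pthread_cond_wait(&q->not_full, &q->lock);
    }
    q->items[q->write_index] = request;
    q->write_index = (q->write_index + 1) % QUEUE_CAPACITY;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
}

/* Blocks while the queue is empty, then removes the oldest request. */
int queue_get(BoundedQueue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->count == 0) {
        pthread_cond_wait(&q->not_empty, &q->lock);
    }
    int request = q->items[q->read_index];
    q->read_index = (q->read_index + 1) % QUEUE_CAPACITY;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return request;
}
```

Each client thread would call queue_put and each worker thread queue_get. The mutex is taken on every access because both indexes and the count are shared by all four threads.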

EDIT: The problem you are trying to solve is called the producer-consumer problem. The simplest version of the producer-consumer problem has one producer, which you call a Client, and one consumer, which you call a Worker. A simple semaphore may be used to control access to the shared buffer when there is only one producer and one consumer. The more general problem of having N producers and M consumers requires a more sophisticated access control scheme.
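
As a rough illustration of the one-producer, one-consumer case, here is a sketch using POSIX semaphores. It assumes two counting semaphores (one for free slots, one for used slots) rather than a single one, and the names SpscBuffer, spsc_init, spsc_put and spsc_get are hypothetical. It is only safe with exactly one producer and one consumer, because each index is then touched by a single thread.

```c
#include <errno.h>
#include <semaphore.h>

#define SLOTS 10

/* Single-producer, single-consumer bounded buffer (hypothetical type). */
typedef struct {
    int items[SLOTS];
    int write_index;   /* used only by the single producer */
    int read_index;    /* used only by the single consumer */
    sem_t free_slots;  /* counts empty slots, starts at SLOTS */
    sem_t used_slots;  /* counts filled slots, starts at 0 */
} SpscBuffer;

/* Returns 0 on success, -1 if a semaphore could not be created. */
int spsc_init(SpscBuffer *b) {
    b->write_index = 0;
    b->read_index = 0;
    if (sem_init(&b->free_slots, 0, SLOTS) != 0) return -1;
    if (sem_init(&b->used_slots, 0, 0) != 0) return -1;
    return 0;
}

/* sem_wait may be interrupted by a signal; retry in that case. */
static void wait_on(sem_t *s) {
    while (sem_wait(s) == -1 && errno == EINTR) {
        /* retry */
    }
}

/* Blocks while the buffer is full, then stores one value. */
void spsc_put(SpscBuffer *b, int value) {
    wait_on(&b->free_slots);
    b->items[b->write_index] = value;
    b->write_index = (b->write_index + 1) % SLOTS;
    sem_post(&b->used_slots);
}

/* Blocks while the buffer is empty, then removes the oldest value. */
int spsc_get(SpscBuffer *b) {
    wait_on(&b->used_slots);
    int value = b->items[b->read_index];
    b->read_index = (b->read_index + 1) % SLOTS;
    sem_post(&b->free_slots);
    return value;
}
```

With several producers or several consumers the index updates would race, which is one reason the N-producer, M-consumer case needs a mutex or a monitor in addition to the counting.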

The URL referenced above speaks of Monitors, which work very well for the general problem of N producers and M consumers. A detailed description of the use of semaphores and monitors explains some of the disadvantages of semaphores compared to monitors.

All this complexity is simplified when using a programming language that supports monitors. I use the Ada programming language. Ada has monitors built into the core language. Following is an example of a generic producer consumer implementation. The producer consumer implementation is written in an Ada package.

Ada packages have two parts. The package specification defines the interface to the package and serves roughly the same purpose as a C header file. The package body contains the implementation of the producer-consumer library and is similar to a .c file in its purpose.

The Ada package specification is:

-----------------------------------------------------------------------
-- Producer-consumer with bounded buffer
-----------------------------------------------------------------------
generic
   Capacity : Positive;
package Bounded_PC is
   task type Producer is
      entry set_id(Id : in Positive);
      entry Stop;
   end Producer;

   task type Consumer is
      entry set_id(Id : in Positive);
      entry Stop;
   end Consumer;

end Bounded_PC;

The generic parameter named Capacity allows the programmer to implement this package with a custom buffer size. If you want to create a 10 element buffer then pass the value 10 to this parameter when creating an instance of this package.

This package specification declares two task types. Tasks are Ada's name for threads. A task type is a pattern for a task. Multiple instances of a task type may be used in a single program.

The interfaces to the two task types are remarkably similar. The producer task has an entry named set_id which allows the programmer to assign an Id number to a particular instance of the producer task type. The second entry is named Stop and takes no parameters. The Consumer task type has identical entries to the Producer task type.

Task entries in Ada are specific calls that can be made from one task to another to communicate directly between tasks.

I am sure you have already noticed that there is no information in the package specification about the buffer shared between the Producer tasks and the Consumer tasks. The buffer is defined in the package body where it is programmatically visible only to the Producer and Consumer tasks.

The package body contains all the details needed to implement the Producer-Consumer pattern.

with Ada.Text_IO;     use Ada.Text_IO;

package body Bounded_PC is
   subtype Index_T is Positive range 1 .. Capacity;
   type Buf_Array is array (Index_T) of Integer;

   ------------
   -- Buffer --
   ------------

   protected Buffer is
      entry Write (Item : in Integer);
      entry Read (Item : out Integer);
      function Is_Empty return Boolean;
   private
      Buf         : Buf_Array;
      Write_Index : Index_T := 1;
      Read_Index  : Index_T := 1;
      Count       : Natural := 0;
   end Buffer;

   protected body Buffer is

      entry Write (Item : in Integer) when Count < Capacity is
      begin
         Buf (Write_Index) := Item;
         Write_Index       := (Write_Index mod Capacity) + 1;
         Count             := Count + 1;
      end Write;

      entry Read (Item : out Integer) when Count > 0 is
      begin
         Item       := Buf (Read_Index);
         Read_Index := (Read_Index mod Capacity) + 1;
         Count      := Count - 1;
      end Read;

      function Is_Empty return Boolean is
      begin
         return Count = 0;
      end Is_Empty;


   end Buffer;

   --------------
   -- Producer --
   --------------

   task body Producer is
      Value : Integer := 0;
      Me    : Positive;
   begin
      accept set_id (Id : in Positive) do
         Me := Id;
      end set_id;

      loop
         select
            accept Stop;
            Put_Line("Producer" & Me'Image & " is terminating.");
            exit;
         else
            select
               Buffer.Write (Value);
               Put_Line ("Producer" & Me'Image & " wrote" & Value'Image);
               Value := Value + 1;
            or
               delay 0.001;
               Put_Line ("Producer" & Me'Image & " is waiting ....");
            end select;
         end select;
      end loop;
   end Producer;

   --------------
   -- Consumer --
   --------------

   task body Consumer is
      Value : Integer;
      Me    : Positive;
      Done  : Boolean := False;
   begin
      accept set_id (Id : in Positive) do
         Me := Id;
      end set_id;

      loop
         select
            accept Stop;
            Done := True;
         else
            select
               Buffer.Read (Value);
               Put_Line ("Consumer" & Me'Image & " read" & Value'Image);
            or
               delay 0.001;
               if Done and then Buffer.Is_Empty then
                  Put_Line("Consumer" & Me'Image & " terminating.");
                  exit;
               else
                  Put_Line ("Consumer" & Me'Image & " is waiting ....");
               end if;
            end select;
         end select;
      end loop;
   end Consumer;

end Bounded_PC;

Inside the package body you will find implementations for the Producer and Consumer task types. You will also find an implementation of an Ada protected object. The protected object implements the shared buffer, which implicitly uses monitors to control access to the buffer.

The protected object named Buffer, like a package, is described in two parts. The protected object specification identifies the interfaces to the object as well as the private data members of the object. The protected body contains the implementation of the interface.

The protected object named Buffer describes three methods which can be called. The Write entry allows a Producer to write an Integer value to the Buffer. The Read entry allows a Consumer to read an Integer value from the Buffer. The Is_Empty function simply reports whether or not the buffer is empty. Is_Empty is used to simplify the Consumer's termination logic.

The private data members are:

  • Buf -> The bounded buffer implemented as an array
  • Write_Index -> The array index used by the Write entry, initialized to 1
  • Read_Index -> The array index used by the Read entry, initialized to 1
  • Count -> The number of occupied elements in the array, initialized to 0

The protected body contains the implementations for Write, Read and Is_Empty.

The Write protected entry accepts an Integer argument when Count < Capacity. This guard plays the role that a condition variable plays in C. The call to Write will only complete when the condition evaluates to True. All calls to Write made while the condition evaluates to False cause the calling task to be suspended in an entry queue implicitly handled by the protected object. When the Write entry is executed it copies the value of the parameter Item to the location in the Buf array indexed by Write_Index. Write_Index is then incremented modulo Capacity so that the array is treated as a circular buffer. Finally Count is incremented and the Write entry completes.

The Read entry passes an Integer value out to the calling task when Count > 0. The Read entry has its own implicit entry queue dealing with all Consumer tasks waiting for Count to exceed 0. When Read executes it copies the value of Buf indexed by Read_Index to the parameter Item. It then increments Read_Index modulo Capacity and finally decrements Count.
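
The index arithmetic can be checked in isolation. The following small C sketch (function names are hypothetical) contrasts the 1-based wraparound used in the Ada body, (Index mod Capacity) + 1, with the 0-based form more common in C.

```c
/* 1-based wraparound as in the Ada body: 1, 2, ..., capacity, 1, ... */
int next_index_one_based(int index, int capacity) {
    return (index % capacity) + 1;
}

/* 0-based wraparound typical in C: 0, 1, ..., capacity - 1, 0, ... */
int next_index_zero_based(int index, int capacity) {
    return (index + 1) % capacity;
}
```

With a capacity of 10, the 1-based form maps 10 back to 1, and the 0-based form maps 9 back to 0. In both cases the index never leaves the array bounds.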

The Is_Empty function simply returns the Boolean value resulting from the comparison of Count with the value 0. In other words, it returns True when Count is 0 and False when it is not.

The Producer task body contains two local variables named Value and Me. Upon starting, the Producer task instance waits for another task, in this example the main task, to call its Set_Id entry. When called, it copies the value passed through Set_Id to the local Me variable. The task then iterates through a loop until another task calls its Stop entry. The Stop entry is accepted selectively: the outer select accepts a pending Stop call if one exists, outputs a message, and the exit reserved word terminates the loop. If no Stop call is pending, the logic follows the "else" branch, where Buffer.Write is called as a timed entry call. If the buffer is not full the task completes the Write call, outputs a message and increments Value. If the Write call cannot start within 0.001 seconds because the buffer is full, the call is cancelled and the task outputs a waiting message. The loop continues until the Stop entry is accepted.
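
A rough C counterpart of the Producer's bounded wait on Buffer.Write can be built on pthread_cond_timedwait. This is only a sketch under assumptions: TimedQueue, tq_init, tq_put_timed and tq_get are hypothetical names, and the timeout is given in milliseconds.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <time.h>

#define CAPACITY 10

typedef struct {
    int items[CAPACITY];
    int write_index, read_index, count;
    pthread_mutex_t lock;
    pthread_cond_t not_full, not_empty;
} TimedQueue;

void tq_init(TimedQueue *q) {
    q->write_index = q->read_index = q->count = 0;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

/* Returns 1 if the value was stored, 0 if the queue stayed full
   until the timeout expired (compare the "or delay" branch). */
int tq_put_timed(TimedQueue *q, int value, long timeout_ms) {
    struct timespec deadline;
    clock_gettime(CLOCK_REALTIME, &deadline);  /* timedwait takes an absolute time */
    deadline.tv_sec += timeout_ms / 1000;
    deadline.tv_nsec += (timeout_ms % 1000) * 1000000L;
    if (deadline.tv_nsec >= 1000000000L) {
        deadline.tv_sec += 1;
        deadline.tv_nsec -= 1000000000L;
    }

    pthread_mutex_lock(&q->lock);
    while (q->count == CAPACITY) {
        int rc = pthread_cond_timedwait(&q->not_full, &q->lock, &deadline);
        if (rc == ETIMEDOUT && q->count == CAPACITY) {
            pthread_mutex_unlock(&q->lock);
            return 0;  /* give up; the caller may print "waiting" and retry */
        }
    }
    q->items[q->write_index] = value;
    q->write_index = (q->write_index + 1) % CAPACITY;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
    return 1;
}

/* Blocking read, included so the sketch is complete. */
int tq_get(TimedQueue *q) {
    pthread_mutex_lock(&q->lock);
    while (q->count == 0) {
        pthread_cond_wait(&q->not_empty, &q->lock);
    }
    int value = q->items[q->read_index];
    q->read_index = (q->read_index + 1) % CAPACITY;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return value;
}
```

A producer loop could call tq_put_timed(&q, value, 1) and, on a 0 result, check a stop flag before trying again, which mirrors the structure of the Ada loop.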

The Consumer is designed to read all the values in Buffer, even after Consumer is told to stop.

The Consumer task has three local variables. Value is the value read from Buffer. Me is the Id number of the instance of Consumer. Done is a Boolean variable used to identify when Stop has been called.

Consumer calls the Read protected entry as a timed entry call. If Read completes, Consumer outputs the value read. If Read cannot start within 0.001 seconds because Buffer is empty, the call is cancelled and Consumer checks the value of Done and the value returned by Is_Empty to determine whether the buffer has been drained and the Consumer task should terminate.
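
The same "drain, then stop" behaviour can be sketched in C with a closed flag on the queue. This is an assumed design rather than a translation of the Ada code: DrainQueue, dq_init, dq_put, dq_get and dq_close are hypothetical names, and dq_get reports false only once the queue is both closed and empty.

```c
#include <pthread.h>
#include <stdbool.h>

#define CAPACITY 10

typedef struct {
    int items[CAPACITY];
    int write_index, read_index, count;
    bool closed;  /* set once the producers are finished */
    pthread_mutex_t lock;
    pthread_cond_t not_full, not_empty;
} DrainQueue;

void dq_init(DrainQueue *q) {
    q->write_index = q->read_index = q->count = 0;
    q->closed = false;
    pthread_mutex_init(&q->lock, NULL);
    pthread_cond_init(&q->not_full, NULL);
    pthread_cond_init(&q->not_empty, NULL);
}

/* Returns false if the queue has been closed; otherwise stores the value. */
bool dq_put(DrainQueue *q, int value) {
    pthread_mutex_lock(&q->lock);
    while (q->count == CAPACITY && !q->closed) {
        pthread_cond_wait(&q->not_full, &q->lock);
    }
    if (q->closed) {
        pthread_mutex_unlock(&q->lock);
        return false;
    }
    q->items[q->write_index] = value;
    q->write_index = (q->write_index + 1) % CAPACITY;
    q->count++;
    pthread_cond_signal(&q->not_empty);
    pthread_mutex_unlock(&q->lock);
    return true;
}

/* Returns false only when the queue is closed AND empty, so values
   written before the close are still delivered. */
bool dq_get(DrainQueue *q, int *out) {
    pthread_mutex_lock(&q->lock);
    while (q->count == 0 && !q->closed) {
        pthread_cond_wait(&q->not_empty, &q->lock);
    }
    if (q->count == 0) {
        pthread_mutex_unlock(&q->lock);
        return false;
    }
    *out = q->items[q->read_index];
    q->read_index = (q->read_index + 1) % CAPACITY;
    q->count--;
    pthread_cond_signal(&q->not_full);
    pthread_mutex_unlock(&q->lock);
    return true;
}

/* Marks the queue closed and wakes every waiting thread so it can re-check. */
void dq_close(DrainQueue *q) {
    pthread_mutex_lock(&q->lock);
    q->closed = true;
    pthread_cond_broadcast(&q->not_empty);
    pthread_cond_broadcast(&q->not_full);
    pthread_mutex_unlock(&q->lock);
}
```

A worker thread can then loop on dq_get until it returns false, which avoids the polling delay while still reading every value left in the buffer.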

The main procedure for this program is:

with Bounded_PC;

procedure Main is
   package Int_Pck is new Bounded_PC (10);
   use Int_Pck;

   P1 : Producer;
   P2 : Producer;
   C1 : Consumer;
   C2 : Consumer;
begin
   P1.set_id (1);
   P2.set_id (2);
   C1.set_id (1);
   C2.set_id (2);
   delay 0.003;
   P1.Stop;
   P2.Stop;
   delay 0.01;
   C1.Stop;
   C2.Stop;
end Main;

The main procedure creates an instance of Bounded_PC with a 10 element buffer. Two instances of Producer are created named P1 and P2. Two instances of Consumer are created named C1 and C2. All four tasks begin executing immediately. All four of the tasks immediately suspend at the Set_Id entry, waiting for an ID number.

The main procedure then passes ID numbers to each task, delays for 0.003 seconds, and calls P1.Stop and P2.Stop. After another delay of 0.01 seconds, main calls C1.Stop and C2.Stop.

The output of an example run of the program is:

Producer 1 wrote 0
Producer 2 wrote 0
Consumer 2 read 0
Producer 1 wrote 1
Consumer 2 read 1
Producer 1 wrote 2
Consumer 1 read 0
Consumer 1 read 2
Producer 2 wrote 1
Consumer 1 read 3
Consumer 1 read 2
Producer 2 wrote 2
Consumer 2 read 1
Producer 2 wrote 3
Consumer 1 read 3
Producer 2 wrote 4
Producer 1 wrote 3
Consumer 2 read 4
Producer 1 wrote 4
Producer 2 wrote 5
Producer 1 wrote 5
Producer 1 wrote 6
Producer 2 wrote 6
Producer 1 wrote 7
Producer 1 wrote 8
Consumer 2 read 4
Consumer 2 read 5
Consumer 2 read 6
Consumer 2 read 6
Producer 1 wrote 9
Producer 2 wrote 7
Producer 2 wrote 8
Producer 2 wrote 9
Producer 2 wrote 10
Producer 1 wrote 10
Producer 2 wrote 11
Producer 2 wrote 12
Producer 1 wrote 11
Consumer 1 read 5
Consumer 2 read 7
Consumer 1 read 8
Consumer 2 read 7
Producer 1 wrote 12
Producer 2 wrote 13
Producer 1 wrote 13
Producer 2 wrote 14
Consumer 2 read 10
Consumer 1 read 9
Consumer 2 read 8
Consumer 1 read 9
Consumer 1 read 11
Producer 2 wrote 15
Consumer 1 read 12
Producer 2 wrote 16
Producer 2 wrote 17
Consumer 1 read 11
Producer 2 wrote 18
Producer 2 wrote 19
Producer 2 wrote 20
Consumer 1 read 13
Producer 1 wrote 14
Consumer 2 read 10
Producer 2 wrote 21
Producer 1 wrote 15
Consumer 2 read 13
Consumer 1 read 12
Producer 2 wrote 22
Consumer 2 read 14
Consumer 1 read 14
Consumer 1 read 16
Consumer 1 read 17
Consumer 1 read 18
Consumer 1 read 19
Consumer 1 read 20
Producer 2 wrote 23
Consumer 2 read 15
Producer 2 wrote 24
Consumer 2 read 15
Producer 2 wrote 25
Producer 2 wrote 26
Producer 2 wrote 27
Producer 2 wrote 28
Producer 2 wrote 29
Producer 1 wrote 16
Producer 2 wrote 30
Consumer 2 read 22
Producer 1 wrote 17
Consumer 2 read 16
Producer 2 wrote 31
Consumer 2 read 23
Consumer 1 read 21
Producer 1 wrote 18
Producer 2 wrote 32
Consumer 1 read 25
Producer 1 wrote 19
Consumer 1 read 26
Producer 2 wrote 33
Consumer 2 read 24
Producer 1 wrote 20
Consumer 2 read 28
Consumer 1 read 27
Consumer 1 read 30
Consumer 1 read 17
Consumer 1 read 31
Consumer 1 read 18
Consumer 1 read 32
Producer 2 wrote 34
Consumer 1 read 19
Producer 2 wrote 35
Producer 1 wrote 21
Producer 2 wrote 36
Producer 1 wrote 22
Producer 1 wrote 23
Producer 1 wrote 24
Producer 1 wrote 25
Producer 2 wrote 37
Consumer 1 read 33
Consumer 2 read 29
Consumer 1 read 20
Producer 2 wrote 38
Producer 1 wrote 26
Producer 2 wrote 39
Consumer 2 read 34
Consumer 1 read 21
Consumer 2 read 35
Producer 1 wrote 27
Consumer 1 read 36
Consumer 1 read 37
Consumer 1 read 23
Consumer 1 read 24
Consumer 1 read 25
Consumer 1 read 26
Producer 1 wrote 28
Producer 2 wrote 40
Producer 1 wrote 29
Producer 2 wrote 41
Producer 2 wrote 42
Producer 2 wrote 43
Producer 2 wrote 44
Producer 1 wrote 30
Consumer 2 read 22
Consumer 1 read 38
Producer 2 wrote 45
Consumer 2 read 39
Producer 1 wrote 31
Consumer 1 read 27
Producer 2 wrote 46
Consumer 2 read 40
Producer 1 wrote 32
Consumer 2 read 29
Producer 2 wrote 47
Consumer 2 read 41
Producer 1 wrote 33
Consumer 2 read 30
Producer 2 wrote 48
Consumer 2 read 42
Consumer 2 read 43
Consumer 2 read 44
Producer 1 wrote 34
Consumer 2 read 45
Consumer 2 read 31
Consumer 2 read 46
Consumer 2 read 32
Producer 2 wrote 49
Consumer 2 read 47
Producer 2 wrote 50
Consumer 2 read 33
Consumer 2 read 48
Producer 1 wrote 35
Producer 2 wrote 51
Producer 1 wrote 36
Consumer 2 read 34
Producer 1 wrote 37
Producer 2 wrote 52
Producer 2 wrote 53
Producer 2 wrote 54
Producer 1 wrote 38
Producer 1 wrote 39
Consumer 2 read 49
Consumer 1 read 28
Producer 2 wrote 55
Consumer 1 read 50
Consumer 2 read 35
Producer 1 wrote 40
Producer 2 wrote 56
Producer 1 wrote 41
Consumer 2 read 36
Consumer 1 read 51
Consumer 2 read 52
Producer 2 wrote 57
Consumer 2 read 38
Consumer 2 read 53
Consumer 2 read 54
Consumer 2 read 39
Consumer 2 read 55
Consumer 2 read 40
Producer 1 wrote 42
Consumer 2 read 56
Producer 1 wrote 43
Producer 1 wrote 44
Producer 2 wrote 58
Producer 1 wrote 45
Consumer 2 read 41
Producer 1 is terminating.
Consumer 2 read 57
Consumer 2 read 42
Consumer 2 read 58
Consumer 2 read 43
Consumer 2 read 44
Consumer 2 read 45
Consumer 2 read 59
Consumer 1 read 37
Producer 2 wrote 59
Producer 2 is terminating.
Consumer 2 is waiting ....
Consumer 1 is waiting ....
Consumer 2 is waiting ....
Consumer 1 is waiting ....
Consumer 2 is waiting ....
Consumer 1 is waiting ....
Consumer 2 is waiting ....
Consumer 1 terminating.
Consumer 2 terminating.