There is a queue with priorities and four threads – two produce and enter priority values into the queue (the value and priority are the same) and two extract priority values from it.
And I need to separate Consumer Threads with this logic:
The first thread of the threads extracting priority values from the queue can only extract values from 1 to 5, the second - values from 6 to 10.
Program should use ReentrantLock and Condition for a synchronization.
I have this program code:
Queue class:
import java.util.concurrent.locks.*;
public class Queue implements PriorityQueue {
private int[] heap; // массив кучи
private int space; // количество элементов в очереди
private ReentrantLock lock; // блокировка
private Condition notFull; // объект управления "не полный"
private Condition notEmpty; // объект управления "не пустой"
public Queue(){
heap = new int[10];
space = 0;
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty = lock.newCondition();
}
public void insert(int val){
lock.lock();
try {
while (full()) {
notFull.await();
}
int cur = space;
heap[space++] = val;
while (cur > 0 && heap[cur] > heap[(cur - 1) / 2]) {
change(cur, (cur - 1) / 2);
cur = ((cur - 1) / 2);
}
notEmpty.signalAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public int deleteMax(){
lock.lock();
try {
while (empty()) {
notEmpty.await();
}
int res = heap[0];
space--;
heap[0] = heap[space];
int current = 0;
while (current * 2 + 1 < space) {
int max;
int left = current * 2 + 1;
int right = current * 2 + 2;
if (right < space && heap[right] > heap[left])
max = right;
else max = left;
if (heap[current] < heap[max]) {
change(current, max);
current = max;
} else break;
}
notFull.signalAll();
return res;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
} finally {
lock.unlock();
}
}
public boolean full(){
return space == 10;
}
public boolean empty(){
return space == 0;
}
private void change(int one, int two){
int hold = heap[one];
heap[one] = heap[two];
heap[two] = hold;
}
public int getMax(){
return heap[0];
}
public void printQueue(){
lock.lock();
int l = heap.length;
System.out.println("Queue: ");
for (int i = 0; i < l; i++){
System.out.printf("%3s", heap[i]);
}
System.out.println();
lock.unlock();
}
}
Producer class:
import java.util.Random;
public class Producer implements Runnable {
private Queue queue;
private Random random = new Random();
public Producer(Queue queue) {
this.queue = queue;
}
public void run() {
while (true) {
try {
int k = random.nextInt(1, 3);
for (int i = 0; i < k; i++) {
int val = random.nextInt(1, 10);
System.out.println(Thread.currentThread().getName() + " Producer add " + val);
queue.insert(val);
queue.printQueue();
}
Thread.sleep(3000);
} catch (InterruptedException e) {
System.out.println("-------ERRORRRRRR");
Thread.currentThread().interrupt();
}
}
}
}
Consumer class:
import java.util.Random;
public class Consumer implements Runnable {
private Queue queue;
private int bound1; // либо 1, либо 6
private int bound2; // либо 5, либо 10
public Consumer(Queue queue, int bound1, int bound2) {
this.queue = queue;
this.bound1 = bound1;
this.bound2 = bound2;
}
public void run(){
while(true){
try {
int val = queue.getMax();
if (bound1 <= val && val <= bound2){
System.out.println(Thread.currentThread().getName() + " Consumer take " + queue.deleteMax());
queue.printQueue();
}else {
//System.out.println( Thread.currentThread().getName() + " ------------val isnt correct---------");
}
Thread.sleep(3000);
}catch (InterruptedException e){
System.out.println("-------ERRORRRRRR");
Thread.currentThread().interrupt();
}
}
}
}
Main class:
public class Main {
public static void main(String[] args){
Queue queue = new Queue();
Thread p1 = new Thread(new Producer(queue));
Thread p2 = new Thread(new Producer(queue));
Thread c1 = new Thread(new Consumer(queue, 1, 5));
Thread c2 = new Thread(new Consumer(queue, 6, 10));
p1.start();
p2.start();
c1.start();
c2.start();
}
}
As you can see I separates Consumer Threads in Consumer class using bound1 and bound2.
But now I have to change this implementation and use the third (or more) Condition object and make separation inside Queue class.
I tried, but honestly I don't clearly understand how to implement separation using Condition. There's my attempt, but the Consumer threads are extracting all values without separation.
public class Queue implements PriorityQueue {
private int[] heap; // массив кучи
private int space; // количество элементов в очереди
private ReentrantLock lock; // блокировка
private Condition notFull; // объект управления "не полный"
private Condition notEmpty_1_5; // объект управления "не пустой"
private Condition notEmpty_6_10;
public Queue(){
heap = new int[10];
space = 0;
lock = new ReentrantLock();
notFull = lock.newCondition();
notEmpty_1_5 = lock.newCondition();
notEmpty_6_10 = lock.newCondition();
}
public void insert(int val){
lock.lock();
try {
while (full()) {
notFull.await();
}
int cur = space;
heap[space++] = val;
while (cur > 0 && heap[cur] > heap[(cur - 1) / 2]) {
change(cur, (cur - 1) / 2);
cur = ((cur - 1) / 2);
}
System.out.println(Thread.currentThread().getName() + " Producer add " + val);
printQueue();
if (val >= 1 && val <= 5) notEmpty_1_5.signalAll();
else notEmpty_6_10.signalAll();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public int deleteMax(){
lock.lock();
try {
int res = heap[0];
while (empty()) {
if (res >= 1 && res <= 5) notEmpty_1_5.await();
else notEmpty_6_10.await();
}
space--;
heap[0] = heap[space];
int current = 0;
while (current * 2 + 1 < space) {
int max;
int left = current * 2 + 1;
int right = current * 2 + 2;
if (right < space && heap[right] > heap[left])
max = right;
else max = left;
if (heap[current] < heap[max]) {
change(current, max);
current = max;
} else break;
}
System.out.println(Thread.currentThread().getName() + " Consumer take " + res);
printQueue();
notFull.signalAll();
return res;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return -1;
} finally {
lock.unlock();
}
}
Can you help me and explain the correct way to make this separation?