The producer–consumer problem is a classic example of a problem that requires synchronizing multiple threads or processes. It describes two threads: the producer and the consumer. The producer creates data and puts it into a fixed-size buffer, and the consumer reads and removes it from the buffer.
To solve this problem, access to the buffer needs to be synchronized. Otherwise, the consumer could try to read from an empty buffer, or the producer could try to write to a full buffer. The following implementation extends the problem to multiple producers and consumers and uses condition variables (`std::condition_variable`, C++11) to solve it.
Read more on Wikipedia.
Source code
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
#include <algorithm>
#include <array>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <mutex>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <vector>

// Bounded-buffer producer/consumer demo with several producers and consumers.
//
// Shared state: `m` guards `buffer`, and every write to std::cout below is
// also made while holding `m`, so log lines from different threads never
// interleave.
std::mutex m;
// producer_cv is notified after an item is removed (a slot became free);
// consumer_cv is notified after an item is added (data became available).
std::condition_variable producer_cv, consumer_cv;
std::queue<int> buffer;
const int BUFFER_SIZE = 10;

namespace {

// Returns a uniformly distributed integer in the closed range [lo, hi].
// Each thread owns its own engine, so no locking is needed here.
int random_between(int lo, int hi)
{
    thread_local std::mt19937 engine{std::random_device{}()};
    return std::uniform_int_distribution<int>{lo, hi}(engine);
}

// True while the buffer holds fewer than BUFFER_SIZE items.
// Must be called with `m` held.
bool buffer_has_room()
{
    return buffer.size() < static_cast<std::size_t>(BUFFER_SIZE);
}

}  // namespace

// Consumer loop: removes one item from the buffer per iteration, then sleeps
// for 800-1199 ms. Never returns.
//
// id: number used to label this consumer in the log output.
void consumer_thread(int id)
{
    while (true) {
        {
            std::unique_lock<std::mutex> lk(m);
            if (buffer.empty()) {
                std::cout << "*** Consumer " << id << " is waiting" << std::endl;
                // The predicate form re-checks the condition after every
                // wakeup, so spurious wakeups and lost races are handled.
                consumer_cv.wait(lk, [] { return !buffer.empty(); });
            }
            // The lock is held and the buffer is non-empty here, whether or
            // not this thread had to wait, so it always consumes an item.
            std::cout << "Consumer " << id << " ate " << buffer.front() << std::endl;
            buffer.pop();
        }
        // Exactly one slot was freed, so waking a single producer is enough.
        // Notifying after the lock is released lets that producer proceed
        // without immediately blocking on `m`.
        producer_cv.notify_one();

        std::this_thread::sleep_for(std::chrono::milliseconds(random_between(800, 1199)));
    }
}

// Producer loop: adds one random value in [0, 399] to the buffer per
// iteration, then sleeps for 100-499 ms. Never returns.
//
// id: number used to label this producer in the log output.
void producer_thread(int id)
{
    while (true) {
        {
            std::unique_lock<std::mutex> lk(m);
            if (!buffer_has_room()) {
                std::cout << "### Producer " << id << " is waiting" << std::endl;
                producer_cv.wait(lk, [] { return buffer_has_room(); });
            }
            // The lock is held and there is room here, whether or not this
            // thread had to wait, so it always produces an item.
            const int a = random_between(0, 399);
            std::cout << "Producer " << id << " produced: " << a << std::endl;
            buffer.push(a);
        }
        // Exactly one item was added, so waking a single consumer is enough.
        consumer_cv.notify_one();

        std::this_thread::sleep_for(std::chrono::milliseconds(random_between(100, 499)));
    }
}

// Starts 5 consumers and 3 producers (ids start at 1) and waits on them.
int main()
{
    const int consumers_count = 5;
    const int producers_count = 3;

    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;
    consumers.reserve(consumers_count);
    producers.reserve(producers_count);

    for (int i = 0; i < consumers_count; i++)
        consumers.emplace_back(consumer_thread, i + 1);
    for (int i = 0; i < producers_count; i++)
        producers.emplace_back(producer_thread, i + 1);

    // The worker loops never return, so these joins block until the process
    // is killed; nothing placed after them would ever run.
    for (std::thread& t : consumers)
        t.join();
    for (std::thread& t : producers)
        t.join();

    return 0;
}