Условная переменная — примитив синхронизации, обеспечивающий блокирование одного или нескольких потоков до момента поступления сигнала от другого потока о выполнении некоторого условия или до истечения максимального промежутка времени ожидания. Условные переменные используются вместе с ассоциированным мьютексом и являются элементом некоторых видов мониторов.
Обзор
правитьКонцептуально, условная переменная — это очередь потоков, ассоциированных с разделяемым объектом данных, которые ожидают выполнения некоторого условия, накладываемого на состояние данных. Таким образом, каждая условная переменная связана с утверждением . Когда поток находится в состоянии ожидания на условной переменной, он не считается владеющим данными и другой поток может изменить разделяемый объект и просигнализировать ожидающим потокам в случае выполнения утверждения .
Примеры использования
правитьПриведенный пример иллюстрирует применение условных переменных для синхронизации потоков производителя и потребителя. Поток-производитель, постепенно увеличивая значение общей переменной, сигнализирует потоку, ожидающему на условной переменной о выполнении условия превышения максимального значения. Ожидающий поток-потребитель, проверяя значение общей переменной, блокируется в случае невыполнения условия превышения максимума. При получении сигнала об истинности утверждения поток «потребляет» разделяемый ресурс, уменьшая значение общей переменной так, чтобы оно не стало меньше допустимого минимума.
POSIX threads
правитьВ библиотеке POSIX Threads для языка C за использование условных переменных отвечают функции и структуры данных с префиксом pthread_cond.
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#define STORAGE_MIN 10
#define STORAGE_MAX 20
/* Разделяемый ресурс */
int storage = STORAGE_MIN;
pthread_mutex_t mutex;
pthread_cond_t condition;
/* Функция потока потребителя */
void *consumer(void *args)
{
puts("[CONSUMER] thread started");
int toConsume = 0;
while(1)
{
pthread_mutex_lock(&mutex);
/* Если значение общей переменной меньше максимального,
* то поток входит в состояние ожидания сигнала о достижении
* максимума */
while (storage < STORAGE_MAX)
{
pthread_cond_wait(&condition, &mutex);
}
toConsume = storage-STORAGE_MIN;
printf("[CONSUMER] storage is maximum, consuming %d\n", \
toConsume);
/* "Потребление" допустимого объема из значения общей
* переменной */
storage -= toConsume;
printf("[CONSUMER] storage = %d\n", storage);
pthread_mutex_unlock(&mutex);
}
return NULL;
}
/* Функция потока производителя */
void *producer(void *args)
{
puts("[PRODUCER] thread started");
while (1)
{
usleep(200000);
pthread_mutex_lock(&mutex);
/* Производитель постоянно увеличивает значение общей переменной */
++storage;
printf("[PRODUCER] storage = %d\n", storage);
/* Если значение общей переменной достигло или превысило
* максимум, поток потребитель уведомляется об этом */
if (storage >= STORAGE_MAX)
{
puts("[PRODUCER] storage maximum");
pthread_cond_signal(&condition);
}
pthread_mutex_unlock(&mutex);
}
return NULL;
}
int main(int argc, char *argv[])
{
int res = 0;
pthread_t thProducer, thConsumer;
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&condition, NULL);
res = pthread_create(&thProducer, NULL, producer, NULL);
if (res != 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
res = pthread_create(&thConsumer, NULL, consumer, NULL);
if (res != 0)
{
perror("pthread_create");
exit(EXIT_FAILURE);
}
pthread_join(thProducer, NULL);
pthread_join(thConsumer, NULL);
return EXIT_SUCCESS;
}
C++
правитьСтандарт C++11 добавил в язык поддержку многопоточности. Работа с условными переменными обеспечивается средствами, объявленными в заголовочном файле condition_variable
#include <cstdlib>
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
#define STORAGE_MIN 10
#define STORAGE_MAX 20
int storage = STORAGE_MIN;
std::mutex globalMutex;
std::condition_variable condition;
/* Функция потока потребителя */
void consumer()
{
std::cout << "[CONSUMER] thread started" << std::endl;
int toConsume = 0;
while(true)
{
std::unique_lock<std::mutex> lock(globalMutex);
/* Если значение общей переменной меньше максимального,
* то поток входит в состояние ожидания сигнала о достижении
* максимума */
if (storage < STORAGE_MAX)
{
condition.wait(lock , []{return storage >= STORAGE_MAX;} ); // Атомарно _отпускает мьютекс_ и сразу же блокирует поток
toConsume = storage-STORAGE_MIN;
std::cout << "[CONSUMER] storage is maximum, consuming "
<< toConsume << std::endl;
}
/* "Потребление" допустимого объема из значения общей
* переменной */
storage -= toConsume;
std::cout << "[CONSUMER] storage = " << storage << std::endl;
}
}
/* Функция потока производителя */
void producer()
{
std::cout << "[PRODUCER] thread started" << std::endl;
while (true)
{
std::this_thread::sleep_for(std::chrono::milliseconds(200));
std::unique_lock<std::mutex> lock(globalMutex);
++storage;
std::cout << "[PRODUCER] storage = " << storage << std::endl;
/* Если значение общей переменной достигло или превысило
* максимум, поток потребитель уведомляется об этом */
if (storage >= STORAGE_MAX)
{
std::cout << "[PRODUCER] storage maximum" << std::endl;
condition.notify_one();
}
}
}
int main(int argc, char *argv[])
{
std::thread thProducer(producer);
std::thread thConsumer(consumer);
thProducer.join();
thConsumer.join();
return 0;
}
Qt 4
правитьcw.h
#ifndef CW_H
#define CW_H
#include <QThread>
#include <QMutex>
#include <QWaitCondition>
#include <QDebug>
#define STORAGE_MIN 10
#define STORAGE_MAX 20
extern int storage;
extern QMutex qmt;
extern QWaitCondition condition;
class Producer : public QThread
{
Q_OBJECT
private:
void run()
{
qDebug() << "[PRODUCER] thread started";
while (1)
{
QThread::msleep(200);
qmt.lock();
++storage;
qDebug() << "[PRODUCER] storage = " << storage;
/* Если значение общей переменной достигло или превысило
* максимум, поток потребитель уведомляется об этом */
if (storage >= STORAGE_MAX)
{
qDebug() << "[PRODUCER] storage maximum";
condition.wakeOne();
}
qmt.unlock();
}
}
};
class Consumer : public QThread
{
Q_OBJECT
private:
void run()
{
qDebug() << "[CONSUMER] thread started";
int toConsume = 0;
while(1)
{
qmt.lock();
/* Если значение общей переменной меньше максимального,
* то поток входит в состояние ожидания сигнала о достижении
* максимума */
if (storage < STORAGE_MAX)
{
condition.wait(&qmt);
toConsume = storage-STORAGE_MIN;
qDebug() << "[CONSUMER] storage is maximum, consuming "
<< toConsume;
}
/* "Потребление" допустимого объема из значения общей
* переменной */
storage -= toConsume;
qDebug() << "[CONSUMER] storage = " << storage;
qmt.unlock();
}
}
};
#endif /* CW_H */
main.cpp
#include <QCoreApplication>
#include "cw.h"
int storage = STORAGE_MIN;
QMutex qmt;
QWaitCondition condition;
int main(int argc, char *argv[])
{
QCoreApplication app(argc, argv);
Producer prod;
Consumer cons;
prod.start();
cons.start();
return app.exec();
}
Python
правитьВ языке Python условные переменные реализованы в виде экземпляров класса Condition
модуля threading
. В следующем примере одна и та же условная переменная используется в потоках производителя и потребителя с применением синтаксиса менеджера контекста[1]
# Поток-потребитель
with cond_var: # в контексте условия cond_var
while not an_item_is_available(): # пока элемент недоступен
cond_var.wait() # ждать
get_an_item() # получить элемент
# Поток-производитель
with cond_var: # в контексте условия cond_var
make_an_item_available() # произвести элемент
cond_var.notify() # известить потребителей
Ada '95
правитьВ языке Ада нет необходимости в использовании условных переменных. Для организации мониторов с блокированием задач возможно использовать защищенные типы данных.
with Ada.Text_IO;
procedure Main is
task Producer; -- объявление задачи производителя
task Consumer; -- объявление задачи потребителя
type Storage_T is range 10 .. 20; -- тип диапазон для общего ресурса
-- монитор (защищенный объект), разделяемый производителем и потребителем
protected type Storage is
entry Put; -- операция "произвести" единицу ресурса
entry Get; -- операция "потребить" допустимое количество ресурса
entry Value(val : out Storage_T); -- аксессор значения переменной
private
-- скрытая переменная с минимальным начальным значением из диапазона типа
StorageData : Storage_T := Storage_T'First;
end Storage;
-- реализация монитора Storage
protected body Storage is
entry Put when StorageData < Storage_T'Last is
begin
StorageData := StorageData + 1;
if StorageData >= Storage_T'Last then
Ada.Text_IO.Put_Line("[PRODUCER] storage maximum");
end if;
end;
entry Get when StorageData >= Storage_T'Last is
To_Consume : Storage_T;
begin
To_Consume := StorageData - Storage_T'First;
StorageData := StorageData - To_Consume;
Ada.Text_IO.Put_Line("[CONSUMER] consuming");
end Get;
entry Value(val : out Storage_T) when true is
begin
val := StorageData;
end;
end Storage;
-- экземпляр монитора Storage
Storage1 : Storage;
-- реализация задачи производителя
task body Producer is
v : Storage_T;
begin
Ada.Text_IO.Put_Line("[PRODUCER] Task started");
loop
delay 0.2;
Storage1.Put;
Storage1.Value(v);
Ada.Text_IO.Put("[PRODUCER] ");
Ada.Text_IO.Put_Line(v'Img);
end loop;
end Producer;
-- реализация задачи потребителя
task body Consumer is
begin
Ada.Text_IO.Put_Line("[CONSUMER] Task started");
loop
Storage1.Get;
end loop;
end Consumer;
begin
null;
end Main;
Примечания
править- ↑ The Python Standard Library, 16.2. threading — Higher-level threading interface . Дата обращения: 9 января 2014. Архивировано 9 января 2014 года.
В статье не хватает ссылок на источники (см. рекомендации по поиску). |