Тема: Синхронизация потоков потребителей

  • Вид работы: Курсовая работа (т)
  • Предмет: Информационное обеспечение, программирование

Оглавление


. Постановка задачи

. Сигнал производителя - потребителя

. Решение проблемы производителя потребителя

. Deadlock #4

. Производитель - потребитель с конечным буфером

. Сигнал конечного буфера производителя потребителя

. Конечный буфер производителя потребителя. Решение

. ОСРВ RTX

. Постановка задачи

10. Листинг программы

. Результаты работы

Список литературы


1. Постановка задачи


В многопоточных программах часто возникает разделение труда между потоками. В общей картине некоторые потоки являются производителями, а некоторые потребителями. Производители создают данные и помещают их в структуру данных, потребители забирают данные и обрабатывают их.

Хорошим примером являются программы, управляемые событиями. "Событие" это то, что происходит для того, что бы программа ответила: пользователь нажимает кнопку или перемещает мышь, блок данных поступает с диска, пакет данных приходит по сети, заканчивается выполняемая операция.

Всякий раз, когда происходит событие, поток-производитель создает объект события и добавляет его в буфер событий. Одновременно с этим, потребительские потоки принимают события из буфера и обрабатывают их. В этом случае потребителей называют "обработчиками событий"

Есть несколько ограничений синхронизации, которые нужны нам для того, что бы система функционировала корректно.

В то время как элемент добавляется или удаляется из буфера, буфер находится в несогласованном состоянии. Таким образом, потоки должны иметь эксклюзивный доступ к буферу.

Если поток-потребитель приходит, в то время, когда буфер пуст, происходит блокировка, пока производитель не добавляет новый элемент.

Предположим, что производители должны выполнить следующие операции многократно:


event = waitForEvent()

buffer.add(event)


Листинг. 1 Основной код производителя

Кроме того, предполагается, что потребители выполнят следующие операции:


event = buffer.get()

event.process()


Листинг. 2 Основной код производителя потребителя

Как было указано выше, доступ к буферу должен быть эксклюзивным, но waitForEvent и event.process могут сработать одновременно.

Задача: Добавить в код производителя и потребителя операторы синхронизации, для обеспечения ее корректной работы.


. Сигнал производителя - потребителя


Переменные, которые мы могли бы использовать:


mutex = Semaphore(1)

items = Semaphore(0)

local event


Листинг. 3 Инициализация производителя-потребителя

Взаимное исключение обеспечивает эксклюзивный доступ к буферу. Когда переменная положительна, это указывает число элементов в буфере, когда она отрицательна, это указывает число потребительских потоков в очереди.

Переменная event является локальной переменной, которая в данном контексте означает, что каждый поток имеет свою версию.

До сих пор мы предполагали, что у всех потоков есть доступ ко всем переменным, но мы назначим переменную к каждому потоку.

Есть несколько способов, как это может быть реализовано в различных условиях.

Если каждый поток имеет свой собственный стек времени выполнения, то любые переменные, находящиеся в стеке, являются переменными конкретного потока.

Если потоки представлены как объекты, мы можем добавить атрибут к каждому объекту потоку

Если потоки имеют уникальные идентификаторы, мы можем использовать идентификаторы в качестве индекса массива или хеш-таблицы, и хранить там данные для каждого потока. В большинстве программы переменные являются локальными, однако в нашем случае переменные будут общими, если они явно не обвялены как локальные.


. Решение проблемы производителя потребителя


Код решения:


1 event = waitForEvent()

mutex.wait()

buffer.add(event)

items.signal()

mutex.signal()


Листинг. 4 Решение производителя

Производитель не должен получить эксклюзивный доступ к буферу, пока не произойдет событие. Несколько потоков могут выполнить waitForEvent одновременно. Items семафоры отслеживают число элементов в буфере. Каждый раз, когда производитель добавляет элемент, он сигнализирует items, постепенно увеличивая его на один.

Код потребителя аналогичен.


1 items.wait()

2 mutex.wait()

event = buffer.get()

mutex.signal()

event.process()


Листинг. 5 Решение потребителя

Буферные операции защищены мьютексами, но прежде чем потребитель доберется до него, он должен уменьшить items. Если items равен нулю или отрицательный, потребитель блокируется, пока производитель не подаст сигнал.

Хотя это решение правильное, есть возможность внести в него небольшое улучшение. Представим себе, что по крайне мере один потребитель стоит в очереди, когда производитель сигнализирует items. Если диспетчер позволит работать потребителю, что произойдет дальше? Произойдет немедленная блокировка мьютексом, который пока еще захвачен производителем.

Блокировка и активация являются средне затратными операциями, их излишнее выполнение может ухудшить качество программы. Таким образом, вероятно, было бы лучше перестроить производителя вот так:


event = waitForEvent()

mutex.wait()

buffer.add(event)

mutex.signal()

items.signal()


Листинг. 6 Улучшенное решение производителя

Теперь мы можем не беспокоиться разблокировать потребителя, пока не будем знать, что работа может быть продолжена (кроме того редкого случая, когда другой производитель захватит мьютекс).

Однако в этом решении есть проблема. Семафор items отслеживает число элементов в очереди. Взглянув на код потребителя, мы видимо, что есть ситуация, в которой несколько потребителей постепенно могут уменьшить items, прежде чем один из них захватит мьютекс и удалит переменную из буфера. По крайне мере, на некоторое время items была бы неточной.

Мы могли бы попытаться решить эту проблему по средствам проверки буфера внутри мьютекса.


mutex.wait()

items.wait()

event = buffer.get()

mutex.signal()

event.process()


Листинг. 7 Нарушенное решение потребителя

Однако, это решение плохое.


4. Deadlock #4


Если потребитель начнет выполнение кода, это может привести к блокировке.


mutex.wait()

items.wait()

event = buffer.get()

mutex.signal()


event.process()


Листинг. 8 Нарушенное решение потребителя

Представим себе, что буфер пуст. Потребитель приходит, захватывает мьютекс, а затем блокируется на items. Когда придет производитель, он заблокируется на мьютексе, что приведет к остановке системы.

Это общая ошибка в коде синхронизации: если всегда ожидать захвата семафора, когда удерживается мьютекс, то существует опасность наступления deadlock. При решении проблемы синхронизации нужно обязательно учитывать этот нюанс.


. Производитель - потребитель с конечным буфером


В примере выше описан метод обработки событий потоками с использованием бесконечного буфера (размер ограничен только физическими ресурсами).

Однако в ядре операционной системы существует лимит на свободное место. Для таких вещей, как запросы к дискам, сетевые пакеты, буфер как правило имеет фиксированный размер. В подобных ситуациях у нас есть дополнительное ограничение синхронизации.

Если производитель приходит, когда буфер полон, он блокируется, пока потребитель не удаляет элемент.

Предположим мы знаем размер буфера. Назовем его BufferSize. Теперь мы имеем семафор, который отслеживает число элементов. Это можно попробовать представить так:


1 if items >= bufferSize:

block()


Листинг. 9 Нарушенное решение с конечным буфером

Но мы не можем это сделать, так как мы не можем узнать текущее значение семафора, есть только операции сигнала и ожидания.

Задача: написать код, который обрабатывает ограничение конечного буфера потребителя - производителя.


. Сигнал конечного буфера производителя потребителя


Добавим второй семафор, чтобы отслеживать число доступного места в буфер.


mutex = Semaphore(1)

items = Semaphore(0)

spaces = Semaphore(buffer.size())


Листинг. 10 Инициализация конечного буфера производителя - потребителя. Когда потребитель удаляет элемент, то он должен подать сигнал об освобождении памяти. Когда приходит производитель, он должен декрементировать свободное место, в этой точке возможна блокировка до следующего сигнала потребителя.


. Конечный буфер производителя потребителя. Решение


Ниже приведено конечное решение:


1 items.wait()

2 mutex.wait()

event = buffer.get()

mutex.signal()

spaces.signal()


event.process()


Листинг. 11 Решение потребителя с конечным буфером

Код производителя симметричен:


event = waitForEvent()


spaces.wait()

mutex.wait()

buffer.add(event)

mutex.signal()

items.signal()


Листинг. 12 Решение производителя с конечным буфером

Для того, что бы избежать deadlock производителей и потребителей, нужно проверить наличие свободных мест до захвата мьютекса. Для лучшей производительности мьютекс освобождают до подачи сигнала.


. ОСРВ RTX


Расширение IntervalZero RTX - программное средство, предназначенное для добавления функциональности "жёсткого" реального времени в системы под управлением операционных систем Microsoft Windows. Программный продукт RTX был с успехом опробован в тысячах различных автоматизированных систем управления, оборонных и аэрокосмических системах, контрольно-измерительной аппаратуре, роботах и т.д. Он позволил добиться повышения их эффективности, возможностей, степени масштабируемости и надёжности функционирования при одновременном сокращении сроков и стоимости разработки новой продукции.


. Постановка задачи


Заправочную станцию обслуживает один заправщик (производитель) и заправляется один потребитель. В единицу времени можно либо сливать топливо, либо заправляться, размер бака потребителя и размер бака заправки конечны. Реализовать синхронизацию между потребителем и производителем.


10. Листинг программы

синхронизация производитель сигнал буфер

//////////////////////////////////////////////////////////////////

//

// RtxApp1.c - C file

//

//////////////////////////////////////////////////////////////////

#include "RtxApp1.h"mutex;items;spaces;GasStation = 20;lReleaseCount = 1;Bubble;BubbleM;Plus = 3;Minus = 4;

#define MSGSTR_SEM "Sem"

#define MUTEX_ENABLEDRTFCNDCL Thread1Cycle (PVOID unused)

{(1)

{(mutex, INFINITE);

{= GasStation + Plus;(Bubble < 20)

{+= Plus;("(PRODUCER)GIVE = %d ", Plus);("(Fuel level = %d\n", GasStation);

}(Bubble > 20)

{("(PRODUCER)want GIVE = %d ", Plus);("Fuel level will be FULL, wait = %d\n", GasStation);

}(Bubble == 20)

{+= Plus;("(PRODUCER)GIVE = %d ", Plus);("Fuel level is FULL = %d\n", GasStation);

}(3000);(mutex, lReleaseCount, NULL);

}

}NO_ERRORS;

}RTFCNDCL Thread2Cycle (PVOID unused)

{(1)

{(mutex, INFINITE);

{(10);= rand()%10;= GasStation - Minus;(BubbleM > 0)

{-= Minus;("(CONSUMER)TAKE = %d ", Minus);("Fuel level = %d\n", GasStation);

}(BubbleM < 0)

{("(CONSUMER)want TAKE = %d ", Minus);("(CONSUMER)Need more GAS, wait = %d\n", GasStation);

}(BubbleM == 0)

{-= Minus;("(CONSUMER)TAKE = %d ", Minus);("Fuel level is EMPTY = %d\n", GasStation);

}(3000);(mutex, lReleaseCount, NULL);

}

}NO_ERRORS;

}_cdecl wmain(int argc, wchar_t **argv, wchar_t **envp)

{

// for periodic timer code_INTEGER liPeriod; // timer period_INTEGER time;hTimer = NULL; // timer handlethreadId;dwStackSize = 0;stackSize = 0;sleepTime = 30000;dwExitCode = 0;("\n");

//

// Create the thread.

//= CreateThread(, , //defaultCycle, //function, //parameters_SUSPENDED,

&threadId

);(hThread1 == NULL)

{("Error: could not create thread. GetLastError = %d\n", GetLastError());ERROR_OCCURED;

}(!RtSetThreadPriority( hThread1, RT_PRIORITY_MAX))

{("Error: could not set thread priority. GetLastError = %d\n", GetLastError());( hThread1, dwExitCode);ERROR_OCCURED;

}

//

// Create the CPU HOG thread.

//= CreateThread(, , //defaultCycle, //function, //parameters_SUSPENDED,

&threadId

);(hThread2 == NULL)

{("Error: could not create thread. GetLastError = %d\n", GetLastError());ERROR_OCCURED;

}(!RtSetThreadPriority(hThread2, RT_PRIORITY_MIN))

{("Error: could not set thread priority. GetLastError = %d\n", GetLastError());( hThread2, dwExitCode);ERROR_OCCURED;

}= RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);= RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);= RtCreateSemaphore( NULL, 1, 1, MSGSTR_SEM);(mutex==NULL)

{("RtCreateSemaphore failed.");

}(items==NULL)

{("RtCreateSemaphore failed.");

}(spaces==NULL)

{("RtCreateSemaphore failed.");

}(mutex, lReleaseCount, NULL);(

(ResumeThread(hThread1) == RESUME_ERROR)

||

(ResumeThread(hThread2) == RESUME_ERROR)

)

{("Error: could not resume thread. GetLastError = %d\n", GetLastError());(hThread1, dwExitCode);(hThread2, dwExitCode);ERROR_OCCURED;

}(150000);

//

// Stop thread.( !TerminateThread(hThread1, dwExitCode) || !TerminateThread(hThread2, dwExitCode))

{("Error: could not terminate thread. GetLastError = %d\n", GetLastError());ERROR_OCCURED;

}(mutex);(items);(spaces);(0);

}

#include "RtxApp1.h"

//

// RTX periodic timer handler function

//RTFCNDCL TimerHandler(PVOID context)

{a = (PINT)context;

(*a)++;

}

//////////////////////////////////////////////////////////////////

//

// RtxApp1.h - header file

//

//////////////////////////////////////////////////////////////////

#include <windows.h>

#include <wchar.h>

#include <rtapi.h>

#include <time.h>

#include <stdio.h>

//#include <string.h>

//#include <ctype.h>

//#include <conio.h>

#include <stdlib.h>

//#include <math.h>

//#include <errno.h>

// Add DEFINES Here

//**** error codes ****//

#define NO_ERRORS

#define ERROR_OCCURED -1

#define RESUME_ERRORxFFFFFFFF

#define SUPPEND_ERRORxFFFFFFFFhThread1; // handle to the threadhThread2; // handle to the thread

// Add Function prototypes Here

// function prototype for periodic timer function(* nContext

);

// Interrupt handler prototype(* nContext

);


11. Результаты работы


(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 20

(CONSUMER)TAKE = 1 Fuel level = 19

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 19

(CONSUMER)TAKE = 1 Fuel level = 18

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 18

(CONSUMER)TAKE = 1 Fuel level = 17

(PRODUCER)GIVE = 3 Fuel level is FULL = 20

(CONSUMER)TAKE = 1 Fuel level = 19

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 19

(CONSUMER)TAKE = 1 Fuel level = 18

(PRODUCER)want GIVE = 3 Fuel level will be FULL, wait = 18

(CONSUMER)TAKE = 1 Fuel level = 17


Листинг. 13 Результат исполнения


Рис. 1 Результат работы

Список литературы


.The Little Book of SEMAPHORES, Allen B. Downey, 2009