Gala. Параллельное программирование в Delphi

Последнее обновление: 24 января 2004

Посвящается моей жене Гале

Параллельное программирование - программирование в терминах параллельных процессов и их взаимодействий - добавляет к традиционному последовательному программированию новое измерение и дает программисту инструмент декомпозиции сложных задач, многие из которых естественно и удобно описываются как группа взаимодействующих друг с другом параллельных процессов. Обычно выделяют два больших класса систем, требующих распараллеливания - синхронные и асинхронные. Потребность в синхронных системах (их называют еще векторными, матричными или системами массового параллелизма) возникает наиболее часто в матричных вычислительных или поисковых алгоритмах для увеличения скорости получения решения. Адекватная реализация синхронных систем напрямую связана с многопроцессорностью и здесь рассматриваться не будет. Потребность в асинхронном распараллеливании наиболее ярко проявляется в интерактивных задачах и задачах управления в реальном времени. Особенность этих задач, которая позволяет эффективно применять параллельное программирование даже на однопроцессорной машине, состоит в том, что отдельные компоненты задачи большую часть времени проводят в состоянии ожидания каких-либо событий - именно это позволяет большому числу процессов эффективно разделять процессорное время единственного процессора.

Так получилось, что большая часть программистских задач, которые мне приходилось решать, наиболее адекватно декомпозировались именно в терминах параллельных процессов и их взаимодействий - именно этим определяется мой интерес к этой теме. Существуют языки, которые непосредственно поддерживают конструкции параллельного программирования, например, Ада и Оккам, но первый - чересчур громоздкий и сложный, для него нет доступного и качественного компилятора, работающего в среде Windows, а второй - Оккам - достаточно специфичный язык, разработанный специально для программирования транспьютеров фирмы Inmos [2]. В тоже время есть доступные и качественные Windows-инструменты: объектно-ориентированные языки последовательного программирования C++ и Delphi, которые можно естественно расширить для реализации концепций параллельного программирования, используя понятие класса. В статье описывается библиотека классов Gala для Delphi. Если мне придется программировать в среде C++ Builder или Visual C++, то я перепишу Gala на C++. Подобная библиотека (2kLib) была ранее сделана мной для DOS и Borland C++ [1].

Реализация библиотеки Gala опирается на низкоуровневые возможности параллельного программирования, предоставляемые платформой Win32 (операционные системы Windows 95, 98, NT, 2000) - потоки, сигналы, мьютексы, семафоры, критические секции, сообщения. Библиотека позволяет выполнять декомпозицию задачи с использованием асинхронных взаимодействующих параллельных процессов и разделяемых ресурсов, существующих в рамках одной Windows-программы. Delphi предоставляет класс TThread, но его функциональность ограничивается, в основном, взаимодействием с VCL-компонентами, которые не являются потокобезопасными. TThread не дает никаких методов взаимодействия с другими параллельными процессами (потоками). Этот класс удобно использовать, когда в программе малое количество статически создаваемых процессов (потоков), взаимодействующих с основным VCL-потоком и не взаимодействующих между собой. Если же задача раскладывается на достаточно большое число динамически создаваемых, уничтожаемых и взаимодействующих между собой процессов, то использование TThread становится очень сложным и ненадежным. Непосредственное же использование разнообразных низкоуровневых средств синхронизации Win32 чрезвычайно трудоемко, чревато ошибками и громоздко (хотя и более эффективно).

Сказка

В некотором царстве, в некотором государстве жил-был Бармалеус. И был у него театр. Рано утречком Бармалеус открывал театр и убирался в нем, готовил сцену и ждал вечера, когда начнется спектакль. К вечеру собирались актеры и режиссер. А надо сказать, что режиссер был весьма необыкновенным - он каждый день придумывал новый спектакль, и актеры каждый день играли новые роли без всяких репетиций. Режиссер был очень авторитарным человеком - он требовал от актеров точного исполнения действий, которые он видел в своем воображении, но поскольку он, кроме этого, был культурным и вежливым человеком, то свои требования он выражал сообщениями-записками, которые посылал каждому актеру. А начинались сообщения со слов - "Будьте так добры, сделайте, пожалуйста, то-то и то-то". Все актеры были профессионалами и легко понимали режиссера. И были актеры такими же вежливыми и культурными, они экономили процессорное время и не строили козни другим актерам. А если им для выполнения своих действий что-то было нужно, то они направляли такие же вежливые сообщения-записочки рабочим сцены. И никто не ругался ненормативной лексикой и не пил за кулисами водку.

Вот в эту сказочку мне захотелось поиграть и я начал строить такой театр. Режиссер - это их величество Алгоритм, царствующий в каждой программе-спектакле, а актеры - это параллельные процессы, активизирующиеся при получении сообщений и взаимодействующие с другими параллельными процессами с помощью каналов передачи сообщений. А собственно театр - это библиотека классов Gala, с помощью которой я играю в эту сказку.

Действующие лица и исполнители

ГалаТеатр
Это то место, где происходит рождение, жизнь и смерть ГалаПроцессов. В программе может быть только один объект GalaTheater (так же как объект Application). ГалаТеатр позволяет объединять процессы в группы-кластеры. Как правило, процессы взаимодействуют только внутри группы. В программе может быть одна или несколько групп. Можно провести такую аналогию - группа это сцена. В театре может быть несколько сцен, на каждой из которых разыгрывается свой собственный спектакль. ГалаТеатр предоставляет методы для создания и завершения группы процессов, отладочной трассировки и протоколирования работы процессов.
ГалаПроцесс
Это основное действующее лицо в ГалаТеатре. Все процессы в ГалаТеатре порождаются от базового класса TGalaProcess, который имеет виртуальный абстрактный метод Execute - именно в нем описывается алгоритм функционирования процесса. Базовый ГалаПроцесс инкапсулирует поток (thread) Win32 без использования Delphi-класса TThread и имеет методы для:
  • завершения процесса, реализации действий при нормальном и принудительном завершении;
  • получения и изменения его приоритета;
  • приостановки, запуска и приостановки на заданный промежуток времени;
  • взаимодействия с основным VCL-потоком, с другими параллельными процессами и разделяемыми ресурсами;
  • отладочной трассировки.
Объект ГалаПроцесса явно создается конструктором, но не уничтожается явно при завершении процесса, а только освобождает основные системные ресурсы. Уничтожение объектов ГалаПроцессов выполняется ГалаТеатром и только для всей группы взаимодействующих процессов - это позволяет корректно обрабатывать ситуации взаимодействия с уже завершенным процессом. Операция принудительного завершения заканчивает процесс даже в том случае, если он приостановлен или ожидает какого-либо события. Принудительное завершение процесса выглядит как директива-предписание, но завершающие действия выполняются процессом самостоятельно. ГалаПроцесс может владеть каналами для взаимодействия с другими ГалаПроцессами и посылать сообщения основному VCL-потоку. Для взаимодействия с другими параллельными процессами и разделяемыми ресурсами используется модель асимметричного рандеву - как в языке Ада. По сравнению с симметричным рандеву, принятым в языке Оккам, асимметричное рандеву имеет три важных преимущества. Во-первых, это позволяет явно разделить процессы на два больших класса - клиенты и серверы, во-вторых, позволяет делать независимую (библиотечную) реализацию серверов, так как при асимметричном рандеву серверам не требуется знание своих клиентов и, в-третьих, унифицировать взаимодействия Процесс-Процесс и Процесс-РазделяемыйРесурс. Для выполнения недетерминированного ожидания по своим каналам ГалаПроцесс имеет метод AlternativeAccept, который подобен оператору отбора select / or языка Ада [7], или оператору Alt языка Оккам [2]. Метод AlternativeAccept ожидает первого поступившего запроса по заданным каналам или возбуждает исключительную ситуацию EGalaTimeout, если взаимодействие не произошло за указанное время.
ГалаКонтейнер
Пассивный ресурс, разделяемый несколькими ГалаПроцессами. Объекты этого класса порождаются от базового класса TGalaContainer. ГалаКонтейнер может владеть каналами, посредством которых ГалаПроцессы получают доступ к разделяемому ресурсу. Сам контейнер ничего не делает, он только владеет каналами, которые все и делают. Каналы ГалаКонтейнера позволяют выполнять доступ к совместно используемым данным одновременно многим процессам в режиме взаимного исключения. Если контейнер не может в данный момент времени удовлетворить запрос, то он приостанавливает вызывающий процесс, а после завершения очередной операции в контейнере активизирует все ожидающие процессы так, чтобы они опять вступили в конкуренцию за обладание ресурсами контейнера. ГалаКонтейнеры подобны процессам-серверам, но более эффективны и потребляют меньше системных ресурсов, чем ГалаПроцессы. Контейнеры можно использовать вместо процессов в тех случаях, когда назначение контейнера только в том, чтобы предоставить доступ нескольким процессам к одному ресурсу. В отличие от процессов, контейнеры создаются, как правило, не динамически, а статически.
ГалаСообщение
Позволяет взаимодействовать ГалаПроцессу с основным VCL-потоком с помощью посылки окну (форме) синхронного Windows-сообщения методом Send. Для ответа на такое сообщение в VCL-форме определяется соответствующий метод. Данные передаются в сообщении с помощью бестипового указателя, что позволяет выполнить двунаправленный обмен данными любого типа. На время обработки сообщения, вызвавший процесс приостанавливается.
ГалаКанал
Позволяет ГалаПроцессам взаимодействовать друг с другом и с ГалаКонтейнерами. ГалаКаналами владеют либо ГалаПроцессы, либо ГалаКонтейнеры. Каждый канал может получать сообщения одновременно от нескольких процессов. Объект-ГалаКанал может быть типа TGalaProcessChannel или TGalaContainerChannel в зависимости от того, кто будет создавать и владеть этим каналом. Создаются объекты каналов явно в конструкторах ГалаПроцесса или ГалаКонтейнера с помощью функции CreateChannel, а уничтожаются автоматически при уничтожении владельцев. Действия над данными, реализуемые при передаче сообщения в канал, описываются во входной (серверной) процедуре - она вызывается каналом в режиме взаимоисключения. Охраняющая функция определяет условие, при котором входная процедура может быть вызвана. Входная процедура соответствует входу entry оператора accept, а охраняющая функция - оператору when (язык Ада [7]). Для взаимодействия с ГалаПроцессами в канале определен метод Send, принимающий указатели на посылающий процесс и на посылаемые-принимаемые данные. Использование бестипового указателя позволяет передавать по каналу данные любого типа в обоих направлениях.
ГалаСигнал
Кроме взаимодействий с другими процессами, ГалаПроцессы могут ожидать сигналы. Сигналы несут в себе только факт наступления некоторых событий, как связанных с другими процессами, так и с внешними событиями, происходящими в операционной системе. ГалаСигналы представляются собой набор таких средств, инкапсулирующих примитивы, непосредственно предоставляемые операционной системой - события, мьютексы, семафоры и файловые уведомления. Использование их необязательно, но иногда более удобно, особенно в тех случаях, когда не требуется обмен данными, а важен сам факт наступления некоторого события. Все ГалаСигналы порождаются от базового класса TGalaSignal и имеют метод Wait. Метод Wait позволяет ограничить время ожидания и отслеживает факт получения ожидающим процессом директивы завершения, что дает возможность выполнить корректное принудительное завершение процесса, находящегося в состоянии ожидания.

Действующие лица и исполнители. Продолжение

А теперь - все то же самое, но более подробно. В этом разделе мы рассмотрим все существенные для использования свойства и методы ГалаТеатра и его обитателей. Более подробное описание вы можете найти в help-файле, а еще больше подробностей вы можете увидеть прямо в исходных текстах.

ГалаТеатр

TGalaTheater = class
public
  constructor Create;
  destructor  Destroy; override;

  function  GetNewGroup: Integer;
  procedure ResumeGroup(aGroup: Integer);
  procedure SuspendGroup(aGroup: Integer);
  function  TerminateAllProcesses: Boolean;
  function  TerminateGroup(aGroup: Integer): Boolean;
  function  TryToDestroyAllProcesses: Boolean;
  function  TryToDestroyGroup(aGroup: Integer): Boolean;
  procedure DestroyAllProcesses;
  procedure DestroyGroup (aGroup: Integer);
  property  Log(const S: string);

  property  PriorityClass: Integer read GalaGetPriorityClass
            write GalaSetPriorityClass;
  property  NotificationWindow: HWND read FNotificationWindow
            write FNotificationWindow;
end;

var
  GalaTheater: TGalaTheater;

Объект GalaTheater создается конструктором Create и уничтожается деструктором Destroy. Аналогично объекту Application в программе может быть только один объект GalaTheater. Удобное место для создания и уничтожения ГалаТеатра - методы FormCreate и FormDestroy (или FormClose) главной формы приложения. Процессы в ГалаТеатре существуют группами. Группа характеризует связанность процессов, - процессы взаимодействуют между собой только внутри группы. Исключение составляют серверные (системные) процессы, которые объединяются в одну группу и предоставляют сервисы процессам остальных групп. Функция GetNewGroup позволяет получить уникальный номер группы в том случае, если в вашей программе будет несколько групп, создающихся и уничтожающихся динамически. Если в программе одна или несколько статических групп, то идентификаторы (номера) групп можно задать константами (начиная от 1). Перед закрытием ГалаТеатра нужно завершить все (еще работающие) процессы. Закрыть ГалаТеатр можно только из VCL-потока: функция TerminateAllProcesses вызывает функцию завершения (Terminate) для всех еще активных процессов (ах, эти актеры - им только дай поиграть! - прямо как дети малые). Функция TryToDestroyAllProcesses сначала вызывает TerminateAllProcesses и только если все процессы уже завершены, уничтожает их. Если в процессе уничтожения обнаруживается новый процесс, то ему посылается уведомление о завершении, уничтожение процессов прекращается и функция возвращает False. Такое явление может случиться в том случае, если ГалаПроцесс создал новый процесс и только после этого получил уведомление о завершении. Функция возвращает True, если все процессы ГалаТеатра успешно уничтожены. Аналогично работают функции TerminateGroup и TryToDestroyGroup, но в отличие от предыдущих функций они закрывают и уничтожают только процессы указанной группы и могут быть вызваны не только из VCL-потока, но и из процесса другой группы. Процедура SuspendGroup позволяет одновременно приостановить, а ResumeGroup - активизировать все процессы группы. Процедуры DestroyAllProcesses и DestroyGroup уничтожают все процессы ГалаТеатра или процессы заданной группы и дожидаются полного завершения операции уничтожения. Свойство PriorityClass позволяет изменить класс приоритета всех процессов ГалаТеатра. Свойство NotificationWindow задает окно, в которое будут посылаться уведомления о создании, уничтожении и трассировке процесса. Эта информация полезна при отладке. Если окно не задано, то уведомляющие сообщения не посылаются. Свойство Log выводит отладочную строку в файл протокола (обеспечивая взаимное исключение процессов).

ГалаПроцесс

TGalaProcess = class
protected
  FSuspendedOnStart: Boolean;
  FStackSize:        Integer;

  function  CreateChannel(aEntry: TGalaEntry;
            aGuard: TGalaGuard = nil): TGalaProcessChannel;
  procedure Execute; virtual; abstract;
  procedure OnNormalTermination; virtual;
  procedure OnPrematureTermination; virtual;
  procedure OnUnhandledException(E: Exception); virtual;

  procedure Trace(const S: string);
  procedure Log(const S: string);
  procedure Pause(aTime: Cardinal);
  procedure Send(aMessageId: Cardinal; aData: Pointer = nil;
            aForm: TForm = nil;  aTimeout: Cardinal = INFINITE);
  procedure Accept(aChannel: TGalaProcessChannel;
            aTimeout: Cardinal = INFINITE);
  procedure AlternativeAccept(aChannels: array of TGalaProcessChannel;
            aTimeout: Cardinal = INFINITE);
  procedure Wait(aSignal: TGalaSignal;
            aTimeout: Cardinal = INFINITE);
  function  AlternativeWait(aSignals: array of TGalaSignal;
            aTimeout: Cardinal = INFINITE): Integer;
  procedure WaitCompletion(const aProcesses: array of TGalaProcess;
            aTimeout: Cardinal = INFINITE);

public
  ProcessName:     string;
  FreeOnTerminate: Boolean;

  constructor Create(aGroup: Integer; aParentForm: TForm = nil);
  destructor  Destroy; override;

  procedure Terminate;
  procedure Suspend;
  procedure Resume;

  property Handle: THandle read FHandle;
  property Suspended: Boolean read FSuspended;
  property Terminated: Boolean read FTerminated;
  property Finished: Boolean read FFinished;
  property Priority: Integer read GetPriority write SetPriority;
end;

ГалаПроцесс создается явным вызовом его конструктора Create, которому передается номер группы и ссылка на форму, с которой процесс будет наиболее плотно взаимодействовать. Как правило, это форма, из которой вызван конструктор процесса. Если в программе только одна группа процессов, то номер группы можно задавать константой 1. Ошибка при создании процесса будет возбуждать исключительную ситуацию EGalaObjectCreationFail. Собственно тело процесса программируется в методе Execute, который обязательно должен быть переопределен в наследуемом классе. Если процесс завершается нормально, то он освобождает основные системные ресурсы, но объект процесса продолжает существовать и уничтожается только ГалаТеатром - и только вместе со своей группой. Попытка взаимодействия с завершенным процессом будет возбуждать исключение EGalaProcessWasTerminated. Для принудительного завершения процесса нужно вызвать процедуру Terminate. Действие ее состоит в следующем: она устанавливает флажок Terminated и активизирует внутренний сигнал процесса FTerminationEvent, который выводит процесс из состояния ожидания (если процесс в нем находится). В ходе выполнения метода Execute процесс должен периодически контролировать свойство Terminated и выполнять завершающие действия, например:

while not Terminated do begin
  // действия процесса
end;
// завершающие действия

Принудительное завершение - это очень важное событие и оно учитывается всеми остальными объектами ГалаТеатра. Синхронизация процессов всегда связана с ожиданием, при котором процесс переходит в спящее состояние - ожидание некоторого события. И если это событие уже не произойдет (например, завершился другой процесс, генерирующий это событие), то ожидающий процесс никогда не проснется, что приведет к зависанию программы. Поэтому уход процесса в спящее ожидание всегда охраняется таймаутом (необязательным) и реакцией на принудительное завершение (обязательной). Если в то время, когда процесс находится в спящем состоянии, он получает директиву принудительного завершения, то процесс активизируется и в нем возбуждается исключительная ситуация EGalaPrematureTermination. Если процесс явно ее не перехватывает, то нормальная работа процесса завершается и вызывается виртуальное событие OnPrematureTermination, которое может быть переписано в наследуемом классе. При нормальном завершении вызывается виртуальное событие OnNormalTermination. Конечно, процесс может выполнить нормальные завершающие действия и в методе Execute, но выделение их в отдельное событие более логично. Обычно принудительное завершение ГалаПроцесса находится в компетенции ГалаТеатра (и VCL-потока), но этим правом могут воспользоваться также ГалаПроцессы, если у них есть на это веские основания (приходится мириться с демократией в разумных пределах).

При возникновении необработанной явным образом исключительной ситуации, управление передается обработчику события OnUnhandledException, который выводит информационное сообщение и завершает выполнение ГалаПроцесса. Вы его можете переписать в наследуемом классе для реализации собственной индивидуальной реакции на необработанные исключения.

В конструкторе процесса можно задать:

По ходу работы сам процесс может менять свой приоритет, а также процессы могут менять приоритеты друг друга. Значения приоритетов следующие:

-3 // низший приоритет (Idle)
-2 // на 2 пункта ниже нормального
-1 // на 1 пункт ниже нормального
 0 // нормальный приоритет
 1 // на 1 пункт выше нормального
 2 // на 2 пункта выше нормального
 3 // высший приоритет (Time Critical)

Процесс может приостанавливать свою работу на заданный промежуток времени вызовом метода Pause, которому передается значение времени в миллисекундах. Если во время паузы процессу будет передана директива принудительного завершения, то пауза будет прервана и возбуждена исключительная ситуация EGalaPrematureTermination.

Для взаимодействия с основным VCL-потоком ГалаПроцесс посылает VCL-форме сообщение с помощью метода Send. Этот метод имеет аргументы:

Такой способ взаимодействия с VCL-потоком обусловлен тем, что VCL-компоненты не являются потокобезопасными, то есть попытка одновременного обращения к VCL-компоненту из двух и более потоков приведет к разрушению данных компонента и, как правило, к аварийному прекращению программы. Поэтому необходим взаимоисключающий доступ к VCL-компонентам. Операционная система Windows позволяет сделать такой доступ с помощью функций SendMessage и SendMessageTimeout. Метод Send инкапсулирует вызовы этих функций и упаковку данных в стандартное сообщение Windows. Рандеву с VCL-потоком заключается в посылке окну формы заданного синхронного сообщения. При этом потоки синхронизируются - то есть, если VCL-поток не готов обслужить данный поток (поток посылающего ГалаПроцесса), то он ставится в очередь и выполнение потока будет приостановлено. Если VCL-поток готов, то он выполняет взаимодействие, после чего оба потока будут продолжаться параллельно. В процессе рандеву VCL-поток принимает Windows-сообщение, в поле LParam которого находится указатель на объект данных, получает данные и при необходимости изменяет их, возвращая, таким образом, результат рандеву. Для реализации второй стыковочной части рандеву, форма должна иметь метод такого вида:

procedure OnXXX(var Mes: TMessage); message GM_ON_XXX;

где Mes - это структура, в поле LParam которой записана ссылка на посланные данные. В ходе своего выполнения метод может получить указатель на конкретный тип с помощью, например, такой операции:

with TAnyData(Mes.LParam) do begin
  // действия с данными
end;

где TAnyData - любой тип данных. В качестве данных может выступать ссылка на сам посылающий процесс. В качестве идентификатора сообщения можно использовать константы со значением GM_USER и больше (префикс GM означает - Gala Message). Метод может принимать необязательный аргумент - время ожидания в миллисекундах. Если за указанное время взаимодействие не произошло, то возбуждается исключительная ситуация EGalaTimeout. Этот параметр можно использовать для предотвращения зависания программы при интенсивном взаимодействии ГалаПроцессов с VCL-потоком, при котором формы настолько заняты, что просто перестают реагировать на мышь и клавиатуру.

ГалаПроцесс может ожидать рандеву с другими ГалаПроцессами по каналам, которыми он владеет. Для реализации рандеву по одному каналу ГалаПроцесс вызывает метод Accept, для недетерминированного ожидания по нескольким своим каналам - метод AlternativeAccept, который ожидает рандеву по любому из заданных каналов. Более подробное описание взаимодействия будет дано ниже, при рассмотрении ГалаКаналов. Время ожидания по обоим методам может быть ограничено указанием таймаута. Приведем пример соответствия конструкций ГалаТеатра и языка Ада. Текст на Аде:

entry Entry1;
entry Entry2;
function CanEntry2 return BOOLEAN;
. . .
  select
    accept Entry1;
  or
    when CanEntry2 =>
      accept Entry2;
  or
    delay 0.1;
  end select;
. . .

В этом примере программа ожидает недетерминированного взаимодействия по входам Entry1 и Entry2 с ограничением по времени в 0.1 секунду. Вход Entry2 ожидается только в том случае, если функция CanEntry2 возвращает TRUE. Соответствующий код для Gala-библиотеки:

Entry1, Entry2: TGalaProcessChannel;
procedure DoEntry1(aData: Pointer);
procedure DoEntry2(aData: Pointer);
function  CanEntry2: Boolean;
. . . 
Entry1 := CreateChannel(DoEntry1);
Entry2 := CreateChannel(DoEntry2, CanEntry2);
. . .
AlternativeAccept([Entry1, Entry2], 100);

В отличие от языка Ада, где входы реализуются как процедуры, входы в Gala-библиотеке реализуются как каналы, содержащие соответствующие процедуры и охраняющие функции. В данном случае канал1 вызывает процедуру-обработчик DoEntry1, а канал2 - DoEntry2 и CanEntry2.

Кроме ожидания рандеву по каналам, ГалаПроцессы могут ожидать сигналы. Для ожидания одиночного сигнала ГалаПроцесс вызывает метод Wait. Для альтернативного недетерминированного ожидания нескольких событий (сигналов) ГалаПроцесс вызывает метод AlternativeWait, которому передается массив объектов-сигналов. Время ожидания по обоим методам может быть ограничено заданием таймаута. Объединить альтернативное ожидание и рандеву и сигналов в одном действии невозможно, так они имеют фундаментальные отличия: при рандеву происходит обмен данными, при ожидании - нет, рандеву предполает двух активных партнеров (ГалаПроцессов), а ожидание - только одного. Для реализации такого объединения можно использовать следующий прием: создается промежуточный процесс, ожидающий только сигналы и преобразующий их в сообщения, передаваемые по каналам процессу, выполняющему недетерминированное ожидание.

ГалаКаналы

ГалаКаналы - это исполнители, реализующие взаимодействия между процессами. Каналы ГалаПроцессов и ГалаКонтейнеров внешне полностью совпадают, но имеют различную реализацию:

TGalaContainerChannel = class
public
  procedure Send(aSender: TGalaProcess; aData: Pointer = nil;
                aTimeout: Cardinal = INFINITE);
  property Entry: TGalaEntry read FEntry write FEntry;
  property Guard: TGalaGuard read FGuard write FGuard;
end;

TGalaProcessChannel = class
public
  procedure Send(aSender: TGalaProcess; aData: Pointer = nil;
                aTimeout: Cardinal = INFINITE);
  property Entry: TGalaEntry read FEntry write FEntry;
  property Guard: TGalaGuard read FGuard write FGuard;
end;

Создаются каналы с помощью вызова метода CreateChannel в конструкторах ГалаКонтейнера и ГалаКанала, а уничтожаются автоматически в их деструкторах. Метод CreateChannel у ГалаПроцесса точно такой же, как и у ГалаКонтейнера и имеет аргументы:

Функция должна быть методом ГалаПроцесса или ГалаКонтейнера и иметь тип:

TGalaGuard = function: Boolean of object;

Процедура-обработчик соответствует входу entry для оператора accept, а охраняющая функция - телу оператору when (язык Ада [7]). Обработчик вызывается всегда в режиме взаимного исключения и только в том случае, когда оба взаимодействующих процесса готовы к рандеву и охраняющая функция возвращает True. Отметим, что если охраняющая функция не задана, то это эквивалентно функции, которая всегда возвращает True, то есть вход всегда разрешен.

Метод Send является входом канала. Процесс, инициирующий рандеву, вызывает именно этот метод для передачи в канал сообщения. Метод Send принимает 2 аргумента: ссылку на процесс-отправитель и указатель на бестиповые данные. Если данные не заданы, то используется ссылка на процесс-оправитель. Третий, необязательный, аргумент - таймаута, время, в течение которого клиентский процесс готов ждать рандеву.

Продемонстрируем использование каналов на примере примитивно простого процесса-сервера, реализующего разделяемый несколькими процессами стек данных (целых чисел).

TStack = class(TGalaProcess)
private
  FData: array[0..MAX_STACK] of Integer;
  SP:    Integer; // указатель стека

  procedure DoPush(d: Pointer);
  procedure DoPop(d: Pointer);
  function  CanPush: Boolean;
  function  CanPop: Boolean;

protected
  procedure Execute; override;

public
  Push, Pop: TGalaProcessChannel;

  constructor Create(aGroup: Integer);
end;

var
  Stack: TStack;

// Реализация

constructor TStack.Create(aGroup: Integer);
begin
  inherited Create(aGroup);
  // Создаем каналы
  Push := CreateChannel(DoPush, CanPush);
  Pop  := CreateChannel(DoPop,  CanPop);
  SP   := 0; // Инициализируем указатель стека
end;

procedure TStack.Execute;
begin
  // Пока процесс не завершается принудительно, выполняем
  // альтернативный прием по каналам Push и Pop
  while not Terminated do
    AlternativeAccept([Push, Pop]);
end;

procedure TStack.DoPush(d: Pointer);
begin
  // Ложим значение в стек
  FData[SP] := PInteger(d)^;
  Inc(SP);
end;

procedure TStack.DoPop(d: Pointer);
begin
  // Извлекаем значение из стека
  Dec(SP);
  PInteger(d)^ := FData[SP];
end;

function TStack.CanPush: Boolean;
begin
  // Разрешаем, если стек не полон
  result := SP < MAX_STACK;
end;

function TStack.CanPop: Boolean;
begin
  // Разрешаем, если стек не пуст
  result := SP <> 0;
end;

// фрагмент кода процесса-клиента для работы со стеком:
   var
     i: Integer;
   Stack.Push.Send(self, @i);
   Stack.Pop.Send(self, @i);

Этот пример был бы еще проще, если стек реализовать не как ГалаПроцесс, а как ГалаКонтейнер - но я использовал процесс, чтобы дополнительно показать реализацию метода Execute и использование метода AlternativeAccept. Фактически контейнер и реализует недетерминированный прием по всем своим каналам, только более простым способом, чем это делает процесс.

ГалаСигналы

Эта группа примитивов инкапсулирует средства синхронизации, предоставляемые операционной системой Windows. Все объекты-сигналы порождаются от базового класса TGalaSignal:

TGalaSignal = class
protected
  FHandle: THandle;

  procedure AfterWaiting(p: TGalaProcess); virtual;

public
  constructor Create(aHandle: THandle);

  procedure Wait(aProcess: TGalaProcess;
            aTimeout: Cardinal = INFINITE);
  property  Handle: THandle read FHandle;
end;

Метод Wait - общий для всех ГалаСигналов и позволяет ГалаПроцессам ожидать наступления события, связанного с системным объектом Handle с ограничением по времени. Отметим, что метод отслеживает факт получения ожидающим процессом директивы принудительного завершения и активизирует процесс с возбуждением исключительной ситуации EGalaPrematureTermination. При возникновении таймаута возбуждается исключение EGalaTimeout.

TGalaEvent = class(TGalaSignal)
public
  constructor Create(aManualReset: Boolean;
                     aInitialState: Boolean = False;
                     aName: PChar = nil);
  procedure SetState(aState: Boolean); virtual;
  procedure Pulse; virtual;
end;

Конструктор ГалаСобытия имеет 3 аргумента: режим сброса (ManualReset), начальное состояние и необязательное имя. Если ManualReset = False, то событие автоматически становится несигнализирующим после вызова ожидающим процессом метода Wait. Если ManualReset = True, то событие остается сигнализирующим и его нужно явно сбрасывать вызовом SetState(False). Имя события имеет смысл только при взаимодействии нескольких Windows-программ, это относится также к ГалаМьютексам и ГалаСемафорам. Метод Pulse позволяет установить событие и потом сразу сбросить.

TGalaMutex = class(TGalaSignal)
public
  constructor Create(aOwned: Boolean; aName: PChar = nil);
  procedure Release; virtual;
end;

ГалаМьютекс инкапсулируют понятие одиночного ресурса, которым может владеть в один момент времени только один процесс. Слово "Mutex" происходит из соединения двух слов - "Mutual" и "Exclusive", что означает взаимно-исключающее. При создании мьютекса можно сразу завладеть ресурсом, если аргумент aOwned равен True. Для захвата ресурса процесс вызывает метод Wait, для освобождения ресурса - метод Release. Возможен вложенный захват ресурса, то есть, процесс захвативший мьютекс, может захватить его еще раз, но сколько раз захвачен мьютекс, столько раз его нужно освободить.

TGalaSemaphore = class(TGalaSignal)
public
  constructor Create(aMaxCount: Integer;
              aInitialCount: Integer = -1;
              aName: PChar = nil);
  procedure Release(aCount: Integer = 1); virtual;
end;

ГалаСемафор похож на ГалаМьютекс, но, в отличие от мьютекса, инкапсулируют понятие множественного, а не одиночного, ресурса. При создании семафора задается количество единиц разделяемого ресурса (aMaxCount). При создании можно также захватить одну или больше единиц ресурса (aInitialCount), если аргумент не задан, то все ресурсы свободны. Для захвата ресурса процесс вызывает метод Wait, для освобождения ресурса - метод Release. За один раз можно освободить одну или несколько единиц ресурса, но захватить можно только одну за один раз. Пример использования семафора - торговый прилавок с тремя продавцами. Одновременно может быть обслужено не более 3х клиентов: остальные клиенты должны ждать освобождения ресурса - продавца.

TGalaChangeNotification = class(TGalaSignal)
public
  constructor Create(const aDirectory: string;
                     aSubtree: Boolean; aFilter: Cardinal);
  procedure NewDir(const aDirectory: string); virtual;
end;

ГалаУведомление инкапсулирует Windows-уведомления о различных событиях-изменениях, происходящих в файловой системе. Конструктор имеет параметры: имя каталога, для которого производится уведомление о событиях, флаг, указывающий, нужно ли отлеживать события в подкаталогах и фильтр событий, указывающий, какие конкретно события нужно отслеживать. Для ожидания события процесс просто вызывает метод Wait. В процессе отслеживания можно изменить имя каталога методом NewDir.

TGalaDeque = class(TGalaSignal)
public
  constructor Create(aDequeSize: Integer = 0);
  function  Count: Integer; virtual;
  procedure PutLast(aObject: TObject); virtual;
  function  GetLast: TObject; virtual;
  procedure PutFirst(aObject: TObject); virtual;
  function  GetFirst: TObject; virtual;
  function  PeekFirst: TObject; virtual;
  function  PeekLast: TObject; virtual;
end;

ГалаДек - это универсальная очередь, которая может быть использована для передачи асинхронных сообщений между процессами, между контейнерами, между процессом и VCL-потоком, процессом и контейнером. Сообщение - это объект любого типа, порождаемого от TObject. Передача сообщения не связана с синхронизацией и ожиданием. Очередь может быть ограниченной или неограниченной длины. Попытка записи в заполненную очередь будет вызывать исключение EGalaOverflow. Когда очередь непуста, то состояние ГалаДека - сигнализирующее, если в очереди нет ни одного сообщения, то состояние ГалаДека - несигнализирующее. Процессы могут ждать поступления сообщений в очередь, используя обычные для всех ГалаСигналов методы. Сообщения могут посылаться либо в начало, либо в конец очереди и извлекаться также либо из начала, либо из конца очереди. Ответственным за создание объекта-сообщения является процесс, посылающий сообщение, а ответственным за использование и уничтожение объекта-сообщения - принимающий процесс. Кроме методов, связанных с изменением состояния очереди, есть два инспектирующих метода (Peek), которые позволяют проконтролировать первый или последний элемент.

TGalaMessage = class(TGalaSignal)
public
  constructor Create;
  destructor  Destroy; override;
  procedure Release; virtual;
  procedure Send(aWnd: THandle; aMessage: Integer); virtual;
  procedure Reply; virtual;
end;

Гала-сообщение - это Гала-сигнал, который можно использовать для передачи данных и команд и последующего ожидания ответа. Он удобен для взаимодействия Гала-процесса с другими объектами, не являющимися Гала-процессами, например VCL-потоком или очередями. Отличительная особенность Гала-сообщения состоит в том, что этот объект осуществляет подсчет ссылок и уничтожается самостоятельно (подобно COM-объектам). Использование Гала-сообщения позволяет послать ответное сообщение даже в том случае, если ожидающий процесс уже уничтожился. Метод Send посылает сообщение окну и увеличивает счетчик ссылок, метод Reply уведомляет о получении Гала-сообщения установкой его в активное состояние (которое сбрасывается при успешном ожидании) и уменьшает счетчик ссылок. Начальное состояние счетчика ссылок равно 1. Принудительное уменьшение счетчика ссылок выполняет метод Release - его вызов означает, что объект больше не нужен. Если счетчик ссылок становится равным нулю, то объект самоуничтожается.

Кроме указанных примитивов в Gala-библиотеке определен служебный класс ГалаЗамок (TGalaLatch), инкапсулирующий понятие критической секции. Он используется внутри многих объектов библиотеки и может быть использован как самостоятельный в тех случаях, когда требуется очень высокая степень эффективности и за это можно заплатить трудоемкостью и сложностью при программировании. Объект имеет два метода: Lock и Unlock. Процесс может заблокировать (закрыть) замок методом Lock - в этом случае все остальные процессы, попытавшиеся вызвать метод Lock будут приостановлены до того момента, когда захвативший процесс не вызовет метод Unlock. При использовании замка нужно быть очень осторожным, так как закрытый замок не дает ожидающим процессам никакого шанса на продолжение. Использование замка практически всегда нужно выполнять в блоке try-finally, например:

var List: TGalaLatch;
. . . 
List.Lock;
try
  // действия со списком
finally
  List.Unlock;
end;

Некоторые объекты VCL имеют встроенную поддержку разделяемого доступа, например: TThreadList, TCanvas.

Примеры использования ГалаБиблиотеки

Дальнейшее описание библиотеки будем проводить на примерах-спектаклях. В этом разделе вы узнаете не только о том, как можно использовать ГалаБиблиотеку, но также о типичных задачах, возникающих при параллельном программировании и о том, как их можно решить с помощью ГалаТеатра. Исходный текст библиотеки находится в файлах Gala.pas, GalaContainer.pas, GalaSignals.pas. Исходный код примеров смотрите в соответствующих файлах с именами ExampleXX в подкаталоге Examples. А здесь я буду давать только анонс-афишу и рецензию.

Несколько предварительных замечаний о том, как организованы примеры. Для всех примеров существует одна-единственная программа (файлы ExamplesForm.pas и ExamplesForm.dfm). Процессы каждого примера образуют группу, причем можно одновременно запускать не только несколько примеров, но также несколько экземпляров одного и того же примера. Главная форма программы имеет 3 элемента - выпадающий список, содержащий имена примеров, кнопку Старт и таблицу состояния ГалаПроцессов, в которой отражаются все активные процессы и их отладочные сообщения.

Главная форма регистрируется вызовом GalaTheater.NotificationWindow и имеет три метода - обработчика трех стандартных уведомляющих ГалаСообщений:

procedure OnProcessStart(var Mes: TMessage);
            message GM_PROCESS_ START;
procedure OnProcessTrace(var Mes: TMessage);
            message GM_PROCESS_TRACE;
procedure OnProcessTermination(var Mes: TMessage);
            message GM_PROCESS_TERMINATE;

Спектакль 01

Название
Example01: Самый простой - проще не бывает
Действующие лица
Циклический процесс Счетчик
Декорации
Нет
Сценарий
Процесс TCounter каждые 100 миллисекунд передает форме сообщение, содержащее монотонно увеличивающееся число. А форма-декорация его отображает в TLabel. Спектакль начинается с нажатия кнопки Старт и заканчивается нажатием одноименной кнопки Финиш. Ну чем ни таймер?
Цель примера
Показать взаимодействие ГалаПроцессов с VCL-компонентами.

Собственно процесс выглядит следующим образом:

procedure TProcess01.Execute;
var
  Counter: Integer;
begin
  Counter := 0;
  while not Terminated do begin
    Inc(Counter);
    Send(GM_DRAW_01, @Counter);
    Pause(100);
  end;
end;

В цикле (вплоть до завершения процесса по кнопке Финиш) значение счетчика увеличивается на 1, вызывается метод Send, передающий форме сообщение с номером GM_DRAW_01, а в качестве параметра - указатель на переменную-счетчик. После этого процесс приостанавливается на 100 миллисекунд. Для обработки сообщения в форме имеется метод:

procedure TForm01.OnDraw(var Mes: TMessage); message GM_DRAW_01;
begin
  LabelCounter.Caption := IntToStr(PInteger(Mes.LParam)^);
end;

Метод OnDraw извлекает значение счетчика из Windows-сообщения, преобразует в строку и отображает.

Для создания процесса сначала запрашивается динамический номер группы (так как можно запускать несколько экземпляров примера параллельно), а потом уже вызывается конструктор процесса. Для завершения процесса просто вызывается процедура уничтожения группы с ожиданием:

procedure TForm01.ButtonStartStopClick(Sender: TObject);
begin
  if Group = 0 then begin
    Group := GalaTheater.GetNewGroup;
    TProcess01.Create(Self, Group);
  end
  else begin
    GalaTheater.DestroyGroup(Group);
    Group := 0;
  end;
end;

Спектакль 02

Название
Example02: Размножающиеся процессы
Действующие лица
Шарики - динамически создаваемые и уничтожаемые процессы
Декорации
Нет
Сценарий
Процессы ведут себя как движущиеся шарики (квадратики), отражающиеся от границ поля. Каждый шарик имеет свой цвет, скорость, время жизни и период размножения (случайные величины). Каждый шарик пытается создать себе подобного, но может размножаться только в том случае, если находится на некотором расстоянии от границы поля и общее число шариков не превышает заданного максимума (который можно динамически изменять).
Цель примера
Показать следующие возможности библиотеки:
  1. динамическое создание и уничтожение процессов;
  2. защита от перегрузки системы при большом количестве процессов;
  3. реакция на нормальное и принудительное завершение;

Жизненный цикл каждого процесса таков (в тексте опускается несущественный для параллельного программирования текст, а опущенные параметры и части кода записываются на псевдокоде):

procedure TProcess02.Execute;

begin
  // шарику даем меньший приоритет, чем VCL-процессу
  Priority := -1;
  // Посылка форме сообщения о создании нового процесса, поскольку
  // именно форма отслеживает общее число процессов
  Send(GM_PROCESS_START);
  while (not Terminated) and не_истекло_время_жизни do begin
    расчет_и_проверка_новых_координат;
    if настало_время_размножения then begin
      расчет_координат_потомка;
      if создание_потомка_возможно then begin
        try
          // Создание потомка
          TProcess02.Create(Group, ParentForm, координаты, направление);
        except
          // Недостаточно системных ресурсов
          on EGalaProcessCreationFail do
            Beep;
        end;
      end;
    end;
    // Отображение шарика на форме
    try
      // Время ожидания рандеву с VCL-потоком ограничено. Если
      // форма перегружена запросами и не может нарисовать шарик
      // за 200 мс, то шарик умирает (от тоски, потому что летать
      // не может). В качестве параметров процесс передает форме
      // ссылку на самого себя
      Send(GM_DRAW_02, Self, ParentForm, 200);
      // Перемещение шарика на новую позицию
      PrevPoint := NextPoint;
      // Пауза для того, чтобы шарики бегали не так быстро
      Pause(Speed);
    except
      // Рандеву с VCL-потоком не состоялось,
      // процесс завершает сам себя (увы, в жизни всяко бывает)
      on EGalaTimeout do
        Terminate;
    end;
  end;
end;

Действия, совершаемые при нормальном и аварийном завершении в данном случае одинаковы - шарик стирает себя с экрана и уменьшает общее число шариков:

procedure TProcess02.OnPrematureTermination;
begin
  OnNormalTermination;
end;

procedure TProcess02.OnNormalTermination;
begin
  Color := clWhite;
  // Стираем себя
  Send(GM_DRAW_02);
  // Посылаем форме уведомление о своем завершении
  Send(GM_PROCESS_TERMINATE);
end;

Форма отвечает на сообщения процессов тремя процедурами - обработчиками сообщений: GM_PROCESS_START, GM_PROCESS_TERMINATE и GM_DRAW_02:

procedure TForm02.OnStart(var Mes: TMessage);
begin
  Inc(Count);
  LabelProcessCount.Caption := IntToStr(Count);
end;

procedure TForm02.OnTermination(var Mes: TMessage);
begin
  Dec(Count);
  LabelProcessCount.Caption := IntToStr(Count);
end;

procedure TForm02.OnDraw(var Mes: TMessage);
begin
  with TProcess02(Mes.LParam) do begin
    отрисовка_шарика;
  end;
end;

Если внимательно наблюдать за поведением шариков, особенно при их большом количестве, то можно заметить следующее - иногда количество шариков превышает заданный максимум. Почему это происходит? Если посмотреть на то, как процесс создается, можно заметить, что сначала процесс-родитель запрашивает у формы - можно ли создавать потомка? Затем процесс создает потомка и уже сам потомок дает форме сообщение о создании, которое увеличивает действительное число процессов. Т. е. между проверкой возможности создания и реальным созданием проходит время, в которое операционная система может перейти к выполнению другого процесса-шарика, который также готов к размножению. То есть, подтверждение на создание потомка могут получить несколько процессов-родителей прежде, чем потомки действительно будут созданы. Я сознательно не стал усложнять алгоритм подтверждения, чтобы продемонстрировать проблемы, которые могут возникнуть в системе с вытесняющей многозадачностью. Корректно эту проблему в данном примере можно было бы решить с помощью счетного семафора TGalaSemaphore или с помощью другого параллельного процесса. Такой процесс регистрировал бы запрос на создание и уменьшал бы значение счетчика - но это потребует также обработки ситуации, в которой процесс-родитель получил подтверждение на создание потомка, но не смог его создать.

Есть еще одна проблема, которую можно заметить при внимательном наблюдении за большим количеством процессов. Если нажать на кнопку Финиш, которая завершает всю группу шариков, то иногда все шарики удаляются, и вдруг, неожиданно, появляется один или несколько новых шариков, которые уничтожаются повторным нажатием на кнопку Финиш. Причина "ошибки" та же, что и предыдущей - вытесняющая многозадачность. Процесс-родитель может обнаружить, что он должен принудительно завершиться уже после того, как создал потомка, а потомок не успеет получить сообщение о принудительном завершении, так как он только начал создаваться и еще не включен в список активных процессов, которым посылается сигнал принудительного завершения. Как и в предыдущем случае, я не стал усложнять алгоритм. Решением этой проблемы было бы, например, следующее - при создании процесс запрашивает у формы признак существования данной (своей) группы и, если группа уже не существует (номер группы равен 0), то процесс сразу завершается.

Спектакль 03

Название
Example03: Буфер между процессами
Примечание
Этот и три последующих примера внешне очень похожи - формы всех примеров наследуются от базовой формы и все процессы этих примеров порождаются от базовых процессов (файлы PMU.pas и PMU.dfm)
Взаимодействующие лица
  • Циклический процесс Плюс - генерирует последовательность положительных целых чисел (от нуля с шагом 1). Промежуток времени между генерацией двух соседних чисел задается случайной паузой.
  • Циклический процесс Минус - тезка предыдущего, генерирует последовательность отрицательных целых чисел.
  • Циклический процесс Пользователь (потребитель) - служит получателем данных от процессов-производителей Плюс и Минус и выводит полученные данные на экран. Процесс может принимать данные только раз в полсекунды.
  • ГалаКонтейнер с незамысловатым именем Буфер - может хранить посланные ему от производителей числа и отдавать их потребителю.
Декорации
Нет
Сценарий
Очень прост - двое производят, третий потребляет, а четвертый работает складом готовой продукции. Интрига сюжета в том, что Буфер имеет ограниченную емкость, а все три процесса могут создаваться независимо друг от друга и фактически ничего не знают друг о друге. Процессы-производители могут передавать свои данные только в том случае, если в буфере есть место, процесс-потребитель может что-то взять из буфера, если он не пуст, в остальных случаях процессы вынуждены ждать.
Цель примера
Показать взаимодействие нескольких асинхронных процессов с помощью разделяемого ресурса - буфера, который и является ключевой фигурой в сюжете.

Рассмотрим буфер подробнее:

TBuffer03 = class(TGalaContainer)
protected
  Buffer: array[TBufferSize03] of Integer;
  Count:  Integer;
  PPut:   TBufferSize03;
  PGet:   TBufferSize03;

  function  CanPut: Boolean;
  procedure DoPut(aData: Pointer);
  function  CanGet: Boolean;
  procedure DoGet(aData: Pointer);

public
  Put: TGalaContainerChannel;
  Get: TGalaContainerChannel;

  constructor Create;
end;

Буфер предоставляет для наружного использования два своих канала - один для записи, другой для чтения. Вся реализация буфера скрыта в приватной области и включает собственно буфер данных, количество записанных данных и индексы записи и чтения. Создаются каналы в конструкторе контейнера, а уничтожаются автоматически при завершении процесса. Сам контейнер создается при создании формы (то есть, существует вечно, в отличие от динамически создаваемых процессов).

constructor TBuffer03.Create;
begin
  inherited Create;
  Count := 0;
  PPut  := Low(TBufferSize03);
  PGet  := Low(TBufferSize03);
  Put   := CreateChannel(DoPut, CanPut);
  Get   := CreateChannel(DoGet, CanGet);
end;

Все действия буфера сосредоточены в процедурах и функциях каналов и достаточно понятны без комментариев, так как реализуется хрестоматийный алгоритм кольцевого буфера:

function TBuffer03.CanPut: Boolean;
begin
  result := Count < MAX_BUFFER_COUNT_03;
end;

procedure TBuffer03.DoPut(aData: Pointer);
begin
  Buffer[PPut] := PInteger(aData)^;
  Inc(PPut);
  if PPut > High(TBufferSize03) then
    PPut := Low(TBufferSize03);
  Inc(Count);
end;

function TBuffer03.CanGet: Boolean;
begin
  result := Count > 0;
end;

procedure TBuffer03.DoGet(aData: Pointer);
begin
  PInteger(aData)^ := Buffer[PGet];
  Inc(PGet);
  if PGet > High(TBufferSize03) then
    PGet := Low(TBufferSize03);
  Dec(Count);
end;

Действия процессов-производителей по взаимодействию с буфером тривиальны - они посылают буферу сообщение в канал, после чего увеличивают свое значение и выполняют случайную задержку. Посылка значения в канал Put выполняется так:

Buffer.Put.Send(Self, @Counter);

Действия потребителя также просты, но мы рассмотрим их подробнее:

procedure TProcessUser03.Execute;
var
  i: Integer;
begin
  while not Terminated do begin
    Trace('Жду');
    Buffer.Get.Send(Self, @i);
    Send(GM_USER_PM, @i);
    Trace('Потребляю');
    Pause(500);
  end;
end;

Для приема данных из буфера, процесс посылает ему сообщение в канал Get, передавая указатель на значение. Для вывода на экран полученных данных, процесс взаимодействует с формой. В этом примере вводится новое понятие - отладочная трассировка, которая выполняется процедурой Trace. Эта процедура выводит указанную строку в список процессов главной формы примеров. Запускайте в разном сочетании процессы и наблюдайте их состояния. Good luck.

Спектакль 04

Название
Example04: Взаимодействие процессов по одному каналу
Действующие лица
Те же, там же, действие второе… но их осталось только трое...
Сценарий
В этом спектакле заняты только 3 актера - процессы, а буфера взаимодействия между ними нет. Здесь процессы взаимодействуют между собой непосредственно, то есть процессы-производители посылают сообщения непосредственно в канал процесса-потребителя. Процесс-потребитель играет роль сервера, а производители - роли клиентов. Для всех своих клиентов (и их могло быть не два, а сколько угодно) сервер предоставляет один общий канал. Серверу не обязательно знать о своих клиентах, но клиентам необходимо знать о сервере и его канале. Поскольку клиенты не могут функционировать без сервера, то при закрытии сервера они также закрываются.
Цель примера
Продемонстрировать взаимодействие процессов по типу "многие к одному" с помощью канала ГалаПроцесса.

Для процессов-производителей ситуация практически не изменилась по сравнению с предыдущим примером. Для них безразлично в чей канал посылать сообщение, единственное отличие состоит в том, что серверный процесс в отличие от буфера может существовать или не существовать. Эта ситуация в клиентских процессах обрабатывается следующим образом:

if Assigned(User) then begin
  try
    User.Put.Send(Self, @Counter);
  except
    on E: EGalaProcessWasTerminated do
      Terminate;
  end;
end;

Здесь интересно то, что наличие сервера проверяется в 2 этапа - взаимодействие выполняется только в том случае, если сервер есть, но если при взаимодействии оказалось, что сервер уже завершил свою работу, то клиенты также завершают свою работу. Внимательный читатель, анализируя пример из файла PMU.pas, может заметить такую "непонятность" - при завершении сервера клиент завершаются так:

if Assigned(Plus) then begin
  Plus.Terminate;
  Plus := nil;
end;

Но ведь завершение процесса не означает его уничтожения! Кажется, что в данном случае мы теряем ссылку на завершенный процесс и уже не можем уничтожить сам объект процесса. На самом деле это не так - при своем завершении процесс переводится из очереди активных процессов в очередь завершенных и уничтожается только при завершении группы. Таким образом, при многократном открытии-закрытии процессов в очереди завершенных процессов появится несколько экземпляров одного и того же типа одной и той же группы, которые уничтожатся только при закрытии формы, когда будет уничтожаться вся группа.

Теперь рассмотрим подробнее процесс-сервер:

constructor TProcessUser04.Create(aGroup: Integer; aParentForm: TForm);
begin
  inherited Create(aGroup, aParentForm);
  Put := CreateChannel(DoPut);
end;

procedure TProcessUser04.DoPut(aData: Pointer);
begin
  Send(GM_USER_PM, aData);
  Trace('Потребляю');
  Pause(500);
end;

procedure TProcessUser04.Execute;
begin
  while not Terminated do begin
    Trace('Жду');
    Accept(Put);
  end;
end;

В данном случае канал содержит только канальную процедуру и не содержит охраняющей функции, так как в данном случае канал всегда доступен. Канальная процедура просто передает значение для индикации в форму и имитирует занятость процесса паузой. Тело сервера очень просто - в цикле выполняется процедура приема из канала.

Спектакль 05

Название
Example05: Взаимодействие процессов с альтернативой
Сценарий
Этот пример очень похож на предыдущий, но для каждого клиента у сервера выделен собственный канал.
Цель примера
Продемонстрировать использование оператора недетерминированного ожидания по нескольким каналам. В данном примере их два, но может быть сколько угодно.

Клиентские процессы здесь практически такие же, как в предыдущем примере, но с маленьким отличием - каждый работает со своим каналом. Серверный процесс также практически не изменился. Добавился еще один канал, а операция приема из канала заменилась на операцию недетерминированного приема:

constructor TProcessUser05.Create(aGroup: Integer; aParentForm: TForm);
begin
  inherited Create(aGroup, aParentForm);
  PutPlus  := CreateChannel(DoPutPlus);
  PutMinus := CreateChannel(DoPutMinus);
end;

procedure TProcessUser05.DoPutPlus(aData: Pointer);
begin
  Send(GM_USER_PM, aData);
  Trace('Потребляю +');
  Pause(500);
end;

procedure TProcessUser05.DoPutMinus(aData: Pointer);
begin
  Send(GM_USER_PM, aData);
  Trace('Потребляю -');
  Pause(500);
end;

procedure TProcessUser05.Execute;
begin
  while not Terminated do begin
    Trace('Жду');
    AlternativeAccept([PutPlus, PutMinus]);
  end;
end;

Спектакль 06

Название
Example06: Взаимодействие процессов с условной альтернативой
Сценарий
Этот пример аналогичен предыдущему, но прием данных от каждого клиента зависит от некоторого условия, в данном случае от состояния флажка CheckBox.
Цель примера
Продемонстрировать использование оператора условного недетерминированного ожидания.

Основное отличие от предыдущего примера состоит в том, что каналы имеют не только канальные процедуры (входы рандеву), но и охраняющие функции. Кроме того, в этом примере демонстрируется недетерминированное ожидание с ограничением по времени. Клиенты полностью аналогичны предыдущему примеру, а сервер наследуется от сервера предыдущего примера:

TProcessUser06 = class(TProcessUser05)
public
  constructor Create(aGroup: Integer; aParentForm: TForm);

protected
  procedure Execute; override;
  function  CanPutPlus: Boolean;
  function  CanPutMinus: Boolean;
end;

constructor TProcessUser06.Create(aGroup: Integer; aParentForm: TForm);
begin
  inherited Create(aGroup, aParentForm);
  // поскольку каналы были созданы в базовом классе, здесь мы
  // только задаем для каналов охраняющие функции
  PutPlus.Guard  := CanPutPlus;
  PutMinus.Guard := CanPutMinus;
end;

function TProcessUser06.CanPutPlus: Boolean;
begin
  result := (ParentForm as TForm06).EnablePlus;
end;

function TProcessUser06.CanPutMinus: Boolean;
begin
  result := (ParentForm as TForm06).EnableMinus;
end;

procedure TProcessUser06.Execute;
begin
  while not Terminated do begin
    Trace('Жду');
    try
      AlternativeAccept([PutPlus, PutMinus], 2000);
    except
      on E: EGalaTimeout do begin
        Trace('Таймаут');
        Pause(500);
      end;
      else
        raise;
    end;
  end;
end;

Если сервер не получает входных сообщений в течение 2 секунд, то возбуждается исключительная ситуация, о которой сервер сообщает с помощью трассировочного сообщения. Обратите внимание на следующее - если исключение не EGalaTimeout, то происходит повторное возбуждение оператором raise. Это нужно делать всегда, чтобы не потерять других возможных исключений.

Спектакль 07

Название
Example07: Пять обедающих философов (возможность дедлока)
Примечание
Этот и три последующих примера внешне очень похожи - формы всех примеров наследуются от базовой формы и все процессы этих примеров порождаются от базовых процессов (файлы Phil5.pas и Phil5.dfm)
Сценарий
Задача о пяти обедающих философах - это классическая задача параллельного программирования, которую придумал замечательный программист и ученый Эдсгер Дейкстра [5]. Сюжет таков: в давние времена один богатый филантроп пожертвовал свой капитал на учреждение некоего пансиона, чтобы дать пристанище пяти знаменитым философам. У каждого философа была своя комната, где он мог предаваться размышлениям. Была у них и общая столовая с круглым столом, вокруг которого стояло пять стульев, каждый помечен именем того философа, которому он предназначался. Звали философов Phil1, Phil2, Phil3, Phil4 и Phil5, и за столом они располагались в этом же порядке против часовой стрелки. Слева от каждого философа лежала золотая вилка, а в центре стола стояла большая миска спагетти, содержимое которой постоянно пополнялось. Предполагалось, что большую часть времени философ проводил в размышлениях, но, почувствовав голод, шел в столовую, садился на свой стул, брал слева от себя вилку и приступал к еде. Но такова уж сложная природа спагетти, что их не донести до рта без помощи второй вилки. Поэтому философу приходилось брать вилку и справа от себя. Закончив трапезу, он клал на место обе вилки, выходил из-за стола и возвращался к своим размышлениям. Разумеется, одной вилкой философы могли пользоваться только по очереди. Если же вилка требовалась другому философу, ему приходилось ждать, пока она освободится [5]. В этой простой на вид задаче встречается несколько подводных камней, которые можно назвать так - проблемы доступа к ограниченному числу ресурсов. В этом примере задача просто ставится, но никак не решается.
Действующие лица
Пять философов. Жизнь каждого философа представляет собой повторение цикла из семи событий:
  1. размышляет,
  2. хочет кушать,
  3. садится,
  4. берет левую вилку,
  5. берет правую вилку,
  6. ест,
  7. кладет обе вилки
Пять вилок. Каждая вилка - существо в высшей степени пассивное. Роль вилки проста - ее неоднократно берет и кладет кто-нибудь из соседних с ней философов.
Цель этого и трех следующих примеров
Не только в том, чтобы показать, как с помощью ГалаБиблиотеки решается классическая задача параллельного программирования, но и продемонстрировать использование понятий, которые не встретились в предыдущих примерах: массив процессов, ГалаМьютексы, массив ГалаМьютексов, одновременный запуск процессов, отображение состояния параллельных процессов в VCL-форме без непосредственного взаимодействия процесса и формы.

Декларация вилок и философов выглядит так:

TFork = class(TGalaMutex)
protected
  procedure AfterWaiting(p: TGalaProcess); override;

public
  Owner: TGalaProcess;

  procedure Release; override;
end;

TPhilosopher = class(TGalaProcess)
protected
  Index:               TPhilIndex;
  LeftFork, RightFork: TFork;
  State:               TPhilState;

  procedure  NewState(aState: TPhilState; aTime: Cardinal = 0);
  procedure  Meditate;
  procedure  WantToEat;
  procedure  Sit;
  procedure  Eat;
  procedure  BePerplexed;
  procedure  GetLeftFork(aTimeout: Cardinal = INFINITE);
  procedure  GetRightFork(aTimeout: Cardinal = INFINITE);
  procedure  PutLeftFork;
  procedure  PutRightFork;

public
  constructor Create(aGroup: Integer;
              aParentForm: TForm; aIndex: TPhilIndex);
end;

Каждый философ имеет свой индекс (свой стул), состояние и индексы своих вилок. Для удобства последующего программирования, реализация действий философов (хотя эти действия тривиально просты) сосредоточена в соответствующих процедурах.

constructor TPhilosopher.Create(aGroup: Integer;
  aParentForm: TForm; aIndex: TPhilIndex);
begin
  inherited Create(aGroup, aParentForm);
  Index := aIndex;
  // Узнавание правой вилки
  RightFork := (ParentForm as TFormPhil5).Fork[Index];
  // Узнавание левой вилки
  if Index = High(TPhilIndex) then
    LeftFork := (ParentForm as TFormPhil5).Fork[Low(TPhilIndex)]
  else
    LeftFork := (ParentForm as TFormPhil5).Fork[Succ(Index)];
  // Начальное состояние философа - думает, размышляет
  State := stToMeditate;
  // Даем философу приличное имя
  ProcessName := 'Философ ' + IntToStr(Index);
  // Это действие совсем необязательно, размер стека
  // задается для полноты картины
  FStackSize := 8000;
  // Процесс при своем создании не активизируется
  FSuspendedOnStart := True;
end;

Изменение состояний философов выполняется с помощью вспомогательных процедур (для ясности изложения). Основное назначение - установка состояния, но дополнительно контролируется также получение директивы принудительного завершения, делается отладочная трассировка и пауза, которую философ выполняет после изменения состояния.

procedure TPhilosopher07.Execute;
begin
  while True do begin
    Meditate;     // думает
    WantToEat;    // хочет есть
    Sit;          // садится
    GetLeftFork;  // берет левую вилку
    GetRightFork; // берет правую вилку
    Eat;          // ест
    PutRightFork; // кладет правую вилку
    PutLeftFork;  // кладет левую вилку
  end;
end;

Объекты философов и вилок декларируются так:

Philosopher: array[TPhilIndex] of TPhilosopher;
Fork:        array[TPhilIndex] of TFork;

a создаются при создании формы следующим образом:

procedure TFormPhil5.FormCreate(Sender: TObject);
var
  i: TPhilIndex;
begin
  Group := GalaTheater.GetNewGroup;
  // сначала создаем вилки, так как они сразу же
  // потребуются философам
  for i := Low(TPhilIndex) to High(TPhilIndex) do
    Fork[i] := TFork.Create(False);
  // Потом создаем философов
  for i := Low(TPhilIndex) to High(TPhilIndex) do
    Philosopher[i] := CreatePhilosopher(i);
  // И, наконец, активизируем сразу всех философов
  GalaTheater.ResumeGroup(Group);
end;

Вилки реализуются ГалаМьютексами - это наиболее адекватная и простая идея: мьютексом в один момент времени может владеть только один процесс (или мьютекс может быть свободен). Другой процесс, пожелавший захватить мьютекс, вынужден будет ждать.

Отрисовка состояния всех вилок и философов выполняется в обработчике события таймера формы - через каждые 100 мс картинка, отображающая состояния будет меняться. Все очень просто и без всяких взаимодействий.

Спектакль 08

Название
Example08: Пять обедающих философов (решение - таймаут)
Сценарий
Предположим, что все философы проголодаются примерно в одно и то же время. Все они сядут, все возьмут в левую руку вилку, все потянутся за второй - которой уже нет на месте. В столь недостойной ситуации всех их неизбежно ожидает голодная смерть. Несмотря на то, что каждый из участников способен к дальнейшим действиям, ни одна пара участников не в состоянии договориться о том, какое действие совершать следующим. Конец нашей истории, однако, не столь печален. Как только опасность была обнаружена, было предложено много способов ее избежать, учитывая, что покупка новых вилок исключается. В этом примере дается первое (и самое очевидное) решение задачи - философ может держать вилку только некоторое время, потом он устает и ложит ее обратно. В это время рядом сидящий философ может ее взять. Несмотря на простоту и очевидность такого решения в нем есть возможность конфуза - предположим, что все они устают одновременно, ложат вилки, потом снова берут и так далее. То есть, вместо статического дедлока, философы попадают в ситуацию динамического дедлока.
procedure TPhilosopher08.Execute;
begin
  while True do begin
    Meditate;
    WantToEat;
    Sit;
    repeat
      repeat
        try
          // ожидает первую вилку не более 7 секунд
          GetRightFork(7000);
        except
          on E: EGalaTimeout do
            BePerplexed // недоумевает
          else
            raise;
        end;
      until RightFork.Owner = Self;
      try
        // ожидает вторую вилку не более 7 секунд
        GetLeftFork(7000);
      except
        on E: EGalaTimeout do begin
          PutRightFork; // кладет вилку
          BePerplexed;  // недоумевает
        end
        else
          raise;
      end;
    until LeftFork.Owner = Self;
    Eat;
    PutLeftFork;
    PutRightFork;
  end;
end;

Спектакль 09

Название
Example09: Пять обедающих философов (решение - слуга)
Сценарий
В этом примере дается другие решение задачи, не имеющее недостатка предыдущего решения - появление слуги. (Решение предложено Карелом С. Шолтеном). В обязанности случи входит сопровождение каждого философа за стол и из-за стола. Слуге дается секретное указание следить за тем, чтобы за столом никогда не оказывалось более четырех филосов одновременно. Поскольку вилок в этой ситуации всегда больше, то опасности дедлока нет. Хотя появляется другая опасность. Предположим, что левая рука у сидящего философа весьма медлительна, в то время как его левый сосед в высшей степени проворен. Прежде чем философ дотянется до своей левой вилки, его левый сосед быстро вступает в дело, садится, хватает обе вилки и долго ест. Рано или поздно наевшись, он кладет обе вилки на стол и покидает свое место. Но стоит ему встать, как он снова ощущает голод, возвращается к столу, садится, мгновенно хватает обе вилки - и все это прежде, чем его его медлительный, долгосидящий и изнывающий от голода правый сосед дотянется до их общей вилки. Поскольку такого рода цикл может повторяться неограниченно, может оказаться, что сидящий философ никогда не приступит к еде.

Похоже, что при строгом подходе эта проблема неразрешима, поскольку, если каждый философ столь жаден и проворен, как было описано, то неизбежно кто-нибудь (или он, или его сосед) очень долгое время будет оставаться голодным. В такой ситуации не видно эффективного решения к достижению общего удовлетворения, кроме как приобрести достаточное количество вилок и побольше спагетти. Однако, если все-таки гарантировать, чтобы каждый сидящий философ мог в конце концов поесть, нужно изменить поведение слуги: проводив философа до его места, он ждет, пока философ не возьмет обе вилки и только после этого позволяет сесть его соседям.

Однако в отношении бесконечного перехвата остается более философская проблема. Предположим, что слуга испытывает иррациональную неприязнь к одному из философов и постоянно затягивает исполнение своей обязанности проводить его к столу, даже если философ к этому готов. Это ситуация, которую мы не можем описать формально, поскольку она неотличима от той, когда философу требуется бесконечно длительное время, чтобы проголодаться.

Слуга может быть естественно реализован как ГалаСемафор - его максимальное число свободных ресурсов на 1 меньше числа философов - столько философов могут находиться за столом одновременно. Входя в столовую, философ занимает этот ресурс, а выходя - отдает. Если все ресурсы заняты, то философ ждет.

Servant: TGalaSemaphore;
Servant := TGalaSemaphore.Create(PHIL_COUNT - 1);

Поведение философов становится таким:

procedure TPhilosopher09.Execute;
begin
  while True do begin
    Meditate;
    WantToEat;
    // ждет слугу
    Wait((ParentForm as TForm09).Servant);
    Sit;
    GetRightFork;
    GetLeftFork;
    Eat;
    PutLeftFork;
    PutRightFork;
    // отдает ресурс
    (ParentForm as TForm09).Servant.Release;
  end;
end;

Спектакль 10

Название
Example10: Пять обедающих философов (альтернативные сигналы)
Сценарий
В этом примере не делается попыток решения основной задачи, а решается вспомогательная - как уменьшить время, которое необходимо для того, чтобы философ мог начать есть. Решение состоит в том, что философ может брать вилки в любом порядке.
procedure TPhilosopher10.Execute;
begin
  while True do begin
    Meditate;
    WantToEat;
    Sit;
    NewState(stToGetFork); // берет любую вилку
    case AlternativeWait([RightFork, LeftFork]) of
      0: GetLeftFork;  // берет вторую вилку
      1: GetRightFork;
    end;
    Eat;
    PutLeftFork;
    PutRightFork;
  end;
end;

Это основные примеры - в демо-программе есть еще несколько примеров, демонстрирующих менее значимые возможности библиотеки.

Список литературы

  1. С. Гурин. "Параллельное объектно-ориентированное программирование на С++". Монитор 6, 1995
  2. Г. Джоунз. "Программирование на языке Оккам". М. Мир 1989
  3. В. Ш. Кауфман. "Языки программирования. Концепции и принципы". М. Радио и связь. 1993
  4. Р. Бар. "Язык Ада в проектировании систем". М. Мир 1988
  5. Ч. Хоар. "Взаимодействующие последовательные процессы". М. Мир 1989
  6. С. Янг. "Алгоритмические языки реального времени. Конструирование и разработка". М. Мир 1985
  7. Н. Джехани. "Язык Ада". М. Мир. 1988

Download

Библиотека Gala свободна для использования (Freeware) и распространяется с исходными текстами.

Downloadgala.zip - Архив библиотеки (232K)
Версия 1.11 от 24 янв 2004. Библиотека тестировалась на Delphi 4, 5, 6, 7 в среде Windows 95, 98, NT, 2000, XP. Архив включает в себя:

  1. библиотеку Gala в виде исходных текстов,
  2. справочный hlp-файл (для версии 1.0),
  3. исходные тексты примеров использования библиотеки.

Для компиляции примеров запустите Delphi, откройте проект
    Examples\GalaExamples.dpr
и выполните построение проекта. Не запускайте построенный exe-файл под отладчиком Delphi - отладчик часто зависает при отладке многопоточных приложений, тем более, что воспользоваться отладчиком для таких приложений практически невозможно.