Посмотрим на этот их Go в части параллельного программирования.
Собственно, язык содержит лишь два примитива: задачи (goroutines) и каналы (channels). И то, и другое весьма напоминает собой, соответственно, процессы (processes) и каналы (pipes) UNIX, но имеет одно удобное отличие.
В UNIX процессы изолированы друг от друга по памяти, отсюда и довольно тяжёлый и неуклюжий способ порождения новых процессов: либо через fork с полным копированием процесса, его памяти, его дескрипторов файлов, либо через exec с полным замещением текущего процесса новым. В Go задачи исполняются в общем адресном пространстве, поэтому такой неуклюжести не нужно. Достаточно единственного оператора над функцией.
go foo()
А если исполняемый кусок кода не достоин выделения в отдельную функцию, то его можно оформить как вызов лямбда-функции.
go (func() { /* ... */ })()
При этом контекст как вызываемой, так и тела лямбда-функции автоматически сохраняется в соответствующем замыкании.
Задача не является ни процессом, ни нитью (thread) операционной системы. Ближайшим аналогом оператора go в Windows будет постановка в очередь запуска функций на системном пуле нитей (thread pool). Внутри программы средствами языка реализуется собственный диспетчер задач, который управляет пулом системных нитей (работающих на разных ядрах физического процессора) и в общем-то обеспечивает балансировку нагрузки ядер. Задача также не является зелёной нитью (green thread), как это реализуют некоторые языки. Задача - это объект, о свойствах которого не делается предположений: является ли он единицей полноценной конкурентной многозадачности с истинным параллелизмом или псевдопараллельной сопрограммой с кооперативной многозадачностью. Это зависит от реализации языка Go и в общем случае может быть как тем, так и другим - в зависимости от решения диспетчера задач. В любом случае предполагается, что одновременно существующих задач как правило гораздо больше, чем ядер у процессора и нитей в системном пуле.
Канал - это объект со свойствами типизированного потока (stream). Поток всегда однонаправленный, но канал объединяет в себе оба конца потока: входной - для писателя и выходной - для читателя. Писателем и читателем может быть любое место в коде, в области видимости которого находится канал. Канал общего назначения при желании можно преобразовать к подтипам: канал только для чтения и канал только для записи. И тогда эти подтипы превращают канал в обыкновенные потоки для операций чтения и записи.
Как и потоки в операционной системе, канал обладает тем свойством, что пока он пуст, читатель останавливается в ожидании новой порции данных, а пока он полон, его освобождения ожидает писатель. Это свойство канала и используется для организации разнообраных способов синхронизации и передачи асинхронных сообщений.
Например, канал без буфера порождает ситуацию, когда читатель будет ждать писателя, а писатель - читателя до момента собственно передачи сообщения. А это представляет собой ни что иное как механизм рандеву двух параллельных задач, известный ещё с языка Ada и некоторых специальных операционных систем. Рандеву гарантирует, что обе встречающиеся задачи одновременно находятся во взаимно согласованных состояниях. Подобный же эффект достигается в кооперативной многозадачности, когда сопрограммы сами решают, в какой момент отдать управление соседям по процессору.
Канал с буфером позволяет писателю что-то записать, не дожидаясь читателя, а читателю - прочитать в удобное время, не тормозя писателя. Т.е. буферизированный канал выступает средством асинхронной передачи данных от одной задачи к другой, а размер буфера определяет длину допустимой очереди сообщений.
Эти свойства позволяют рассматривать канал как единственный, универсальный и полностью достаточный механизм для решения любых задач синхронизации. Некоторое исключение составляют разве что таймеры, отсутствующие в языке на уровне синтаксиса, но, разумеется, имеющиеся в библиотеке. Рассмотрим свойства канала как объекта синхронизации на примерах.
Простейший канал какого-нибудь примитивного типа, допустим, bool, с буфером глубины 1 может работать как флаг синхронизации с автоматическим сбросом, в Windows известный как auto reset event. Поднятый флаг пропускает строго одну нить, после чего автоматически сбрасывается.
Сконструируем на основе канала такой простейший объект синхронизации, как критическая секция.
type ICriticalSection interface {
Enter()
Leave()
}
type CriticalSection struct {
channel chan bool
}
func (this *CriticalSection) Enter() {
this.channel <- true
}
func (this *CriticalSection) Leave() {
<-this.channel
}
func NewCriticalSection() ICriticalSection {
this := new(CriticalSection)
this.channel = make(chan bool, 1)
var object ICriticalSection = this
return object
}
Глубина буфера 1 позволяет записывать в канал той же самой задаче, которая потом будет читать из канала. В большинстве случаев это и требуется для критической секции. Без буфера, поскольку начало и конец непустой критической секции не могут совпасть по времени, рандеву будет невозможным и вызовет мёртвую блокировку (deadlock) задачи. В приведённом примере входящая в критическую секцию задача занимает канал, и пытающиеся сделать то же самое другие задачи будут остановлены на операции записи. По выходе из критической секции задача освобождает канал, и в секцию может зайти следующая задача. Реализацию можно развернуть зеркально: входящая задача читает из канала, а выходящая задача пишет в канал. В таком случае изначально канал должен быть инициализирован - заполнен значением.
Канал по свойствам несколько отличается от флага синхронизации. Так, если флаг синхронизации уже установлен, но ещё не сработал, повторная установка флага не вызывает никаких побочных эффектов. В случае с каналом это не так: второй и последующие вызовы метода Leave вызовут остановку задач из-за пустого (или, зеркально, переполненного) буфера канала. В то же время мы не можем добавить внутрь метода Leave проверку состояния канала до операции над ним: это не будет атомарным действием и, следовательно, требует критической секции, но критическую секцию мы как раз ещё только и конструируем. Поэтому данную реализацию критической секции с описанным ограничением следует использовать только в тех случаях, когда гарантируется парный и строго последовательный вызов методов Enter и Leave.
Критическая секция работает по принципу турникета: пропуск группы строго по одному, каждый проверяется отдельно, а ещё не прошедшая часть группы ждёт на входе. Если нам нужен групповой пропуск, его реализует другой объект синхронизации - барьер. Барьер работает по принципу шлагбаума: группа собирается у входа, затем шлагбаум поднимается и пропускает всех собравшихся (и подбегавших в процессе прохода), а когда никого не осталось, шлагбаум опускается, и можно накапливать следующую группу.
Буферы каналов имеют статический размер, поэтому на неопределённое (или довольно большое) количество участников групповой операции создать канал невозможно (неэффективно). Однако можно комбинировать канал с числовым счётчиком. На этом принципе и построим барьер.
type IBarrier interface {
Wait()
Pass()
}
type Barrier struct {
counter uint
channel chan bool
}
func (this *Barrier) Pass() {
for this.counter > 0 {
this.counter -= 1
this.channel <- true
}
}
func (this *Barrier) Wait() {
this.counter += 1
<-this.channel
}
func NewBarrier() IBarrier {
this := new(Barrier)
this.channel = make(chan bool, 1)
var object IBarrier = this
return object
}
По построению здесь внутренняя критическая секция не нужна, поскольку приход нового запроса Wait во время выполнения Pass ни к чему плохому не приводит - просто Pass выполниться на одну итерацию больше, чем мог бы. После завершения работы Pass барьер вновь готов к работе и никого не пускает.
В этой конструкции барьера оповещение ждущих задач происходит последовательно. Причём цикл по уменьшающемуся счётчику синхронизируется с задачами: он не может записать в канал следующее true до тех пор, пока очередная задача не проснётся и не освободит канал от ожидающего её сообщения. Т.е. для большого количества ожидающих операция Pass в такой конструкции барьера будет работать достаточно длительное время. А поскольку обращение задач к каналам ничем не регламентируется, возможна такая ситуация, когда позже пришедшие к барьеру задачи проснутся заметно раньше ранее пришедших.
Сделав запись в канал асинхронной операцией, мы за счёт увеличения потребляемых ресурсов на создание новых задач (и общего торможения системы на их диспетчеризацию) могли бы ускорить работу Pass
go (func() { this.channel <- true })()
Но при этом мы получим неприятный побочный эффект: одна из задач, пришедших к барьеру в то время, пока ещё не все из ждущих проснулись, но работа Pass уже закончилась, может проскочить по "талончику", предназначенному одной из давно стоящих на барьере задач. Т.е. такое изменение перестаёт гарантировать, что очередной Pass обязательно пропустит всех собравшихся у барьера, и это сложно будет считать надёжной реализацией.
Наконец, получив групповой пропуск, мы можем приступить к созданию самого главного объекта синхронизации - семафора. Работает он примерно по такому же принципу, что и критическая секция и поэтому имеет тот же самый интерфейс, но зато имеет возможности для конструирования разнообразных групповых операций.
type Semaphore struct {
value int
barrier IBarrier
criticalSection ICriticalSection
}
func (this *Semaphore) Enter() {
for {
this.criticalSection.Enter()
if this.value < 0 {
this.value += 1
this.criticalSection.Leave()
break
} else {
this.criticalSection.Leave()
this.barrier.Wait()
}
}
}
func (this *Semaphore) Leave() {
this.criticalSection.Enter()
this.value -= 1
this.barrier.Pass()
this.criticalSection.Leave()
}
func NewSemaphore(startValue int) ICriticalSection {
this := new(Semaphore)
this.value = startValue
this.barrier = NewBarrier()
this.criticalSection = NewCriticalSection()
var object ICriticalSection = this
return object
}
Принцип работы семафора вообще (вне зависимости от реализации) следующий. На каждый вход он увеличивает счётчик. На каждый выход он уменьшает счётчик. Как только счётчик увеличивается до нуля, семафор закрывается на вход. Открыть семафор может только какой-то выход, когда счётчик уменьшается ниже нуля. Поэтому начальное значение семафора определяет групповую политику его поведения. Отрицательные начальные значения -N создают такие критические секции, в которых пользоваться управляемой семафором единицей ресурса могут не более, чем N нитей одновременно. Положительные начальные значения N создают такие условия, где для входящей в критическую секции нити доступно не менее N свободных единиц ресурса. Семафор с начальным значением -1 называется мьютексом и используется в частности для реализации критических секций без ограничения на последовательную парность входов и выходов, которое мы вводили ранее.
Семафоры обычно реализуются таким образом, что в ответ на событие открытия пробуждаются все спящие перед входом нити и конкурентно пытаются сделать вход. Какому-то количеству нитей это удаётся, а остальные засыпают до следующей попытки. На этом принципе построена и наша реализация семафора. На входе крутится цикл, в котором каждая задача проверяет значение счётчика и на основе этой проверки принимает решение. Это атомарная операция, поэтому она защищена критической секцией. Если решение положительное, задача входит в семафор и увеличивает счётчик, выходя из цикла ожидания. Если решение отрицательное, задача ожидает у барьера следующей попытки. На выходе из семафора происходит уменьшение счётчика и пропуск через барьер всех ожидающих. Это тоже атомарная операция, защищённая той же критической секцией. Сделано это для того, чтобы прошедшие через барьер задачи не смогли вновь встать на ожидание до того, как завершится Pass барьера. В противном случае может произойти взаимное зацикливание Pass и Wait барьера.
Поскольку семафор является альфой и омегой традиционных способов синхронизации, на его базе можно сконструировать любой известный объект синхронизации, и этому вопросу за несколько десятилетий было посвящено огромное количество как академической, так и практической литературы, обсуждение остальных разновидностей объектов синхронизации с этого момента можно считать излишним.
Поскольку нам удалось сконструировать семафор исключительно на каналах, мы можем считать канал в языке Go универсальным и вполне достаточным примитивом для любых задач синхронизации. Как говорится, теорема доказана.
Теперь посмотрим, как можно реализовывать некоторые приёмы параллельного программирования средствами языка Go.
Во-первых, обратим внимание на групповые операции WaitAll (ждать всех) и WaitAny (ждать любого). Поскольку различия между ними минимальные, их удобно описать общим интерфейсом объекта Waiter, но двумя разными конструкторами - по одному для каждого случая.
type IWaiter interface {
Register(event <-chan interface{})
Wait()
}
type Waiter struct {
event chan bool
counter uint
handle func(counter uint) bool
criticalSection ICriticalSection
}
func (this *Waiter) Register(event <-chan interface{}) {
this.criticalSection.Enter()
this.counter += 1
this.criticalSection.Leave()
go (func() {
<-event
this.criticalSection.Enter()
this.counter -= 1
decision := this.handle(this.counter)
this.criticalSection.Leave()
if decision {
this.event <- true
}
})()
}
func (this *Waiter) Wait() {
<-this.event
}
func newWaiter(handle func(counter uint) bool) IWaiter {
this := new(Waiter)
this.event = make(chan bool, 1)
this.handle = handle
this.criticalSection = NewCriticalSection()
var object IWaiter = this
return object
}
func NewWaiterOfAll() IWaiter {
return newWaiter(func(counter uint) bool { return counter == 0 })
}
func NewWaiterOfAny() IWaiter {
return newWaiter(func(counter uint) bool { return true })
}
Основная идея такова, что на каждый регистрирующийся в объекте канал запускается отдельная задача ожидания на этом канале, и все такие задачи имеют доступ к общему счётчику, работа с которым защищена критической секцией. При регистрации источника события счётчик увеличивается, при появлении события в источнике счётчик уменьшается. Тип interface{} в языке Go эквивалентен пустому object в других языка. Т.е. источником события может быть канал, принимающий в себя что угодно. В зависимости от условия по счётчику (настраиваемая функция handle) принимается решение: сигнализировать пользователю о наступлении группового события или нет. WaitAll ждёт, когда сработают все зарегистрированные источники - счётчик обнулится. WaitAny сигнализирует в ответ на срабатывание каждого источника событий в любом порядке срабатывания.
Решение простое, но из-за множества создаваемых вспомогательных задач далеко не самое быстродействующее.
Во-вторых, такой приём, как фоновое исполнение функции, известное как шаблон проектирования begin...end, реализуется проще всего. Пусть у нас есть функция
foo(arguments Arguments) Result
и нам нужно запустить её в фоне. Делается это приёмом замены результата на канал.
func beginFoo(arguments Arguments) <-chan Result {
result := make(chan Result)
go (func(arguments Arguments, result chan<- Result) {
result <- foo(arguments)
})(arguments, result)
return result
}
func endFoo(token <-chan Result) Result {
return <-token
}
Здесь как раз канал общего назначения превращается в канал только для записи внутри лямбда-функции и канал только для чтения как возвращаемый begin объект. Поскольку для записи много кода не требуется, в том случае, когда begin и end вызываются в одной и той же функции (в общем замыкании), можно записать ещё короче:
_foo := make(chan Result)
go (func() { _foo <- foo(arguments) })()
/* ... тут делаем что-то другое ... */
result := <-_foo
Последняя строчка автоматически обеспечивает рандеву фоновой и основной задач.
В целом комбинация таких примитивов, как задачи и каналы, делает язык Go одним из самых продвинутых и удобных на настоящий момент в части параллельного программирования. И вообще работу над такими экспериментальными проектами, как распределённая операционная система Plan 9 можно лишь поприветствовать.
К сожалению, о синтаксисе этого языка и средствах типизиации лестно отозваться не получится. Отсутствуют порождающие типы, что не позволяет конструировать шаблонные решения, в то время как язык является строго типизированным. Слабый контроль областей видимости вынуждает разработчика плодить массу коротких файлов-пакетов. Производительность в части выполнения задач всё же заметно уступает обычным циклическим алгоритмам, поэтому стиль программирования с интенсивным параллелизмом неизбежно ограничивается заметными накладными расходами на диспетчеризацию.