Ruby
26 May 2010

Многопоточность в Ruby

Original author: David Thomas, Chad Fowler, Andrew Hunt
Translation
Перевод главы «Multithreading» книги David Thomas и Andrew Hunt «Programming Ruby: The Pragmatic Programmers' Guide, Second Edition».

Часто самым простым способом выполнить одновременно две вещи является использование потоков в Ruby. Они являются внутрипроцессными, встроенными в интерпретатор Ruby. Это делает потоки Ruby полностью переносимыми, т.е. независимыми от операционной системы. Но в то же время вы точно не получите выгоду от использования родных, нативных потоков. Что это значит?

Вы можете столкнуться с голоданием (thread starvation — это когда поток с маленьким приоритетом не имеет шанса запуститься). Если вы хотите заблокировать ваши потоки, то со скрежетом остановится целый процесс. А если возникнет ситуация, что некоторые потоки будут посылать вызовы операционной системе, для выполнения которых требуется немалое время, то все потоки будут висеть, пока интерпретатор не получит контроль обратно. И наконец, если ваша машина имеет больше одного процессора, потоки Ruby не будут это использовать, т.к. они запускаются в одном процессе, а в одиночном родном потоке они будут вынуждены запускаться на одном процессоре единовременно.

Все это звучит страшновато. Тем не менее, на практике во многих случаях выгода от использования потоков во многом перевешивает любые потенциальные проблемы, которые могут возникнуть. Потоки Ruby являются эффективным и легким путем достижения параллельности в вашем коде. Вы просто должны понять основные проблемы реализации, и, соответственно, архитектуру.

Создание потоков Ruby


Создание нового потока довольно прямолинейно. Следующий код — простой пример. Он параллельно скачивает набор Веб-страниц. Для каждого запрашиваемого для закачки URL код создает отдельный поток, который управляет HTTP-транзакцией.

require 'net/http'
pages = %w( www.rubycentral.com slashdot.org www.google.com )
threads = []
for page_to_fetch in pages
    threads << Thread.new(page_to_fetch) do |url|
        h = Net::HTTP.new(url, 80)
        puts "Fetching: #{url}"
        resp = h.get('/'nil )    
        puts "Got #{url}#{resp.message}"
    end
end 
threads.each {|thr| thr.join }


Результат:
Fetching: www.rubycentral.com
Fetching: slashdot.org
Fetching: www.google.com
Got www.google.com: OK
Got www.rubycentral.com: OK
Got slashdot.org: OK 


Давайте взглянем на этот код более внимательно: здесь происходит несколько тонких моментов. Новые потоки создаются вызовом Thread.new. Это задает блок, содержащий код, который будет выполняться в новом потоке. В нашем случае блок использует библиотеку net/http для извлечения главной страницы указанных сайтов. Наша трассировка явно показывает, что эти извлечения выполняются параллельно.

Когда мы создаем поток, мы указываем необходимый URL в качестве параметра. Этот параметр передается в блок в виде переменной url. Почему мы это делаем, когда проще было бы использовать значение переменной page_to_fetch внутри блока?

Поток имеет общий доступ ко всем глобальным переменным, переменным экземпляра и локальным переменным, которые имеются на момент запуска потока. Любой человек, имеющий младшего брата, может вам сказать, что общий доступ или совместное использование не всегда является хорошей вещью. В этом случае, все три потока будут делиться переменной page_to_fetch. Когда запускается первый поток, page_to_fetch принимает значение «www.rubycentral.com». А между тем, цикл, создающий потоки, до сих пор работает. В следующий момент времени page_to_fetch устанавливается в «slashdot.org». Если первый поток еще не закончил использовать переменную page_to_fetch, то он неожиданно начнет использовать ее новое значение. Этот вид ошибки очень трудно отследить.

Однако, локальные переменные, создаваемые внутри блока потока, являются действительно локальными по отношению к этому потоку — каждый поток будет иметь свою копию адреса страницы. Вы можете указать любое число аргументов в блоке с помощью Thread.new.

Управление потоками

Другая тонкость происходит на последней строке нашей программы. Почему мы вызываем join для каждого создаваемого потока?

Когда программа Ruby завершается, все потоки убиваются, несмотря на их состояние. Однако, вы можете подождать завершения отдельного потока путем вызова метода Thread#join. Вызывающий поток заблокируется до того момента, как текущий поток не завершится. Вызывая join для каждого потока, вы можете быть уверенными, что все три запроса будут выполнены перед завершением основной программы. Если вы не хотите блокировать поток навсегда, вы можете передать в join параметр лимита времени — если этот лимит закончится перед завершением потока, вызов join вернет значение nil. Другой вариант join'а — метод Thread#value, который возвращает значение последней операции, выполненной в потоке.

Помимо join, для управления потоками используется несколько других удобных операций. Доступ к текущему потоку можно всегда получить, используя Thread.current. Вы можете получить список всех потоков, используя Thread.list, которая возвращает список всех объектов Thread: и работающих, и остановленных. Для определения статуса отдельного потока вы можете использовать Thread#status и Thread#alive?.
Дополнительно вы можете настроить приоритет потока, используя Thread#priority=. Потоки с бОльшим приоритетом будут запускаться перед потоками с меньшим приоритетом. Мы поговорим немного позже о планировании расписания потоков, а также об их запуске и остановке.

Переменные потока

Поток имеет обычный доступ ко всем переменным, находящимся в области видимости во время его запуска. Локальные переменные блока, содержащего код потока, являются локальными по отношению к самому потоку и не имеют к себе общего доступа.

Но что делать, если вам понадобятся в потоке такие переменные, над которыми можно было бы иметь доступ с других потоков — включать их в главный поток? Характерной чертой класса Thread является специальная возможность, позволяющая создавать и иметь доступ по имени к локальным переменным потока. Вы просто обращаетесь с объектом потока как с хэшем, устанавливая значения элементов с помощью [ ]= и читая их с помощью [ ]. В следующем примере каждый поток записывает текущее значение счетчика в локальную переменную потока с ключом mycount. Для осуществления этого код использует строку «mycount» в качестве индекса объекта потока.

count = 0
threads = []
10.times do |i|
    threads[i] = Thread.new do
        sleep(rand(0.1))
        Thread.current["mycount"] = count
        count += 1
    end
end
threads.each {|t| t.join; print t["mycount"]", " } 
puts "count = #{count}" 


Результат:
4, 1, 0, 8, 7, 9, 5, 6, 3, 2, count = 10 


Главный поток ждет, пока завершатся остальные потоки, а затем выводит значения счетчика, зафиксированные каждым потоком. Для интереса мы добавили случайную задержку каждому потоку перед записью значения счетчика.

Потоки и исключения


Что будет, если в потоке возникнет необработанное исключение? Это зависит от значения флага abort_on_exception и от значения флага отладки интерпретатора.
Если abort_on_exception = false и флаг отладки не включен (состояние по умолчанию), то необработанное исключение просто убьет текущий поток, а все остальные продолжат свою работу. В реальности, вы даже ничего не знаете об исключении, пока для потока, выбросившего это исключение, не будет вызван join.

В следующем примере поток 2 раздувается и не может ничего вывести. Однако, вы все еще можете видеть след от остальных потоков.

threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end 
threads.each {|t| t.join } 


Результат:
0 
1
3
prog.rb:4: Boom! (RuntimeError)
from prog.rb:8:in `join'
from prog.rb:8
from prog.rb:8:in `each'
from prog.rb:8


Мы можем перехватить исключение во время выполнения join.
threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end
threads.each do |t|
    begin
        t.join
        rescue RuntimeError => e
        puts "Failed: #{e.message}"
    end
end 


Результат:
0
1
3 
Failed: Boom!

Однако, если установить abort_on_exception в true или использовать -d для отключения флага отладки, то необработанное исключение убьет все работающие потоки. Как только поток 2 умрет, больше не будет произведено никакого вывода.

Thread.abort_on_exception = true
threads = []
4.times do |number|
    threads << Thread.new(number) do |i|
        raise "Boom!" if i == 2
        print "#{i}\n"
    end
end
threads.each {|t| t.join }


Результат:
0
1
prog.rb:5: Boom! (RuntimeError)
from prog.rb:4:in `initialize'
from prog.rb:4:in `new'
from prog.rb:4
from prog.rb:3:in `times'
from prog.rb:3


Данный пример также иллюстрирует глюк. Внутри цикла предпочтительней использовать print для вывода числа, чем puts. Почему? Потому что puts тайно разбивает свою работу на две составляющие: выводит свой аргумент, а затем выводит символ новой строки. Между ними двумя может запуститься поток, и вывод будет чередоваться. Вызывая print одной строки, которая уже содержит символ новой строки, мы можем обойти данную проблему.

Управление планировщиком потоков


В хорошо спроектированном приложении вы будете просто давать потокам делать свою работу. Создание временных зависимостей в многопоточном приложении, как правило считается дурным тоном, т.к. это делает код намного труднее для восприятия, а также делает невозможной оптимизацию выполнения вашей программы планировщиком потоков.

Однако, иногда вам будет необходимо управлять потоками явно. Например, музыкальный автомат, показывающий светомузыку. Мы должны остановить его на время, когда останавливается музыка. Вы можете использовать два потока в виде схемы производитель-потребитель, где потребитель должен подождать, если производитель имеет незавершенные заказы.

Класс Thread предоставляет набор методов для управления планировщиком потоков. Вызов Thread.stop останавливает текущий поток, а вызов Thread#run запускает отдельный поток. Thread.pass запускает планировщик для передачи выполнения другому потоку, а Thread#join и Thread#value приостанавливают вызывающий поток, пока заданные потоки на завершатся.

Мы можем продемонстрировать эту особенность в следующей совершенно бессмысленной программе. Она создает два дочерних потока: t1 и t2, каждый из которых является экзмепляром класса Chaser. Метод chase инкрементирует счетчик, но не дает ему стать большим, чем на два по сравнению со счетчиком в другом потоке. Для остановки этого увеличения метод вызывает Thread.pass, которые позволяет запуститься методу chase в другом потоке. Для интереса мы сразу после старта приостанавливаем потоки, а затем в случайном порядке запускаем.

class Chaser
    attr_reader :count
    def initialize(name)
        @name = name
        @count = 0
    end
    def chase(other)
        while @count < 5
            while @count - other.count > 1
                Thread.pass
            end
            @count += 1
            print "#@name#{count}\n"
        end
    end 
end 

c1 = Chaser.new("A")
c2 = Chaser.new("B")
threads = [
    Thread.new { Thread.stop; c1.chase(c2) },
    Thread.new { Thread.stop; c2.chase(c1) }
]
start_index = rand(2)
threads[start_index].run
threads[1 - start_index].run 
threads.each {|t| t.join }


Результат:
B: 1
B: 2
A: 1
B: 3
A: 2
B: 4
A: 3
B: 5
A: 4
A: 5


Однако, использовать такие элементарные действия для достижения синхронизации в реальном коде не так просто — состояние гонок будет вас постоянно преследовать. А когда вы будете работать с общими данными, состояние гонок точно гарантирует вам долгую и разочаровывающую отладку. Вообще-то предыдущий пример содержит ошибку: существует возможность инкрементировать счетчик в одном потоке, но перед выводом его значения, запускается другой поток и выводит значение своего счетчика. В результате вывод будет в неправильном порядке.

К счастью, потоки имеют одну дополнительную возможность — идея взаимного исключения (mutual exclusion). Используя это, мы можем создавать безопасные схемы синхронизации.

+33
29.4k 97
Comments 30