Pull to refresh

Многопоточное программирование в Go

Reading time10 min
Views8.5K
Возникла задача: у нас есть компилятор собственного языка программирования, которым мы компилируем некоторый диалект бейсика в исходник на C.

К сожалению, по историческим причинам, у нас не было четкого регрессионного тестирования для этого компилятора. Но сейчас, на основе исходников бизнес-приложения, написанного на этом бейсике, решили сделать полноценное тестирование.

План таков: принять какую-то текущую версию компилятора, на которую нет открытых жалоб от клиентов, за эталон. Скомпилировать этой версией приличное количество исходников, сохранить результат, и затем каждый раз при внесении в компилятор изменений прогонять все эти исходники и смотреть, генерируется ли точно такой же вывод. Это не защитит от появления ошибок в целом, но по крайне мере будет уверенность, что существующий бизнес код все еще компилируется правильно.

Несложная задача. Только есть одно «но». Количество исходников, которые планируется использовать как эталонные — около 15 тысяч файлов, суммарным объемом чуть меньше гига (для удобства они завернуты в один TAR). Подобный «прогон» может быть весьма долгим. И есть естественное желание сделать тест максимально быстрым, используя многопроцессорную машину, ибо задача прекрасно распараллеливается.

Как вариант — можно сделать Makefile и запускать его с ключом "-j" в GNU Make. Но если написать специализированную многопоточную программу, то можно достичь лучшей производительности.


Итак, очевидно: вместо последовательного выполнения нужно запускать компиляцию каждого файла в параллельных потоках. Но так как файлов много (~15 тысяч), неэффективно просто одновременно запустить столько много потоков. Разумнее всего будет иметь пул потоков, где их количество будет определяться, например, количеством процессоров (например, умноженное на 2). Пул будет назначать очередную задачу на свободный поток, и если все потоки заняты, он будет блокироваться до тех пор, пока не появиться свободный.

Таким образом, мы будем поддерживать занятыми N потоков, обеспечивая оптимальную загрузку процессоров, не тратя время на лишние переключения контекста и постоянное создание и уничтожение потоков.

Сначала я решил написать все на С++ и pthreads. После нескольких часов танцов вокруг функторов, мьютексов, семафоров и условных переменных, у меня так ничего реально работающего не вышло. И тут я вспомнил про Go. Не поверите — через час работы у меня была готова первая версия, включая мелочевку типа работы с TAR, командной строкой и запуском внешнего процесса.

Итак: данная программа берет TAR с исходниками, распаковывает его, и каждый файл прогоняет через компилятор.

Сразу скажу, цель того, что я все это пишу тут, это продемонстрировать (и не более того), как просто и удобно на Go можно писать многопоточные императивные программы.

Главная концепция, которая используется в этой программе — это каналы. По каналам можно синхронно передавать данные и функции между потоками (Go-рутинами).

Далее, можно просто смотреть по исходнику. Самое интересное место там, где видно, как функция «compile()» может вызываться из нескольких потоков без каких-либо изменений.

package main 
 
import ( 
        "archive/tar" 
        "container/vector" 
        "exec" 
        "flag" 
        "fmt" 
        "io" 
        "os" 
        "strings" 
) 

// Два флага: количество потоков и имя компилятора.
var jobs *int = flag.Int("jobs", 0, "number of concurrent jobs") 
var compiler *string = flag.String("cc", "bcom", "compiler name") 
 
func main() { 
        flag.Parse() 
        os.Args = flag.Args() 
        args := os.Args 
 
        ar := args[0] 
        r, err := os.Open(ar, os.O_RDONLY, 0666); 
        if err != nil { 
                fmt.Printf("unable to open TAR %s\n", ar) 
                os.Exit(1)       
        }
        // defer - это аналог "finally {}", гарантированное выполнение
        // кода при выходе из блока.
        defer r.Close() 
 
        // Цикл распаковки TAR.
        fmt.Printf("- extracting %s\n", ar)
        // Создаем контекст для распаковки.          
        tr := tar.NewReader( r )
        tests := new(vector.StringVector)
        // Последовательный проход по архиву, сохранение файлов и составление
        // списка для компиляции.
        for {
                // Получаем дескриптор следующего файла в архиве.
                hdr, _ := tr.Next() 
                if hdr == nil { 
                        break 
                } 
                name := &hdr.Name
                // Если это не заголовочный файл, сохраним имя.                  
                if !strings.HasPrefix(*name, "HDR_") { 
                        tests.Push(*name) 
                } 
                // Создаем новый файл.
                w, err := os.Open("data/" + *name, os.O_CREAT | os.O_RDWR, 0666) 
                if err != nil { 
                        fmt.Printf("unable to create %s\n", *name) 
                        os.Exit(1) 
                }
                // Копируем содержимое в текущий файл.
                io.Copy(w, tr) 
                w.Close() 
        } 
 
        fmt.Printf("- compiling...\n") 
        *compiler , _ = exec.LookPath(*compiler) 
        fmt.Printf("- compiler %s\n", *compiler) 

        if *jobs == 0 {
                // Вызываем "compile()" последовательно, в основном потоке.
                fmt.Printf("- running sequentially\n")
                for i := 0; i < tests.Len(); i++ {
                        compile(tests.At(i))
                }
        } else {
                // Запускаем "compile()" в параллельных потоках.
                fmt.Printf("- running %d concurrent job(s)\n", *jobs)

                // Канал задач: в этот канал мы будем класть имена файлов,
                // которые надо скомпилировать. Потоки-runner'ы будут ждать
                // сообщений из этого канала. Канал имеет ограничение по
                // длине. Это аналог семафора, чтобы блокировать главный 
                // поток, если все runner'ы заняты.
                tasks := make(chan string, *jobs)
                
                // Канал подтверждения полного завершение потока-runner'а.
                // Главный поток будет ждать, пока все runner'ы ответят
                // по этому каналу. Тип сообщений тут не важен.
                done := make(chan bool)

                // Запускаем runner'ы.
                for i := 0; i < *jobs; i++ {
                        go runner(tasks, done)
                }

                // Передаем в канал имена файлов для обработки. При 
                // достижении максимального размера канала, главный поток
                // будет заблокирован.
                for i := 0; i < tests.Len(); i++ {
                        tasks <- tests.At(i)
                }

                // Посылаем всем потокам команду завершиться и ждем
                // подтверждения о нормальном выходе от каждого потока.
                for i := 0; i < *jobs; i++ {
                        tasks <- ""
                        <- done
                }
        }
}

// Поток-runner.
func runner(tasks chan string, done chan bool) {
        // Бесконечный цикл.
        for {
                // Ждем сообщения из канала. Обычно, поток заблокирован
                // на этом месте.
                name := <- tasks
                // Если имя пустое, нас просят завершиться.
                if len(name) == 0 {
                        break
                }
                // Компилируем файл.
                compile(name)
        }
        // Посылаем сообщение, что поток завершился.
        done <- true
}

func compile(name string) {
        // Вызываем компилятор.
        c, err := exec.Run(*compiler, []string{*compiler, name}, 
                           os.Environ(), "./data", exec.DevNull, 
                           exec.PassThrough, exec.PassThrough)
        if err != nil {
                fmt.Printf("unable to compile %s (%s)\n", name, err.String())
                os.Exit(1)
        }
        c.Wait(0)
}

Makefile:

target = tar_extractor

all:
        6g $(target).go
        6l -o $(target) $(target).6

Я погонял это добро под Линуксом 64-бит на восьми процессорном блейде. Во время тестирования я был на машине один, так что результаты разных прогонов можно сравнивать. Файл «huge.tar» содержит ~15 тысяч исходников и имеет размер один гигабайт.

Так выглядит загрузка процессоров, когда машина ничего не делает (все процессоры почти на 100% в idle):

  Cpu0  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu1  :  0.0%us,  0.0%sy,  0.0%ni, 99.7%id,  0.3%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu2  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu3  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu4  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu5  :  0.0%us,  0.3%sy,  0.0%ni, 99.3%id,  0.3%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu6  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu7  :  0.0%us,  0.0%sy,  0.0%ni,100.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Запускаем в последовательном режиме (-jobs 0):

  make && time -p ./tar_extractor -jobs 0 huge.tar

Время работы:

  real 213.81
  user 187.32
  sys 61.33

Практически все процессоры на 70-80% ничего не делают (все снимки я делал во время стадии компиляции):

  Cpu0  : 11.9%us,  4.3%sy,  0.0%ni, 82.5%id,  1.3%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu1  :  9.6%us,  2.7%sy,  0.0%ni, 87.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu2  :  4.3%us,  1.3%sy,  0.0%ni, 92.7%id,  1.7%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu3  : 16.0%us,  6.0%sy,  0.0%ni, 78.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu4  : 12.6%us,  4.3%sy,  0.0%ni, 82.7%id,  0.3%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu5  : 11.6%us,  3.3%sy,  0.0%ni, 85.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu6  :  4.7%us,  1.3%sy,  0.0%ni, 94.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu7  : 16.6%us,  6.3%sy,  0.0%ni, 77.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Суммарная загрузка процессоров — 2.7%:

    PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                       
  15054 tester    18   0 41420 4980 1068 S  2.7  0.1   0:02.96 tar_extractor 

Теперь запускаем с пулом потоков, но только с одним каналом (-jobs 1).

Время:

  real 217.87
  user 191.42
  sys 62.53

Процессоры:

  Cpu0  :  5.7%us,  1.7%sy,  0.0%ni, 92.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu1  : 13.3%us,  5.3%sy,  0.0%ni, 81.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu2  :  7.0%us,  2.7%sy,  0.0%ni, 89.3%id,  0.7%wa,  0.0%hi,  0.3%si,  0.0%st
  Cpu3  : 15.3%us,  5.7%sy,  0.0%ni, 77.7%id,  1.3%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu4  :  6.0%us,  2.0%sy,  0.0%ni, 92.0%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu5  : 14.3%us,  7.3%sy,  0.0%ni, 78.4%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu6  :  7.0%us,  2.3%sy,  0.0%ni, 90.7%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu7  : 15.3%us,  6.6%sy,  0.0%ni, 78.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Понятно, что картина такая же, так как реально мы также гоняем один поток.

А теперь включаем пул потоков (-jobs 32):

  make && time -p ./tar_extractor -jobs 32 huge.tar

Время работы упало почти в семь раз:

  real 38.38
  user 195.55
  sys 69.92

Общая загрузка процессоров (во время стадии компиляции) возросла до 23%:

    PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND                                                       
  17488 tester    16   0 45900 9732 1076 S 23.6  0.1   0:06.40 tar_extractor

Видно, что все процессоры реально заняты:

  Cpu0  : 56.3%us, 26.3%sy,  0.0%ni, 17.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu1  : 55.5%us, 27.9%sy,  0.0%ni, 15.6%id,  1.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu2  : 56.1%us, 25.9%sy,  0.0%ni, 15.0%id,  0.7%wa,  0.3%hi,  2.0%si,  0.0%st
  Cpu3  : 58.1%us, 26.2%sy,  0.0%ni, 15.6%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu4  : 57.2%us, 25.8%sy,  0.0%ni, 17.1%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu5  : 56.8%us, 26.2%sy,  0.0%ni, 16.9%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu6  : 59.0%us, 26.3%sy,  0.0%ni, 13.0%id,  1.7%wa,  0.0%hi,  0.0%si,  0.0%st
  Cpu7  : 56.5%us, 27.2%sy,  0.0%ni, 16.3%id,  0.0%wa,  0.0%hi,  0.0%si,  0.0%st

Данное тестирование предназначено исключительно для понимания, как простейший параллельный код ускоряет все в разы. И также для демонстрации, как просто и относительно безопасно можно программировать параллельные вычисления в Go.

Идем дальше. Сравним эффектиность Go-рутин по сравнению с нативными потоками, например в С++.

Признаюсь, я не боец в бусте и новом C++, но и на С++ решение получается весьма изящное.

Интересно было сравнить производительнось потоков во обоих языках в плане скорости из создания и назначения им работы. Как я понял, это битва между pthreads и системой Go-рутин, которые не являются потоками операционной системы. Как сказано в документации:

Goroutines are multiplexed onto multiple OS threads so if one should block, such as while waiting for I/O, others continue to run. Their design hides many of the complexities of thread creation and management.

Я взял последний boost, и на той же восьми процессорной машине провел эксперимент.

Программе надо будет выполнить множество однотипной работы (фактически, вызвать функцию). Задачи будут мультиплексироваться между несколькими параллельными потоками. Сама функция будет элементарной и быстрой. Надеюсь, этим удастся сфокусировать тестирование именно на подсистеме потоков, нежели на полезной нагрузке.

Итак, программа на Go:

package main

import (
        "flag"
        "fmt"
)

var jobs *int = flag.Int("jobs", 8, "number of concurrent jobs")
var n *int = flag.Int("tasks", 1000000, "number of tasks")

func main() {
        flag.Parse()

        fmt.Printf("- running %d concurrent job(s)\n", *jobs)
        fmt.Printf("- running %d tasks\n", *n)
        tasks := make(chan int, *jobs)
        done := make(chan bool)

        for i := 0; i < *jobs; i++ {
                go runner(tasks, done)
        }

        for i := 1; i <= *n; i++ {
                tasks <- i
        }

        for i := 0; i < *jobs; i++ {
                tasks <- 0
                <- done
        }
}

func runner(tasks chan int, done chan bool) {
        for {
                if arg := <- tasks; arg == 0 {
                        break
                }
                worker()
        }
        done <- true
}

func worker() int {
        return 0
}

Makefile для прогона по серии параметров:

target = go_threading

all: build 

build:
        6g $(target).go
        6l -o $(target) $(target).6

run:
        (time -p ./$(target) -tasks=$(args) \
                1>/dev/null) 2>&1 | head -1 | awk '{ print $$2 }'

n = \
10000 \
100000 \
1000000 \
10000000 \
100000000

test:
        @for i in $(n); do \
                echo "`printf '% 10d' $$i`" `$(MAKE) args=$$i run`; \
        done

Программа на C++:

#include <iostream>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <queue>
#include <string>
#include <sstream>

class thread_pool {

  typedef boost::function0<void> worker;

  boost::thread_group threads_;
  std::queue<worker> queue_;
  boost::mutex mutex_;
  boost::condition_variable cv_;
  bool done_;

 public:

  thread_pool() : done_(false) {
    for(int i = 0; i < boost::thread::hardware_concurrency(); ++i)
      threads_.create_thread(boost::bind(&thread_pool::run, this));
  }

  void join() {
    threads_.join_all();
  }

  void run() {
    while (true) {
      worker job;
      {
        boost::mutex::scoped_lock lock(mutex_);
        while (queue_.empty() && !done_)
          cv_.wait(lock);

        if (queue_.empty() && done_) return;

        job = queue_.front();
        queue_.pop();
      }
      execute(job);
    }
  }

  void execute(const worker& job) {
    job();
  }

  void add(const worker& job) {
    boost::mutex::scoped_lock lock(mutex_);
    queue_.push(job);
    cv_.notify_one();
  }

  void finish() {
    boost::mutex::scoped_lock lock(mutex_);
    done_ = true;
    cv_.notify_all();
  }
};

void task() {
  volatile int r = 0;
}

int main(int argc, char* argv[]) {
  thread_pool pool;
  int n = argc > 1 ? std::atoi(argv[1]) : 10000;

  int threads = boost::thread::hardware_concurrency();
  std::cout << "- executing " << threads << " concurrent job(s)" << std::endl;
  std::cout << "- running " << n << " tasks" << std::endl;
  for (int i = 0; i < n; ++i) {
    pool.add(task);
  }

  pool.finish();
  pool.join();
 
  return 0;
}

Makefile:

BOOST = ~/opt/boost-1.46.1

target = boost_threading

build:
        g++ -O2 -I $(BOOST) -o $(target) \
                -lpthread \
                -lboost_thread \
                 -L $(BOOST)/stage/lib \
                $(target).cpp 

run:
        (time -p LD_LIBRARY_PATH=$(BOOST)/stage/lib ./$(target) $(args) \
                1>/dev/null) 2>&1 | head -1 | awk '{ print $$2 }'

n = \
10000 \
100000 \
1000000 \
10000000 \
100000000

test:
        @for i in $(n); do \
                echo "`printf '% 10d' $$i`" `$(MAKE) args=$$i run`; \
        done

В обоих языках число потоков будет равно количеству процессоров — 8. Количество задач, прогоняемых через эти восемь поток будет варьироваться.

Запускаем программу на C++:

make && make -s test

g++ -O2 -I ~/opt/boost-1.46.1 -o boost_threading \
                -lpthread \
                -lboost_thread \
                 -L ~/opt/boost-1.46.1/stage/lib \
                boost_threading.cpp 
(time -p LD_LIBRARY_PATH=~/opt/boost-1.46.1/stage/lib ./boost_threading  \
                1>/dev/null) 2>&1 | head -1 | awk '{ print $2 }'
     10000 0.03
    100000 0.35
   1000000 3.43
  10000000 29.57
 100000000 327.37

Теперь Go:

make && make -s test

6g go_threading.go
6l -o go_threading go_threading.6
     10000 0.00
    100000 0.03
   1000000 0.35
  10000000 3.72
 100000000 38.27

Разница очевидна.

Кстати, если установить переменную GOMAXPROCS = 8, чем сказать рантайму Go использовать все 8 процессоров (по умолчанию используется только один вне зависимости, столько физических процессоров имеется), то скорость программы на Go дико падает и становится почти равной времени программы на С++. Получается, что легковесные потоки Go, будучи мультеплексированы на нативные потоки, работают быстрее.

Может быть я сравниваю соленое с красным, и результаты просто неадекватны. Будет очень признателен за подсказку, в каких попугаях на правильно измерять.

Посты по теме Go:
Tags:
Hubs:
+19
Comments20

Articles