Pull to refresh

Пример ускорения расчётов в R путём многопоточности

Reading time 4 min
Views 9.8K

Введение


Как следует из Википедии:
R — язык программирования для статистической обработки данных и работы с графикой, а также свободная программная среда вычислений с открытым исходным кодом в рамках проекта GNU.

Данный язык, в настоящее время, нашёл широкое применение во многих практических и чисто научных областях. Однако, исторически сложилось, что скорость многих ресурсоёмких вычислений оставляет желать лучшего. Тема параллельных вычислений в R на habrahabr уже поднималась. В этой статье я попытаюсь показать применение подобного подхода на конкретном примере с использованием пакета для многопоточных вычислений — parallel.


Постановка задачи


Потребность в ускорении вычислений возникла при изучении связи генетических маркеров с фенотипическими признаками человека с помощью пакета для построения нейронных сетей — neuralnet. Однако предложенное решение может быть использовано и для других задач.

Материалы и методы


В работе использовался R 2.15.2, neuralnet — 1.32, parallel — 2.15.2, rbenchmark — 1.0.0. Компьютер под управлением Windows 7, процессором Intell Core i5-2550K CPU @ 3.40GHz (4 ядра) и 8Гб оперативной памяти. Встроенный набор данных data(infert, package=«datasets»).

Результаты и обсуждение


Для начала посмотрим нагрузку на процессор при обычном вычислении нейронной сети. Пример взят из описания пакета neuralnet с небольшой модификацией: threshold=0.0001 — один из критериев остановки, и rep=20 — 20 повторов вычисления, для того, чтобы расчёты были более «сложные».

nn<-function()
{
  print("Обычное выполнение")
  neuralnet(case~parity+induced+spontaneous, infert,
            err.fct="ce", linear.output=FALSE, likelihood=TRUE,rep=20, threshold=0.0001)
}
nn()


Из Диспетчера задач Windows видно — при выполнении используется только одно из ядер процессора и загрузка ЦП составляет 25%.
Теперь измерим время выполнения этого кода. Для этого я использовал пакет rbenchmark. В общем виде код для измерения будет выглядеть так:

within(benchmark(test.name=test.function(), # название теста и тестируемая функция
                 replications=c(3), # число повторов в тесте (+1 повтор на "разогрев")
                 columns=c('test', 'replications', 'elapsed'), # стандартные колонки в таблице вывода результатов
                 order=c('elapsed', 'test')), # параметры сортировки вывода
{ average = elapsed/replications }) # нестандартная колонка среднего времени выполнения одного повтора теста

В результате получается:
                test replications elapsed     average
1 Обычное_выполнение            3   47.83 15.94333333


Теперь сравним это время с выполнением 20 вычислений не через опцию rep=20, а через, например, sapply.
nn.s<-function()
{
  print("Последовательное выполнение")
  nets<-sapply(1:20, function(X)
    neuralnet(case~parity+induced+spontaneous, infert,
              err.fct="ce", linear.output=FALSE, likelihood=TRUE,rep=1, threshold=0.0001))
}

Получается:
                         test replications elapsed average
1          Обычное_выполнение            3   46.05   15.35
2 Последовательное_выполнение            3   47.52   15.84

Время выполнения примерно одинаковое. Видимо никакой оптимизации выполнения повторов внутри функции neuralnet нет. Чтобы задействовать остальные ядра используем пакет parallel. Он входит в состав R c версии 2.14.0 и позволяет выполнять код в нескольких независимых потоках. Каждый из потоков будет выполнять функцию neuralnet.
nn.p<-function()
{
  print("Параллельное выполнение")
  cl <- makeCluster(getOption("cl.cores", 4)) # создание кластера из четырёх ядер процессора
  clusterExport(cl,"infert") # передача данных внутрь кластера
  clusterEvalQ(cl,library(neuralnet)) # загрузка пакета neuralnet в кластере
  parSapply(cl, 1:20, function(X) # параллельная версия sapply
    neuralnet(case~parity+induced+spontaneous, infert,
              err.fct="ce", linear.output=FALSE, likelihood=TRUE,rep=1, threshold=0.0001)
  )
  stopCluster(cl)
}

Сравним время выполнения:
                         test replications elapsed      average
3     Параллельное_выполнение            3   17.38  5.793333333
2 Последовательное_выполнение            3   45.88 15.293333333
1          Обычное_выполнение            3   46.61 15.536666667

Таким образом, параллельный вариант выполняется более чем в 2,5 раза быстрее. При этом загрузка процессора составляет 100%.

Для удобства можно создать свою обёртку для функции neuralnet.
pneuralnet <- function(formula, data, rep=1, ..., cl)
{
  clusterExport(cl,"data")
  clusterEvalQ(cl,library(neuralnet))
  nets <- parLapply(cl, 1:rep, function(X)
    neuralnet(formula, data, rep=1, ...)
  )
  # сортировка списка сетей в зависимости от reached.threshold
  nets <- nets[order(sapply(1:rep,function(i){nets[[i]]$result.matrix["reached.threshold", ]}))]
  return(nets)
}

cl <- makeCluster(getOption("cl.cores", 4))
nets <- pneuralnet(case~parity+induced+spontaneous, infert,
                   err.fct="ce", linear.output=FALSE, likelihood=TRUE,rep=4,
                   threshold=0.0001, cl=cl)
stopCluster(cl)


Весь код
library(parallel)
library(neuralnet)
library(rbenchmark)

data(infert, package="datasets")

nn<-function()
{
  print("Обычное выполнение")  
  neuralnet(case~parity+induced+spontaneous, infert, 
            err.fct="ce", linear.output=FALSE, likelihood=TRUE,rep=20, threshold=0.0001)
}

nn.s<-function()
{
  print("Последовательное выполнение")
  nets<-sapply(1:20, function(X)  
    neuralnet(case~parity+induced+spontaneous, infert, 
              err.fct="ce", linear.output=FALSE, likelihood=TRUE,rep=1, threshold=0.0001))
}

nn.p<-function()
{
  print("Параллельное выполнение")
  cl <- makeCluster(getOption("cl.cores", 4))
  clusterExport(cl,"infert")
  clusterEvalQ(cl,library(neuralnet))
  parSapply(cl, 1:20, function(X)        
    neuralnet(case~parity+induced+spontaneous, infert, 
              err.fct="ce", linear.output=FALSE, likelihood=TRUE,rep=1, threshold=0.0001)
            
  )
  stopCluster(cl)  
}

pneuralnet <- function(formula, data, rep=1, ..., cl)
{  
  clusterExport(cl,"data")
  clusterEvalQ(cl,library(neuralnet))
  nets <- parLapply(cl, 1:rep, function(X)        
    neuralnet(formula, data, rep=1, ...)          
  )
  # сортировка списка сетей в зависимости от reached.threshold
  nets <- nets[order(sapply(1:rep,function(i){nets[[i]]$result.matrix["reached.threshold", ]}))]
  return(nets)
}

within(benchmark(Обычное_выполнение=nn(),
                 Последовательное_выполнение=nn.s(),
                 Параллельное_выполнение=nn.p(),
                 replications=c(3),
                 columns=c('Тест', 'replications', 'elapsed'),
                 order=c('elapsed', 'test')),
{ average = elapsed/replications })

cl <- makeCluster(getOption("cl.cores", 4))
nets <- pneuralnet(case~parity+induced+spontaneous, infert, 
                   err.fct="ce", linear.output=FALSE, likelihood=TRUE,rep=4, 
                   threshold=0.0001, cl=cl)
stopCluster(cl)

Tags:
Hubs:
+12
Comments 5
Comments Comments 5

Articles