Java
26 January 2012

Multithreading in practice

Нашел как-то на stack overflow вопрос (link).
Need to create java CLI programm that searchs for specific files matched some pattern. Need to use multi-threading approach without using util.concurrent package and to provide good performance on parallel controllers.
Перевод
Нужно написать консольную программу, которая ищет файлы по какому-то паттерну. Программа должна быть многопоточная, но нельзя использовать пакет util.concurrent. Требуется добиться максимальной производительности.


В общем идея в принципе была не сложная. Т.к. по условию нельзя использовать util.concurrent, то надо реализовать свой пул потоков, плюс написать какие-то таски, которые в этом пуле потоков будут крутиться.
Так же я не был уверен в том, что при многопоточном использовании IO будет увеличение производительности.

Сразу скажу, что целью было не выполнение задания, а исследование проблемы, поэтому не весь код будет красивый.

В принципе алгоритм похож на рекурсивный обход дерева, вот к примеру простая его реализация link
	import java.io.File;
	public class MainEntry {
	    public static void main(String[] args) {
	        walkin(new File("/home/user")); //Replace this with a suitable directory
	    }
	 
	    /**
	     * Recursive function to descend into the directory tree and find all the files 
	     * that end with ".mp3"
	     * @param dir A file object defining the top directory
	     **/
	    public static void walkin(File dir) {
	        String pattern = ".mp3";
	 
	        File listFile[] = dir.listFiles();
	        if(listFile != null) {
	            for(int i=0; i<listFile.length; i++) {
	                if(listFile[i].isDirectory()) {
	                    walkin(listFile[i]);
	                } else {
	                    if(listFile[i].getName().endsWith(pattern)) {
	                        System.out.println(listFile[i].getPath());
	                    }
	                }
	            }
	        }
	    }
	}
	


Для начало проверим как быстро работает однопоточная реализация, код однопоточной реализации я приводить не буду, он есть в архиве.
Результаты следующие:
154531 мс

Теперь попробуем сделать то же самое, но будем использовать многопоточную реализацию алгоритма.

Для этого вместо рекурсивного вызова, мы будем создавать некий Таск, который будем отдавать на выполнение пулу потоков. Так же надо, что бы по результату выполнения таска, была возможность как-то сообщить о результатах. Плюс надо добавить новый таск в пул потоков (вместо рекурсии).
Тут надо сразу остановиться, почему я выбрал именно небольшие таски и пул тредов, вместо создания новых тредов. Представте, что у нас куча директорий, и на каждую директорию мы будем создавать новый тред? мы можем просто упать по OOM (OutOfMemory) или просто все начнет сильно тормозить из-за переключений между тредами ОС (особенно если одноядерная система). Так же мы будем тратить время на старт нового треда каждый раз при его создании.

Для начала надо создать класс, который будет выполнять какие-то действие в будущем тред пуле.
Основные требование к классу:
— класс должен наследоваться от Thread (в принципе можно только интерфейс Runnable, но так проще)
— класс должен принимать на выполнения объекты Runnable
— класс не должен падать при возникновении каких-то исключений в результате выполнения таска.
— если нету никаких задач, Thread не должен работать в пустую, а должен уходить в ожидание
— процедура добавления новых объектов Runnable должна быть очень быстрая, иначе при большом количестве мелких задач, кто-нибудь будет либо блокировать работу треда, либо ждать возможности добавить новую задачу в тред
— ну и он должен быть ThreadSafe

Вот код:
import java.util.ArrayList;
	import java.util.List;

	class BacklogWorker extends Thread {
	/* здесь хранятся все таски на выполнение*/
	    private final LinkedList<Runnable> backlog = new LinkedList<Runnable>();
	    private static final int INITIAL_CAPACITY = 100;
	/* здесь хранятся таски, которые будут выполнятся */
	    private final List<Runnable> workQueue = new ArrayList<Runnable>(INITIAL_CAPACITY);

	    BacklogWorker(String name) {
	        super(name);
	    }

	/* добавить  новый таск*/
	    synchronized void enque(Runnable work) {
	        if (work != null) {
	            backlog.add(work);
	        }
	        notify();
	    }

	    public void run() {
	        while (!isInterrupted()) {
	/* добавляем все в очередь на выполнения, и отпускаем лок*/
	             synchronized (this) {
	                if (backlog.isEmpty()) {
	                    try {
	                        wait();
	                    } catch (InterruptedException e) {
	                        interrupt();
	                    }
	                }
	                int size = backlog.size();
	                for (int i = 0; i < INITIAL_CAPACITY && i < size; i++) {
	                    workQueue.add(backlog.poll());
	                }
	                backlog.clear();
	            }
	            for (Runnable task : workQueue) {
	                try {
	                    task.run();
	                } catch (RuntimeException e) {
	                    e.printStackTrace();
	                }
	            }
	            workQueue.clear();
	        }
	    }
	}


Теперь надо создать ThreadPool, который будет распределять работу между его тредами.
Требование к классу такие:
— ThreadSafe
— масштабируемый
— равномерное распределение между рабочими тредами тасков
— не блокируемый

код получился такой:
	import java.util.concurrent.Executor;

	public class BacklogThreadPool implements Executor/*i don't use anything from concurrent, just only one interface*/ {

	    private final BacklogWorker workers[];
	    private final int mask;
	    private static volatile int sequence;

	    public BacklogThreadPool(int threadCount, String id) {
	        int tc;
	        for (tc = 1; tc < threadCount; tc <<= 1) ;
	        mask = tc - 1;

	        if (id == null) {
	            id = Integer.toString(getSequence());
	        }
	        workers = new BacklogWorker[tc];

	        for (int i = 0; i < tc; i++) {
	            workers[i] = new BacklogWorker((new StringBuilder()).append("thead-pool-worker-").append(id).append(":").append(i).toString());
	            workers[i].start();
	        }
	    }

	    private synchronized int getSequence() {
	        return sequence++;
	    }

	    public void shutdown() {
	        for (int i = 0; i < workers.length; i++)
	            workers[i].interrupt();

	    }

	    @Override
	    public void execute(Runnable command) {
	        workers[command.hashCode() & mask].enque(command);
	    }
	}
	

В принципе тут все понятно, и наверное комментировать не чего.

Теперь надо написать таск, который будет выполняться в ThreadPool'е.
К сожалению первая версия у меня потерялась, поэтому привожу быстро написанную заново версию.
import java.io.File;
	import java.util.ArrayList;
	import java.util.List;
	import java.util.regex.Matcher;
	import java.util.regex.Pattern;

	public class WalkinTask1 implements BacklogTask {

	    private List<File> dirs;
	    private ParseHandler parseHandler;

	    public WalkinTask1(List<File> dirs, ParseHandler parseHandler) {
	        this.parseHandler = parseHandler;
	        //this.parseHandler.taskStart();
	        this.parseHandler.taskStartUnblock();
	        this.dirs = dirs;
	    }

	    @Override
	    public void run() {
	        try {
	            List<String> filePaths = new ArrayList<String>();
	            List<File> dirPaths = new ArrayList<File>();
	            for (File dir : dirs) {
	                if (!dirPaths.isEmpty()) {
	                    dirPaths = new ArrayList<File>();
	                }
	                if (!filePaths.isEmpty()) {
	                    filePaths = new ArrayList<String>();
	                }
	                File listFile[] = dir.listFiles();

	                if (listFile != null) {
	                    for (File file : listFile) {
	                        if (file.isDirectory()) {
	                            dirPaths.add(file);
	                        } else {
	                            filePaths.add(file.getPath());
	                        }
	                    }
	                }
	                if (!dirPaths.isEmpty()) {
	                    parseHandler.schedule(TaskFactory.createWalking1Task(parseHandler, dirPaths));
	                }
	                if (!filePaths.isEmpty()) {
	                    Pattern pattern = parseHandler.getPattern();
	                    List<String> result = new ArrayList<String>();
	                    for (String path : filePaths) {
	                        Matcher matcher = pattern.matcher(path);
	                        while (matcher.find()) {
	                            String str = matcher.group();
	                            if (!"".equals(str)) {
	                                result.add(str);
	                            }
	                        }
	                    }
	                    parseHandler.taskComplete(result);
	                }
	            }
	        } finally {
	            //parseHandler.taskFinish();
	            parseHandler.taskFinishUnblock();
	        }
	    }

	    @Override
	    public int getTaskType() {
	        return 1;  //TODO
	    }
	}


Теперь поговорим немного о профайлере. Для чего он нужен я не буду описывать, вы можете сами поискать, если ещё ничего не слышали о таком зверьке. При профайлинге многопоточных приложений наибольшое внимание надо уделять Monitor Usage (в каждом профайлере есть такая возможность). Обычно этот тип профайлинга надо запускать вручную. Интерес представляет сколько времени те или иные треды висят в ожидании локов. К примеру вы можете насоздовать кучу тредов, но они все будут упираться в какой-нибудь лок, и производительность системы будет сильно падать. Так же стоит обратить внимание на использование CPU, к примеру если CPU использует на 10-20%, то это тоже может означать, что треды больше ожидают локов, чем выполняют какие-нибудь вычисления (хотя это не всегда так).

Теперь посмотрим результат в профайлере:
время выполнения программы:
total task: 78687
55188ms

В результате скорость работы увеличилась где-то в 3 раза.


Тут мы видим, что все треды в тредпуле были заняты работой почти на всем протяжении времени. Блокировки тредов почти не наблюдается.

На второй картинке мы видим, что основное время CPU тратится на IO.

Тут мы видим, что использование CPU > 80%.

Тут мы видим только одну блокировку треда, которая заняла меньше 1мс, при 78 тыс тасков весьма хороший результат.

Как мы видим, в принципе мы нагружаем CPU, и у нас нету простоя, так как все треды почти полностью загружены работой. Блокировок по локам нету.

Интересно будет посмотреть на картинку номер 2. Как мы видим, самая «дорогая» операция — java.io.File.isDirectory(), она занимает примерно 46% общего времени. Погуглив по поводу этой проблемы, я так ничего и не нашел, кроме возможности использовать Java7, ну или dependency OS фичи. Поэтому возможность оптимизации этой части как я вижу больше нету. Дальше идет уже парсер — java.util.regex.Matcher.find(), а вот тут уже можно ускорить. Можно создать еще один таск, который будет заниматься только парсингом. Т.е. мы разделим две самые тяжелые операции:
1) работа с файловой системой
2) парсинг имен
Третья операция опять IO, и это тоже сложно ускорить.

Итак модифицируем немного первый таск, и добавим новый:
	import java.io.File;
	import java.util.ArrayList;
	import java.util.List;

	public class WalkinTask implements BacklogTask {

	    private List<File> dirs;
	    private ParseHandler parseHandler;

	    public WalkinTask(List<File> dirs, ParseHandler parseHandler) {
	        this.parseHandler = parseHandler;
	        //this.parseHandler.taskStart();
	        this.parseHandler.taskStartUnblock();
	        this.dirs = dirs;
	    }

	    @Override
	    public void run() {
	        try {
	            List<String> filePaths = new ArrayList<String>();
	            List<File> dirPaths = new ArrayList<File>();
	            for (File dir : dirs) {
	                if (!dirPaths.isEmpty()) {
	                    dirPaths = new ArrayList<File>();
	                }
	                if (!filePaths.isEmpty()) {
	                    filePaths = new ArrayList<String>();
	                }
	                File listFile[] = dir.listFiles();

	                if (listFile != null) {
	                    for (File file : listFile) {
	                        if (file.isDirectory()) {
	                            dirPaths.add(file);
	                        } else {
	                            filePaths.add(file.getPath());
	                        }
	                    }
	                }
	                if (!dirPaths.isEmpty()) {
	                    parseHandler.schedule(TaskFactory.createWalkingTask(parseHandler, dirPaths));
	                }
	                if (!filePaths.isEmpty()) {
	                    parseHandler.schedule(TaskFactory.createParseTask(parseHandler, filePaths));
	                }
	            }
	        } finally {
	            //parseHandler.taskFinish();
	            parseHandler.taskFinishUnblock();
	        }

	    }

	    @Override
	    public int getTaskType() {
	        return 1;  //TODO
	    }
	}

import java.util.ArrayList;
	import java.util.List;
	import java.util.regex.Matcher;
	import java.util.regex.Pattern;

	public class ParseTask implements BacklogTask {

	    private ParseHandler handler;
	    private List<String> paths;

	    public ParseTask(ParseHandler hander, List<String> paths) {
	        this.handler = hander;
	        this.paths = paths;
	        handler.taskStartUnblock();
	    }

	    @Override
	    public void run() {
	        try {
	            Pattern pattern = handler.getPattern();
	            List<String> result = new ArrayList<String>();
	            for (String path : paths) {
	                Matcher matcher = pattern.matcher(path);
	                while (matcher.find()) {
	                    String str = matcher.group();
	                    if (!"".equals(str)) {
	                        result.add(str);
	                    }
	                }
	            }
	            handler.taskComplete(result);
	        } finally {
	            handler.taskFinishUnblock();
	        }
	    }

	    @Override
	    public int getTaskType() {
	        return 0;  
	    }
	}


И запустим еще раз:
total task: 221560
52328


Как мы видим, результат не сильно отличился от первого запуска, но все-таки немного быстрее, особенно выигрыш будет расти, если будут директории с большим количеством файлов. Но при таком подходе мы увеличили количество тасков почти в 3 раза, что к примеру может сказаться на Garbage Collector'е. Так что тут надо уже выбирать, что мы хотим — максимальную производительность или экономию памяти и ресурсов.

Теперь надо подумать о том, как выходить из программы, и как возвращать результат. Мы наплодили кучу тасков, и мы не знаем когда они все выполнятся. Я не придумал ни чего лучшего, как просто считать общее количество тасков, и ждать пока счетчик станет равен нулю. Для этого нам понадобится переменная, в которой мы будем накапливать значени. Но тут тоже все не так просто. К примеру если мы возьмем обычную переменную, и будем инкрементировать её когда таск создался, и декрементировать её когда таск закончился. Но при таком подходе результат будет плачевный, т.к. в java операция i++ не является атомарной, даже если мы поставит заветный модификатор volatile. Идеально было бы взять AtomicIteger, но нам по условию нельзя использовать пакет util.concurrent. Поэтому нам придется сделать свой Atomic. Если покопаться как работает Atomic в java, то мы наткнемся на native метод. Сама по себе атомарность изменения переменной реализовано в виде команды процессора, поэтому java вызывает нативную команду ОС, которая уже вызывает команду процессора.
В принципе мы может использовать обычный synchronized. Но при большом количестве тасков начнется Lock race, и производительность уменьшится (хотя конечно и не критично). Вот пример кода реализующий CAS алгоритм(код был найдем на сайте ibm):
	public class SimulatedCAS {
	    private volatile int value;

	    public synchronized int getValue() {
	        return value;
	    }

	    public synchronized int compareAndSwap(int expectedValue, int newValue) {
	        int oldValue = value;
	        if (value == expectedValue)
	            value = newValue;
	        return oldValue;
	    }
	}

	public class CasCounter {
	    private SimulatedCAS value = new SimulatedCAS();

	    public int getValue() {
	        return value.getValue();
	    }

	    public int increment() {
	        int oldValue = value.getValue();
	        while (value.compareAndSwap(oldValue, oldValue + 1) != oldValue)
	            oldValue = value.getValue();
	        return oldValue + 1;
	    }
	    
	    public int decrement() {
	        int oldValue = value.getValue();
	        while (value.compareAndSwap(oldValue, oldValue - 1) != oldValue)
	            oldValue = value.getValue();
	        return oldValue - 1;
	    }
	}
	


Вот вобощем-то и все.
Архив с исходниками:link

P.S. Я тестировал это на линуксе, и на виндоусе на 4-х ядерном процессоре. Оптимальное количество тредов в пуле было вычислено эксперементально — 16, т.е. количество ядер * 4, когда-то находил в интернете уже такую формулу, но не помню где. В Windows есть особенность, когда запускаешь первый раз, то работает все очень долго, и часто все виснет на IO, но уже при втором запуске все работает значительно быстрее, думаю это особенность OS кэшировать файловую систему. Я тестировал все со вторым запуском и дальше, потом смотрел в профайлере загруженность CPU, если был где-то провал использования CPU, то считал этот тест неточным и не использовал этот тест в статистике. Тестировал все на папке с проектами (много больших проектов вместе с CVS файлами).

P.S.S. Это мой первый большой топик на хабре, так что прошу не сильно критиковать по оформлению, по возможности буду исправлять.

+38
27.7k 167
Comments 17
Top of the day