30 November 2014

Мониторинг серверов через очередь заданий на JAVA

Java
Недавно был озадачен проблемой мониторинга нескольких десятков серверов (ну наверно редко кто не сталкивался с такой задачей). Проблему можно описать несколькими правилами:

  1. Нужно периодически пинговать сервер
  2. Иногда выполнять какое-либо действие с сервером (например, исполнение команды через ssh), которое засабмитил пользователь
  3. Действия с серверами могут нескольких типов, у каждого действия свой приоритет
  4. Таски (из п.1-3) нельзя выполнять одновременно для каждого сервера
  5. Таски могут завершаться с неудачей, например по причине отсутствия связи с сервером, нужно ждать пока связь восстановится и пытатся выполнить запланированную задачу




Первое решение, которое приходит большинству в голову — запустить для каждого сервера свой поток и там делать свои дела. Это неплохо, но что делать если в процессе мониторинга набор серверов будет меняться? Запускать и завершать потоки в процессе мониторинга как-то неэлегантно. А что делать если серверов тысяча? Иметь тысячу потоков наверно можно, но зачем это делать когда большинство времени поток простаивает и ждет своего времени для очередного пинга?

На данную проблему можно взглянуть с другой стороны и представить ее в виде классической задачи «producer-consumer». У нас есть продюсеры, которые производят таски (пинг, команда ssh) и у нас есть консьюмеры, которые эти таски исполняют. Разумеется продюсеров и консьюмеров у нас не по одну экземпляру. Решить нашу задачу «producer-consumer» в JAVA не просто, а очень просто используя классы PriorityQueue и ExecutorService.

Начнем, как обычно, с юнит-теста:

    @Test
    public void testOffer() {
        PollServerQueue xq = new PollServerQueue();
        
        xq.addTask(new MyTask(1, 11));
        xq.addTask(new MyTask(2, 12));
        xq.addTask(new MyTask(1, 13));
        
        MyTask t1 = (MyTask)xq.poll();
        assertEquals(1, t1.getServerId());
        assertEquals(11, t1.getTaskId());

        MyTask t2 = (MyTask)xq.poll();
        assertEquals(2, t2.getServerId());
        assertEquals(12, t2.getTaskId());
        
        MyTask t3 = (MyTask)xq.poll();
        assertEquals(null, t3);

        xq.FinishTask(1);
        MyTask t5 = (MyTask)xq.poll();
        assertEquals(1, t5.getServerId());
        assertEquals(13, t5.getTaskId());
    }



В этом юнит-тесте мы добавили в нашу очередь три задачи типа MyTask (первый аргумент конструктора означает serverId, второй — taskId). Метод poll извлекает задачу из очереди. Если извлечь задачу не удалось (например, задачи кончились или в очереди остались задачи для серверов, для которых уже выполняются задачи) — метод poll возвращает null. Из кода видно, что завершение задачи для serverId=1 ведет к тому, что из очереди можно извлечь следующую задачу для данного сервера.

Ура! Юнит-тест написан, можно писать код. Нам потребуется:
  1. Структура данных (HashMap) для хранения текущих исполняемых задач для каждого сервера (currentTasks)
  2. Структура данных (HashMap) для хранения задачи, стоящих в очереди на исполнения. Для каждого сервера — своя очередь (waitingTasks)
  3. Структура данных (PriorityQueue) для последовательного опроса серверов. Необходимо, чтобы в следующий вызов poll() к нам приходила задача для другого сервера. Короче, структура типа револьвера, только пули после каждого выстрела остаются в барабане (peekOrder)
  4. Cтруктура (HashSet) для хранения и быстрого поиска идентификаторов серверов в револьвере, чтобы каждый раз не просматривать револьвер с первого до последнего элемента (servers)
  5. Простой объект для синхронизации (syncObject)


Теперь процедура извлечения таски из очереди будет простой и короткой. И хотя код получился компактным, публиковать его здесь я не вижу смысла, а отошлю вас на https://github.com/get-a-clue/PollServerQueueExample

Disclaimer: код на github'e не является законченным, в частности, в нем отсутствует возможность установки приоритетов для задач внутри очереди для каждого сервера и механизма обработки ошибок и возврата failed таски в очередь. Ну и сам код для пингования. Как говорится, меньше кода — лучше спишь. :)
Tags: JAVA multithreading
Hubs: Java
+2
10.1k 46
Comments 10
Ads