Pull to refresh

Мой опыт создания многопоточного приложения для работы с бэкапами

Reading time21 min
Views4.4K

Сейчас никого не удивить многопоточными приложениями, однако я думаю, что в данной статье вы сможете найти несколько интересных идей. Мое изучение Java началось именно с этого проекта, поэтому, возможно, в некоторых местах я буду сильно не прав или строить большой велосипед, но надеюсь, что кому-нибудь будет интересен опыт новичка в джаве. Приведу несколько особенностей приложения:


  • Работает с бэкапами исключительно в памяти вне зависимости от размера бэкапа
  • Не загружает весь бэкап в память
  • Операции создания/восстановления бэкапа можно отменять

Под катом будет рассмотрена архитектура приложения, а также основные возникшие проблемы и их решение.


Обзор приложения


Общение с приложением происходит через Web UI, но в будущем можно будет добавить и REST API при необходимости.


Приложение умеет:


  1. Создавать бэкапы и загружать их на одно или несколько хранилищ
  2. Восстанавливать бэкапы, загружая их с хранилища
  3. Удалять бэкапы со всех хранилищ
  4. Создавать бэкапы периодически

Поддерживаемые на данный момент хранилища:


  • Локальная файловая система (не поддерживается из Docker)
  • Dropbox

Поддерживаемые на данный момент базы данных:


  • PostgreSQL

Из особенной приложения могу отметить:


  1. Корректная работа в кластерной конфигурации
  2. Бэкап никогда не загружается полностью в память вне зависимости от размера бэкапа. Файловая система для временного хранения бэкапа также не задействуется. Как создание бэкапа, так и восстановление, а значит и загрузка/выгрузка бэкапа происходит исключительно в памяти.
  3. Кроссплатформенность — работает как на Windows, так и на Linux.
  4. Все запущенные задачи мы можем отслеживать и при необходимости отменять.

Ниже приведу скриншоты Web UI, наглядно описывающие возможности приложения.


Управление хранилищами



Управление базами данных



Создание бэкапа


Восстановление бэкапа


Управление созданными бэкапами


Периодические бэкапы


Отслеживание запущенных задач




Архитектура


Основная работа будет происходить в 3 сервисах — DatabaseBackup, Processor, Storage, а свяжем их вместе мы с помощью концепции тасков. Обо всем этом далее.


DatabaseBackup


Данный сервис отвечает за создание и восстановление plain-text бэкапа.


Интерфейс сервиса:


public interface DatabaseBackup {
    InputStream createBackup(DatabaseSettings databaseSettings, Integer id);

    void restoreBackup(InputStream in, DatabaseSettings databaseSettings, Integer id);
}

Оба метода интерфейса оперируют инстансами InputStream, так как нам необходимо, чтобы весь бэкап не загружался в память, а значит бэкап должен читаться/писаться в стриминговом режиме. Сущность DatabaseSettings предварительно создается из Web UI и хранит различные настройки, необходимые для доступа к базе данных. Что это за параметр — id — будет объяснее чуть далее.


Требования к сервису следующие:


  1. Оба метода не должны читать весь бэкап в память.
  2. Метод restoreBackup() должен восстанавливать бэкап в одиночной транзакции, чтобы в случае ошибки не оставить базу данных в неконсистентном состоянии.

Реализация для PostgreSQL (текстовое описание)

Конкретно в реализации для PostgreSQL сервис реализован следующим образом:


  1. createBackup(): создается процесс pg_dump, который будет создавать бэкап и писать его в стандартный поток вывода. Из метода возвращается стандартный поток вывода процесса (см. https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getInputStream--). Потоки ввода/вывода в системе основаны на буфере определенного размера, и когда процесс пишет в поток вывода, он на самом деле пишет в буфер в памяти. Здесь самое важное это то, что тред процесса не будет писать в заполненный буфер, пока последний не будет прочитан другой стороной, а значит тред будет находиться в заблокированном состоянии и бэкап не будет загружаться полностью в память. Возможно, вы сталкивались с ситуацией, когда ваша Java программа ловила deadlock при работе с процессами из-за того, что вы не читали stdout или stderr процесса. Крайне важно следить за этим, потому что процесс не может работать дальше, если он заблокируется на блокирующем I/O вызове при записи в заполненный буфер и никто данный буфер не читает.
  2. restoreBackup(): создается процесс psql, бэкап читается из переданного в метод InputStream и одновременно пишется в поток стандартного ввода psql (см. https://docs.oracle.com/javase/8/docs/api/java/lang/Process.html#getOutputStream--). Это работает потому, что plain-text PostgreSQL бэкап — это всего лишь набор DDL и DML команд, которые легко понимаются psql.

Кода много, поэтому здесь я его приводить не буду, а посмотреть сможете на GitHub по ссылке в конце статьи.


Processor


Данный сервис отвечает за применение процессоров и обратный депроцессинг бэкапа. Процессоры применяются перед загрузкой на хранилище или после выгрузки с хранилища. Пример процессора: компрессор, шифрование.


Интерфейс сервиса:


public interface Processor {
    InputStream process(InputStream in);

    InputStream deprocess(InputStream in);

    ProcessorType getType(); // ProcessorType - уникальный Enum, позволяет идентифицировать каждый процессор

    int getPrecedence(); // приоритет процессора
}

Каждый процессор имеет приоритет — если задано несколько процессоров, они будут применены в порядке убывания их приоритета. Применяя обратную функцию в таком же порядке, в котором были применены процессоры, мы получим исходный бэкап.


Storage


Данный сервис отвечает за загрузку и выгрузку бэкапа, а также его удаление с хранилища. Пример хранилища: Dropbox, локальная файловая система.


Интерфейс сервиса:


public interface Storage {
    void uploadBackup(InputStream in, StorageSettings storageSettings, String backupName, Integer id);

    InputStream downloadBackup(StorageSettings storageSettings, String backupName, Integer id);

    void deleteBackup(StorageSettings storageSettings, String backupName, Integer id);
}

Каждому созданному бэкапу сопоставляется уникальное имя — так мы сможем найти его на любом из хранилищ, на которые он был загружен. То, как бэкап представлен на хранилище — дело исключительно реализации сервиса, но при передаче имени бэкапа в одну из функций мы должны ожидать правильного поведения. Сущность StorageSettings предварительно создается из Web UI и хранит необходимые настройки для доступа к хранилищу.




Концепция тасков


Мы хотели бы иметь возможность отслеживать состояния наших задач, обрабатывать возможные ошибки в зависимости от прогресса задачи, а также отменять задачи. Поэтому мы будем оперировать далее только тасками. Каждый таск будет представлен в базе данных записью в таблице, а программно — инстансом Future (см. Java Future). Каждой записи в таблице сопоставляется своя Future (причем, если запущено несколько серверов, инстансы Future могут находиться в памяти разных серверов).


Давайте пойдем последовательно. В первую очередь нам нужен сервис для запуска тасков — создания, восстановления и удаления бэкапа.


Запуск задач


Создание бэкапа:


public Task startBackupTask(@NotNull Task.RunType runType, @NotNull List<String> storageSettingsNameList, @Nullable List<ProcessorType> processors,
                                @NotNull DatabaseSettings databaseSettings) {
        Objects.requireNonNull(runType);
        Objects.requireNonNull(storageSettingsNameList);
        Objects.requireNonNull(processors);
        Objects.requireNonNull(databaseSettings);

        BackupProperties backupProperties =
                backupPropertiesManager.initNewBackupProperties(storageSettingsNameList, processors, databaseSettings.getName());
        Task task = tasksManager.initNewTask(Task.Type.CREATE_BACKUP, runType, backupProperties.getId());
        Integer taskId = task.getId();

        Future future = tasksStarterExecutorService.submit(() -> {
            tasksManager.updateTaskState(taskId, Task.State.CREATING);
            logger.info("Creating backup...");

            try (InputStream backupStream = databaseBackupManager.createBackup(databaseSettings, taskId)) {
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }

                tasksManager.updateTaskState(taskId, Task.State.APPLYING_PROCESSORS);
                logger.info("Applying processors on created backup. Processors: {}", processors);

                try (InputStream processedBackupStream = backupProcessorManager.process(backupStream, processors)) {
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }

                    tasksManager.updateTaskState(taskId, Task.State.UPLOADING);
                    logger.info("Uploading backup...");

                    backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId);
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }

                    tasksManager.updateTaskState(taskId, Task.State.COMPLETED);
                    logger.info("Creating backup completed. Backup properties: {}", backupProperties);
                }
            } catch (IOException ex) {
                logger.error("Error occurred while closing input stream of created backup", ex);
            } catch (RuntimeException ex) {
                logger.error("Error occurred while creating backup. Backup properties: {}", backupProperties, ex);
                errorTasksManager.addErrorTask(taskId);
            } catch (InterruptedException ex) {
                tasksManager.setInterrupted(taskId);
                logger.error("Backup creating task was interrupted. Task ID: {}", taskId);
            } finally {
                futures.remove(taskId);
            }
        });

        futures.put(taskId, future);
        return task;
    }

Создание бэкапа проходит 3 главных шага в следующем порядке: создание бэкапа -> применение процессоров -> загрузка на хранилище. Практически во все методы сервисов мы пробрасываем ID текущего таска для того, чтобы сервис мог сообщить об ошибке из треда, который работает в фоне. Об обработке ошибок, для чего здесь InterruptedException и что происходит с ошибкой после получения RuntimeException будет рассказано далее.


А вот как мы будем запускать задачу создания бэкапа:


tasksStarterService.startBackupTask(Task.RunType.USER, storageSettingsNameList, processors, databaseSettings);

Первым параметром мы передаем инициатора задачи: юзер или же внутренняя задача сервера (пример внутренней задачи — периодический бэкап). Знание инициатора задачи позволяет нам показывать в Web UI только те задачи, которые были запущены пользователем. Остальные параметры необходимы для непосредственного создания бэкапа — список хранилищ, процессоры для применения, база данных, дамп которой необходимо создать.


При создании бэкапа также создается запись в базе данных под названием BackupProperties. Данная сущность будет хранить такие свойства бэкапа, как имя, примененные процессоры и список хранилищ, на которые бэкап был загружен. Далее для восстановления или удаления бэкапа мы будем оперировать именно данной сущностью.


Задача в базе данных хранится в следующем виде:


@Entity
@Table(name = "backup_tasks")
public class Task {
    /**
     * Identifier of each backup task. Identifier is generated by PostgreSQL database after saving of entity.
     */
    @Id
    @Column(insertable = false, updatable = false)
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer id;

    /**
     * Backup task type.
     * <p>
     * Type is set at the very start of any task and can't be changed.
     *
     * @see Type
     */
    @Enumerated(EnumType.STRING)
    @Column(updatable = false)
    private Type type;

    /**
     * Who initiated a task: user or server.
     * <p>
     * We need to know it to show on front only these tasks that was started by user.
     *
     * @see RunType
     */
    @Enumerated(EnumType.STRING)
    @Column(updatable = false)
    private RunType runType;

    /**
     * Backup task state.
     * <p>
     * State is updated with every new step in task being executed.
     *
     * @see Task.State
     */
    @Enumerated(EnumType.STRING)
    private State state;

    /**
     * Whether task has been interrupted or not.
     * <p>
     * Default is {@literal false}.
     */
    @Column(insertable = false)
    private boolean interrupted;

    /**
     * Identifier of {@link BackupProperties}.
     * <p>
     * We need to know backup ID to be able to handle occurred errors.
     */
    @Column(updatable = false)
    private Integer backupPropertiesId;

    /**
     * Start time of the task.
     */
    @Column(updatable = false)
    private LocalDateTime date;

    public enum RunType {
        USER,
        INTERNAL
    }

    public enum State {
        PLANNED,
        CREATING,
        RESTORING,
        DELETING,
        APPLYING_PROCESSORS,
        APPLYING_DEPROCESSORS,
        DOWNLOADING,
        UPLOADING,
        COMPLETED,
    }

    public enum Type {
        CREATE_BACKUP {
            @Override
            public String toString() {
                return "CREATE BACKUP";
            }
        },
        RESTORE_BACKUP {
            @Override
            public String toString() {
                return "RESTORE BACKUP";
            }
        },
        DELETE_BACKUP {
            @Override
            public String toString() {
                return "DELETE BACKUP";
            }
        }
    }

    // getters & setters...

}

Таким образом, описать процесс создания бэкапа в виде диаграммы можно так:
Процесс создания бэкапа




Остальные типы задач запускаются по аналогии. Дабы не загромождать статью огромным количеством кода, для любопытных приведу код запуска задач по восстановлению и удалению бэкапа отдельно в спойлере.


Восстановление бэкапа
public Task startRestoreTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties, @NotNull String storageSettingsName,
                                 @NotNull DatabaseSettings databaseSettings) {
        Objects.requireNonNull(runType);
        Objects.requireNonNull(backupProperties);
        Objects.requireNonNull(storageSettingsName);
        Objects.requireNonNull(databaseSettings);

        Task task = tasksManager.initNewTask(Task.Type.RESTORE_BACKUP, runType, backupProperties.getId());
        Integer taskId = task.getId();

        Future future = tasksStarterExecutorService.submit(() -> {
            tasksManager.updateTaskState(taskId, Task.State.DOWNLOADING);
            logger.info("Downloading backup...");

            try (InputStream downloadedBackup =
                         backupLoadManager.downloadBackup(backupProperties.getBackupName(), storageSettingsName, taskId)) {
                if (Thread.interrupted() || downloadedBackup == null) {
                    throw new InterruptedException();
                }

                tasksManager.updateTaskState(taskId, Task.State.APPLYING_DEPROCESSORS);
                logger.info("Deprocessing backup...");

                try (InputStream deprocessedBackup = backupProcessorManager.deprocess(downloadedBackup, backupProperties.getProcessors())) {
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }

                    tasksManager.updateTaskState(taskId, Task.State.RESTORING);
                    logger.info("Restoring backup...");

                    databaseBackupManager.restoreBackup(deprocessedBackup, databaseSettings, taskId);
                    if (Thread.interrupted()) {
                        throw new InterruptedException();
                    }

                    tasksManager.updateTaskState(taskId, Task.State.COMPLETED);
                    logger.info("Restoring backup completed. Backup properties: {}", backupProperties);
                }
            } catch (IOException ex) {
                logger.error("Error occurred while closing input stream of downloaded backup", ex);
            } catch (RuntimeException ex) {
                logger.info("Error occurred while restoring backup. Backup properties: {}", backupProperties, ex);
                errorTasksManager.addErrorTask(taskId);
            } catch (InterruptedException ex) {
                tasksManager.setInterrupted(taskId);
                logger.error("Task was interrupted. Task ID: {}", taskId);
            } finally {
                futures.remove(taskId);
            }
        });

        futures.put(taskId, future);
        return task;
    }

Восстановление бэкапа проходит 3 главных шага в следующем порядке: выгрузка бэкапа с хранилища -> применение депроцессоров для получения исходного plain-text бэкапа -> восстановление бэкапа.


Запускать восстановление следующим образом:


tasksStarterService.startRestoreTask(Task.RunType.USER, backupProperties, storageSettingsName, databaseSettings);

Процесс восстановления бэкапа в виде диаграммы:
Процесс восстановления бэкапа


Удаление бэкапа
public Task startDeleteTask(@NotNull Task.RunType runType, @NotNull BackupProperties backupProperties) {
        Objects.requireNonNull(runType);
        Objects.requireNonNull(backupProperties);

        Task task = tasksManager.initNewTask(Task.Type.DELETE_BACKUP, runType, backupProperties.getId());
        Integer taskId = task.getId();

        Future future = tasksStarterExecutorService.submit(() -> {
            try {
                logger.info("Deleting backup started. Backup properties: {}", backupProperties);
                tasksManager.updateTaskState(taskId, Task.State.DELETING);

                backupLoadManager.deleteBackup(backupProperties, taskId);
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }

                tasksManager.updateTaskState(taskId, Task.State.COMPLETED);
                logger.info("Deleting backup completed. Backup properties: {}", backupProperties);
            } catch (RuntimeException ex) {
                logger.error("Error occurred while deleting backup. Backup properties: {}", backupProperties, ex);
                errorTasksManager.addErrorTask(taskId);
            } catch (InterruptedException ex) {
                tasksManager.setInterrupted(taskId);
                logger.error("Task was interrupted. Task ID: {}", taskId);
            } finally {
                futures.remove(taskId);
            }
        });

        futures.put(taskId, future);
        return task;
    }

Процесс удаления бэкапа довольно прост: бэкап просто удаляется со всех хранилищ, на которые он был загружен.


Запускать удаление следующим образом:


tasksStarterService.startDeleteTask(Task.RunType.USER, backupProperties);

Процесс удаления бэкапа в виде диаграммы:
Процесс удаления бэкапа




Отмена тасков


Что такое отмена задачи? Конечно же, это ничто иное, как прерывание треда. Вы могли видеть, что весь основной код, выполняющийся в Future, обернут в следующую try-catch конструкцию:


try {
    ...
} catch (InterruptedException ex) {
    ...
    tasksManager.setInterrupted(taskId);
}

А также после каждого важного метода, поток выполнения которого мы контролируем, установлена следующая конструкция:


if (Thread.interrupted()) {
    throw new InterruptedException();
}

Перед тем, как идти дальше, следует привести краткую теорию о прерываниях и состояниях тредов JVM.


Треды в JVM могут иметь следующие состояния:


  1. New
  2. Runnable
  3. Timed waiting
  4. Waiting
  5. Blocked
  6. Terminated

Нас интересуют только состояния Waiting и Timed waiting. Тред переводится в состояние Waiting методами Object.wait(), Thread.join() и прочими. Тред переводится в состояние Timed waiting (т.е. ожидание, которое длится определенный промежуток времени) методами Object.wait(timeout), Thread.join(timeout), Thread.sleep(sleeping) и прочими.


Самое важное здесь то, что если прервать тред перед входом в состояние Waiting или Timed waiting или же когда тред находится в данном состоянии, то тред проснется, выбросив исключение InterruptedException.


Но это еще не все. Совсем не факт, что тред когда-либо перейдет в данные состояния, выполняя создание, восстановление или удаление бэкапа. Как же тогда сообщить треду о том, что он был прерван?


Первый способ — это самостоятельная проверка флага прерывания тредом методами Thread.interrupted() или Thread.currentThread.isInterrupted(). Разница между ними в том, что первый вызывает приватный нативный метод currentThread.isInterrupted(boolean ClearInterrupted), передавая в него true, обозначая то, что флаг прерывания будет очищен, а второй — передавая false, оставляя флаг прерывания нетронутым. Выбор между этими двумя методами зависит исключительно от ситуации. Когда бросается InterruptedException, флаг прерывания также очищается — об этом стоит помнить.


Но должен же быть способ легче — и он есть. В приложении существует огромное количество работы с I/O стримами, а значит и с I/O методами. Наша задача — проследить за тем, чтобы при вызове методов read() или write(int b) на I/O стриме при прерывании была выброшена какая-нибудь ошибка, сообщающая о том, что блокирующий I/O вызов был прерван. К счастью, Java имеет такое исключение — InterruptedIOException. Однако, не все read/write методы стримов следят за прерываниями треда, а конкретно за этим следит только PipedInputStream. Поэтому в тех местах, где данный стрим не задействуется, мы должны расширить метод read/write таким образом, чтобы при наличии прерывания выбрасывалось исключение InterruptedIOException. На самом деле, мне в приложении хватило расширения метода read() только в единственном месте — при возвращении InputStream из метода выгрузки бэкапа. Именно таким образом мы можем узнать о происхождении прерывания, не расставляя везде шаблонные проверки флага. Однако, важно ловить данное исключение отдельно от IOException и отдельно его обрабатывать. Конечно же, не обойтись без помощи шаблонной проверки флага в некоторых местах, но уже стало лучше.


Также важно отметить то, что если при обработке прерывания флаг был очищен, всегда необходимо снова установить флаг прерывания для того, чтобы после возвращения из метода мы смогли узнать о произошедшем прерывании.


Поясню на примере, почему это важно. Пусть мы загружаем бэкап на хранилище в методе upload() и происходит прерывание. Прерывание обрабатывается, работа останавливается и происходит возврат из метода. Прерывание не происходит спроста — оно означает, что или где-то произошла ошибка, или юзер отменил задачу. Вне зависимости от причины, мы обязаны остановить всю работу в данном Future. Но если не установить перед возвратом из метода загрузки флаг прерывания снова, мы никогда не узнаем в главном блоке Future о случившемся прерывании.
Этот же пример кодом:


backupLoadManager.uploadBackup(processedBackupStream, backupProperties, taskId); <- здесь произошло прерывание, метод обязан снова установить флаг прерывания
if (Thread.interrupted()) { // после выхода из метода мы проверяем, не завершился ли вызов метода из-за того, что произошло прерывание
    throw new InterruptedException();
}

Таким образом, это хорошая практика обрабатывать InterruptedException или InterruptedIOException следующим образом:


try {
    ...
} catch (InterruptedException e) { // или InterruptedIOException
    ...
    // re-interrupt the thread
    Thread.currentThread().interrupt();
}

Хорошо, обрабатывать прерывание мы умеем, но кто же будет собственно прерывать треды?
Для этого мы создадим еще одну сущность под названием CancelTask, которая будет хранить ID таска для отмены, а также напишем вотчер, который и будет пытаться прерывать таски. Почему именно пытаться? Потому что:


  1. Невозможно прервать тред в памяти другого сервера. У нас может работать несколько серверов, а значит Future раскиданы по разным серверам. Таким образом, когда запрос на отмену задачи приходит на какой-то из серверов, нужная Future может находиться в памяти другого сервера.
  2. Таск не может быть отменен, так как Future была утеряна из-за падения сервера.

Кратко опишу алгоритм отмены в вотчере:
Вотчер достает все записи из таблицы cancel_tasks (при этом лок не ставится), проходит по каждой и пытается получить соответствующую Future из своей памяти. Если Future успешно получена — соответствующий тред прерывается, происходит revert таска и запрос удаляется из таблицы. Если же таймаут запроса на отмену таска превышен (что означает то, что сервер упал и Future были потеряны) — запрос просто удаляется из таблицы. Если несколько серверов заметят таймаут и удалят запись из таблицы, ничего страшного не произойдет, потому что удаление в PostgreSQL идемпотентно.


Код CancelTasksWatcher:


Скрытый текст
/**
 * This class scans for tasks to cancel and tries to cancel them.
 */
@Component
class CancelTasksWatcher {
    private static final Logger logger = LoggerFactory.getLogger(CancelTasksWatcher.class);
    private static final Duration cancelTimeout = Duration.ofMinutes(10);
    private CancelTasksManager cancelTasksManager;
    private TasksStarterService tasksStarterService;
    private TasksManager tasksManager;

    // spring setters...

    /**
     * This watcher wakes up every time 10 seconds passed from the last completion, checks if there are any tasks to cancel and tries to
     * cancel each task.
     * <p>
     * Since there are can be working more that one instance of the program, {@literal Future} instance of task can belong to different
     * servers. We can't get access to {@literal Future} if it's not in memory of the server where task cancellation request was accepted.
     * So the purpose of this watcher is to be able cancel tasks that works in the other instance of program. Each server has this watcher
     * checking for available cancellation requests and if any, the watcher tries to cancel corresponding {@literal Future}.
     * If cancellation is successful task will be also reverted.
     * <p>
     * If task cancellation request timeout exceeded, then it means a server that had requested {@literal Future} instances has been
     * shutdown, so all {@literal Future} instances lost and task can't be canceled. In such case task cancellation request will be ignored.
     *
     * @see TasksStarterService#getFuture(Integer)
     * @see TasksManager#revertTask(Task)
     */
    @Scheduled(fixedDelay = 10 * 1000)
    @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
    public void watchTasksToCancel() {
        Iterable<CancelTask> cancelTasks = cancelTasksManager.findAll();

        Iterable<Task> tasks = tasksManager.findAllById(StreamSupport.stream(cancelTasks.spliterator(), false)
                .map(CancelTask::getTaskId).collect(Collectors.toList()));
        Map<Integer, Task> tasksAsMap = StreamSupport.stream(tasks.spliterator(), false)
                .collect(Collectors.toMap(Task::getId, Function.identity()));

        List<Integer> taskIdsForDeleting = new ArrayList<>();

        for (CancelTask cancelTask : cancelTasks) {
            Integer taskId = cancelTask.getTaskId();

            Task task = tasksAsMap.get(taskId);
            if (task == null) {
                logger.error("Can't cancel task: no such entity with ID {}", taskId);
                taskIdsForDeleting.add(taskId);
                continue;
            }

            // timeout exceeded, that is server shutdown and lost all Future instances, so task can't be canceled
            if (LocalDateTime.now(ZoneOffset.UTC).isAfter(cancelTask.getPutTime().plus(cancelTimeout))) {
                logger.error("Can't cancel task: timeout exceed. Task ID: {}", taskId);
                taskIdsForDeleting.add(taskId);
                continue;
            }

            tasksStarterService.getFuture(taskId).ifPresent(future -> {
                logger.info("Canceling task with ID {}", taskId);

                boolean canceled = future.cancel(true);
                if (canceled) {
                    try {
                        // give time to properly handle interrupt
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        // should not happen
                    }
                    tasksManager.revertTask(task);
                }

                taskIdsForDeleting.add(taskId);

                logger.info("Task canceled: {}. Task ID: {}", canceled, taskId);
            });
        }

        cancelTasksManager.deleteByTaskIdIn(taskIdsForDeleting);
    }
}



Обработка ошибок


Выше вы могли видеть, что весь основной код, выполняющийся в Future, обернут в следующую try-catch конструкцию:


try {
    ...
} catch (RuntimeException e) {
    ...
    errorTasksManager.addErrorTask(taskId);
}

Каждый метод может вернуть RuntimeException при происхождении исключения, и вся работа в Future будет остановлена, а задача будет помечена как ошибочная и позже обработана.


Метод addErrorTask(taskId) создает в отдельной таблице в базе данных запись, которая хранит ID таска, в котором произошла ошибка.
Что же делать с записью дальше? Далее в работу вступает еще один вотчер, который будет периодически просыпаться, проверять на наличие ошибочных тасков и обрабатывать их, откатывая сделанные изменения.


Кратко опишу алгоритм работы данного вотчера:
Вотчер загружает по одной странице ошибочных тасков при каждом пробуждении и устанавливает лок на полученные записи для того, чтобы ошибочный таск не был обработан несколько раз, ведь у нас может быть несколько серверов. Лок — это PostgreSQL select for update, причем к select запросу добавляется skip locked для пропуска уже обрабатывающихся ошибочных тасков. Далее, заполучив задачу, он вызывает метод revertTask(), в котором и происходит основная работа по отмене сделанных изменений в зависимости от прогресса задачи.


Код ErrorTasksWatcher:


Скрытый текст
/**
 * This class scans for erroneous tasks and handles them depending on their state.
 */
@Component
class ErrorTasksWatcher {
    private static final Logger logger = LoggerFactory.getLogger(ErrorTasksWatcher.class);

    private static final Integer nRows = 10;

    private TasksManager tasksManager;

    private ErrorTasksManager errorTasksManager;

    // spring setters...

    /**
     * This watcher wakes up every time 1 minute passed from the last completion, checks backup states periodically and handles erroneous
     * tasks if any.
     * <p>
     * The watcher handles at most N tasks as described by {@link #nRows} constant and skips already locked tasks.
     * When retrieving error tasks from database pessimistic lock is set. It allows safely run more than one copy of program, as no other
     * watcher can pick up already being handled error tasks.
     * <p>
     * If the server shutdowns while rows was locked, transaction will be rolled back and lock released, so these entities can be picked
     * up by the other running server.
     */
    @Scheduled(fixedDelay = 60 * 1000)
    @Transactional(isolation = Isolation.READ_COMMITTED, propagation = Propagation.REQUIRES_NEW)
    public void watchErrorTasks() {
        for (ErrorTask errorTask : errorTasksManager.findFirstNAndLock(nRows)) {
            if (!errorTask.isErrorHandled()) {
                Integer backupTaskId = errorTask.getTaskId();

                Optional<Task> optionalTask = tasksManager.findById(backupTaskId);
                if (!optionalTask.isPresent()) {
                    logger.info("Can't handle erroneous task: no corresponding backup task entity. Backup task ID: {}", backupTaskId);
                    continue;
                }

                tasksManager.revertTask(optionalTask.get());
                errorTask.setErrorHandled(true);
            }
        }
    }
}

Код revertTask(Task):


Скрытый текст
    /**
     * This function reverts erroneous task by its entity.
     * <p>
     * Use this function only after canceling related {@literal Future}.
     * <p>
     * If the task was of the type {@link Task.Type#CREATE_BACKUP} then related {@link BackupProperties} will be deleted.
     *
     * @param task the entity
     */
    public void revertTask(@NotNull Task task) {
            Objects.requireNonNull(task);

            Task.State state = task.getState();

            switch (state) {
                case DOWNLOADING:
                case APPLYING_DEPROCESSORS:
                case RESTORING:
                case DELETING: {
                    logger.info("Handling broken operation. Operation: {}. No extra actions required", state.toString());
                    break;
                }
                case CREATING:
                case APPLYING_PROCESSORS: {
                    logger.info("Handling broken operation. Operation: {}. Delete backup properties...", state.toString());

                    Integer backupPropertiesID = task.getBackupPropertiesId();

                    if (!backupPropertiesManager.existsById(backupPropertiesID)) {
                        logger.error("Can't revert task: no related backup properties. Task info: {}", task);
                        return;
                    }

                    backupPropertiesManager.deleteById(backupPropertiesID);
                    break;
                }
                case UPLOADING: {
                    logger.info("Handling broken operation. Operation: {}. Deleting backup from storage...", state);

                    Integer backupPropertiesId = task.getBackupPropertiesId();
                    Optional<BackupProperties> optionalBackupProperties = backupPropertiesManager.findById(backupPropertiesId);
                    if (!optionalBackupProperties.isPresent()) {
                        logger.error("Can't revert task: no related backup properties. Task info: {}", task);
                        return;
                    }

                    tasksStarterService.startDeleteTask(Task.RunType.INTERNAL, optionalBackupProperties.get());
                    backupPropertiesManager.deleteById(backupPropertiesId);
                    break;
                }
                default: {
                    logger.error("Can't revert task: unknown state. Task info: {}", task);
                }
            }
        }

Разберем возможные ситуации:


  1. Таск находился в состоянии DOWNLOADING, APPLYING_DEPROCESSORS, RESTORING, DELETING — бэкап восстанавливался или удалялся. Тогда никаких дополнительных действий не требуется, так как нет никаких ресурсов для освобождения.
  2. Таск находился в состоянии CREATING, APPLYING_PROCESSORS — бэкап создавался, но еще не начал загружаться на хранилище. Тогда мы просто удалим запись BackupProperties из таблицы, чтобы не показывать пользователю поломанный бэкап (сущности BackupProperties отображаются в Web UI в разделе управления бэкапами).
  3. Таск находился в состоянии UPLOADING — бэкап создавался и начал загружаться на хранилище. Тогда нам требуется не только удалить сущность BackupProperties из базы данных, но и удалить бэкап с физического хранилища. Поэтому мы запускаем задачу по удалению бэкапа.

Однако, это еще не все. Что же делать с возможными исключениями в тех тредах, которые работают в фоне? Например, метод создания бэкапа, выполняющийся в треде Future (назовем Тред 1), может запускать еще один тред, который будет писать бэкап в возвращенный методом создания бэкапа InputStream (назовем Тред 2). Что, если исключение происходит в Треде 2, а Тред 1 уже давно начал загружать бэкап на хранилище и исключение из Треда 2 не может быть поймано?


Для того, чтобы корректно обрабатывать эту ситуацию, мы напишем специальный сервис, который следует вызывать вручную при происхождении прерывания. Данный сервис будет прерывать тред Future (все тот же Тред 1) и помечать таск как ошибочный:


    public void onError(@NotNull Throwable t, @NotNull Integer taskId) {
        logger.error("Exception caught. Task ID: {}", taskId, t);

        Optional<Future> optionalFuture = tasksStarterService.getFuture(taskId);
        if (!optionalFuture.isPresent()) {
            logger.error("Can't cancel the Future of task with ID {}: no such Future instance", taskId);
        } else {
            boolean canceled = optionalFuture.get().cancel(true);
            if (!canceled) {
                logger.error("Error canceling the Future of task with ID {}", taskId);
            } else {
                logger.info("Task canceled. Task ID: {}", taskId);
                errorTasksManager.setError(taskId);
            }
        }
    }

Именно для того, чтобы такое решение стало возможным, мы пробрасывали в методы сервисов ID текущего таска, ведь для того, чтобы получить нужную Future необходимо её каким-то образом идентифицировать, и ID таска нам в этом помогает.


Здесь важно отметить то, что задача будет помечена как ошибочная только в том случае, если тред был прерван успешно, так как необходимо исключать ситуацию, когда задача была прервана пользователем и начался откат изменений, а при обработке ошибочного таска откат изменений начнется еще раз.


О прерываниях тредов, работающих в фоне:


Возможно, вы могли заметить, что я ничего не говорил о прерывании тредов, которые работают в фоне. Ошибки из них мы ловить умеем — но при ошибке прерываем только тред Future.


На самом деле, абсолютно все треды в моей программе, которые работают в фоне, так или иначе работают с I/O стримами, и прерывание в них может быть обнаружено достаточно легко — достаточно получить исключение при записи/чтении из закрытого стрима. Не может быть такой ситуации, что стрим будет закрыт просто так. Он будет закрыт только в двух случаях:


  1. Работа завершилась, все хорошо. Стрим закрывается ровно после того, как работа с ним завершилась — исключение не может быть получено.
  2. Произошла ошибка при работе — тогда после прерывания треда Future все ресурсы освобождаются, включая любые стримы. Любой поток, пишущий/читающий из такого стрима, получит исключение, сообщающее о закрытом стриме (на самом деле, это ещё целое приключение — распознать в произошедшем IOException закрытый стрим, поэтому практически везде это или проверка сообщения исключения, или прочие костыли).

Другой способ, более простой — это просто проверка флага прерывания таска у записи в базе данных (ведь мы пробрасываем ID таска практически во все важные методы сервисов, поэтому знаем, на какую запись смотреть), но пока мне это никогда не было необходимо.




Таким образом, мы рассмотрели архитектуру приложения, а также реализацию обработки ошибок и отмены задач. Многие интересные детали реализации я опустил, чтобы не перегружать статью, но приведенной выше информации вполне достаточно для понимания сути происходящей работы в приложении.


Планы на будущее


  1. Обновить Web UI: сделать его информативнее, удобнее для пользователя. Сейчас он выглядит крайне топорно, хоть и исполняет свои обязанности
  2. Добавить поддержку других хранилищ
  3. Добавить поддержку других баз данных
  4. Добавить проверку целостности бэкапов
  5. Добавить поддержку инкрементальных бэкапов

Заключение


Ссылки на приложение:



На этом все, спасибо за внимание! Буду рад видеть ваши отзывы и замечания о приложении в комментариях, а также новые пул реквесты на GitHub!

Tags:
Hubs:
Total votes 13: ↑12 and ↓1+11
Comments2

Articles