Pull to refresh
334.41
ГК ЛАНИТ
Ведущая многопрофильная группа ИТ-компаний в РФ

#PostgreSQL. Ускоряем деплой в семь раз с помощью «многопоточки»

Reading time 11 min
Views 17K
Всем привет! Мы на проекте ГИС ЖКХ используем PostgreSQL и недавно столкнулись с проблемой долгого выполнения SQL скриптов из-за быстрого увеличения объема данных в БД. В феврале 2018 года на PGConf я рассказал, как мы решали эту проблему. Слайды презентации доступны на сайте конференции. Предлагаю вашему вниманию текст моего выступления.



Дано



Про ГИС ЖКХ уже была подробная статья в блоге группы ЛАНИТ на Хабре. Если в двух словах ГИС ЖКХ – это первый в России федеральный портал о всей информации в ЖКХ, который запущен почти во всех регионах (в 2019 году присоединятся Москва, Питер и Севастополь). За последние три месяца в систему было загружено более 12 ТБ данных о домах, лицевых счетах, фактах оплаты и много-много еще чего, а всего в PostgreSQL сейчас лежит уже более 24 ТБ.

Проект архитектурно разделен на подсистемы. Каждой подсистеме выделена отдельная база данных. Всего таких баз сейчас около 60, они размещены на 11 виртуальных серверах. Некоторые подсистемы нагружены сильнее других, и у них базы по объему могут занимать 3-6 терабайт.

ЦУП, у нас проблема




Теперь немного подробнее расскажу о проблеме. Начну издалека: у нас код приложения и код миграций базы данных (под миграцией я понимаю перевод базы данных из одной ревизии в другую с выполнением всех необходимых SQL скриптов для этого) хранятся вместе в системе контроля версий. Это возможно благодаря использованию Liquibase (подробнее про Liquibase на проекте можно узнать из доклада Миши Балаяна на TechGuruDay в ЛАНИТ).

Теперь давайте представим себе выпуск версии. Когда данных всего пара терабайт или меньше и все таблицы в пределах сотни гигабайт, изменения (миграции) любых данных или изменения структуры в любых таблицах проходят быстро (обычно).

А теперь представим, что у нас данных уже пара десятков терабайт и появилось несколько таблиц по терабайту и больше (возможно, разбитых на партиции). В новой версии нам нужно провести миграцию по одной из этих таблиц или еще хуже сразу по всем. И при этом время регламентных работ увеличивать нельзя. И при этом такую же миграцию нужно провести и на тестовых базах данных, где железо слабее. И при этом нужно понять заранее, сколько в сумме все миграции по времени займут. Здесь и начинается проблема.

Сперва мы попробовали советы из официальной документации PostgreSQL (удаление индексов и FK перед массовой миграцией, пересоздание таблиц с нуля, использование copy, динамическое изменение конфига). Это дало эффект, но нам хотелось еще быстрее и удобнее (тут, конечно, дело субъективное – кому как удобно :–)). В результате мы реализовали параллельное выполнение массовых миграций, что увеличило скорость на многих кейсах в разы (а иногда и на порядок). Хотя на самом деле запускается параллельно несколько процессов, внутри команды у нас прижилось слово “многопоточка”.

«Многопоточка»



Основная идея такого подхода заключается в разделении большой таблицы на непересекающиеся диапазоны (например, функцией ntile) и выполнение SQL скрипта не сразу по всем данным, а параллельно по нескольким диапазонам. Каждый параллельный процесс забирает себе один диапазон, блокирует его и начинает выполнять SQL скрипт только для данных из этого диапазона. Как только скрипт отработал, мы опять ищем незаблокированный и еще не обработанный диапазон и повторяем операцию. Важно выбрать правильный ключ для разделения. Это должно быть проиндексированное поле с уникальными значениями. Если такого поля нет, можно использовать служебное поле ctid.

Первая версия «многопоточки» была реализована с помощью вспомогательной таблицы с диапазонами и функции взятия следующего диапазона. Требуемый SQL скрипт подставлялся в анонимную функцию и запускался в требуемом количестве сессий, обеспечивая параллельное выполнение.

Пример кода
-- Таблица UPDATE_INFO_STEPS используется для реализации обновления/заполнения 
-- больших таблиц, выполнения сложных запросов обновления/заполнения
CREATE TABLE UPDATE_INFO_STEPS (
  BEGIN_GUID varchar(36), 
  END_GUID varchar(36) NOT NULL, 
  STEP_NO int, 
  STATUS char(1), 
  BEGIN_UPD timestamp, 
  END_UPD timestamp, 
  ROWS_UPDATED int, 
  ROWS_UPDATED_TEXT varchar(30), 
  DISCR varchar(10)
);
ALTER TABLE UPDATE_INFO_STEPS ADD PRIMARY KEY(discr, step_no);


-- Функция FUNC_UPDATE_INFO_STEPS реализует ключевой функционал. 
-- Возможность "брать" следующий интервал, если текущий занят.
CREATE OR REPLACE FUNCTION func_update_info_steps(
  pStep_no int, 
  pDiscr varchar(10)
) RETURNS text AS 
$BODY$ 
DECLARE 
  lResult text;
BEGIN 
  SELECT 
    'SUCCESS' INTO lResult 
  FROM 
    update_info_steps 
  WHERE 
    step_no = pStep_no 
    AND discr = pDiscr 
    AND status = 'N' 
  FOR UPDATE NOWAIT;
    
  UPDATE 
    UPDATE_INFO_STEPS 
  SET 
    status = 'A', 
    begin_upd = now() 
  WHERE 
    step_no = pStep_no 
    AND discr = pDiscr 
    AND status = 'N';
    
  return lResult;
  
  EXCEPTION WHEN lock_not_available THEN 
    SELECT 
      'ERROR' INTO lResult;
  return lResult;
END;
$BODY$ 
LANGUAGE PLPGSQL VOLATILE;


-- Пример использования (1 процесс на 1 сессию)
-- Шаг 1. Заполняем служебную таблицу интервалами для обработки.
DO 
LANGUAGE PLPGSQL 
$$ 
DECLARE 
  -- Указать количество обрабатываемых записей за одну итерацию
  l_count int := 10000;
  -- Подставить идентификатор
  l_discr VARCHAR(10) := '<discr>';
BEGIN 
  INSERT INTO UPDATE_INFO_STEPS (
    BEGIN_GUID, END_GUID, STEP_NO, STATUS, 
    DISCR
  ) 
  SELECT 
    min(guid) BEGIN_GUID, 
    max(guid) END_GUID, 
    RES2.STEP STEP_NO, 
    'N' :: char(1) STATUS, 
    l_discr DISCR 
  FROM 
    (
      SELECT 
        guid, 
        floor(
          (ROWNUM - 1) / l_count
        ) + 1 AS STEP 
      FROM 
        (
          -- Подставить название колонки
          SELECT 
            <column> AS GUID, 
            -- Подставить название колонки
            row_number() over (
              ORDER BY 
                <column>
            ) AS ROWNUM
          FROM 
            -- Подставить схему и название таблицы
            <schema>.<table_name> 
          ORDER BY 
            1 --
            ) RES1
    ) RES2 
  GROUP BY 
    RES2.step;
END;
$$;


-- Шаг 2. Используя служебную таблицу, выполняем скрипт UPDATE.
DO 
LANGUAGE PLPGSQL 
$$ 
DECLARE 
  cur record;
  vCount int;
  vCount_text varchar(30);
  vCurStatus char(1);
  vCurUpdDate date;
  -- Подставить идентификатор
  l_discr varchar(10) := '<discr>';
  l_upd_res varchar(100);
BEGIN 
  FOR cur IN (
    SELECT 
      * 
    FROM 
      UPDATE_INFO_STEPS 
    WHERE 
      status = 'N' 
      AND DISCR = l_discr 
    ORDER BY 
      step_no
  ) LOOP 
  
    vCount := 0;
    -- Внутренняя транзакция обязательна!
    SELECT 
      result INTO l_upd_res 
    FROM 
      dblink(
        '<parameters>', 
        'SELECT FUNC_UPDATE_INFO_STEPS(' || cur.step_no 
          || ',''' 
          || l_discr 
          || ''')'
      ) AS T (result text);
      
    IF l_upd_res = 'SUCCESS' THEN 
      -- Основной скрипт. В данной секции необходимо выполнять 
      -- требуемые действия по обновлению, вставке и тп.
      -- Обязательное требование - использовать интервал 
      -- cur.begin_guid - cur.end_guid и dblink на "самого себя".
      -- Указан примерный скрипт.
      SELECT 
        dblink(
          '<parameters>', 
          'UPDATE FOO set level = 42
           WHERE id BETWEEN ''' || cur.begin_guid 
               || ''' AND ''' 
               || cur.end_guid 
               || ''''
        ) INTO vCount_text;
      -- Конец основного скрипта.
      
      SELECT 
        dblink(
          '<parameters>', 
          'update UPDATE_INFO_STEPS 
           SET status = ''P'', end_upd = now(), 
           rows_updated_text = ''' || vCount_text || '''
           WHERE step_no = ' || cur.step_no || '
           AND discr = ''' || l_discr || ''''
        ) INTO l_upd_res;
    END IF;
  END LOOP;
END;
$$;


-- Мониторинг выполнения.
SELECT 
  SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END) done, 
  SUM(CASE status WHEN 'A' THEN 1 ELSE 0 END) processing, 
  SUM(CASE status WHEN 'N' THEN 1 ELSE 0 END) LEFT_, 
  round(
    SUM(CASE status WHEN 'P' THEN 1 ELSE 0 END):: numeric / COUNT(*)* 100 :: numeric, 
    2
  ) done_proc 
FROM 
  UPDATE_INFO_STEPS 
WHERE 
  discr = '<discr>';

Такой подход хоть и работал быстро, но требовал очень большого числа действий руками. И если деплой проходил в 3 часа ночи, ДБА должен был отловить момент выполнения «многопоточного» скрипта в Liquibase (который его выполнял, по сути, в одном процессе) и запустить руками еще несколько процессов для ускорения.

«МноGOпоточка 2.0»



Предыдущая версия «многопоточки» была неудобной в использовании. Поэтому мы сделали приложение на Go, которое автоматизирует процесс (можно сделать и на Python, например, да и на многих других языках).

Сперва мы разбиваем данные в изменяемой таблице на диапазоны. После этого во вспомогательную таблицу задач добавляем информацию о скрипте – его имя (уникальный идентификатор, например, имя задачи в Jira) и количество одновременно запускаемых процессов. Затем во вспомогательную таблицу скриптов добавляем текст SQL миграции, разбитый на диапазоны.

Пример кода
-- В целевой БД необходимо создать объекты, в которых будет храниться 
-- конфигурация многопоточного обновления (pg_parallel_task)
-- и логи задания (pg_parallel_task_statements).
CREATE TABLE IF NOT EXISTS public.pg_parallel_task (
  name text primary key, threads_count int not null DEFAULT 10, 
  comment text
);
COMMENT ON table public.pg_parallel_task 
  IS 'Задание параллельного выполнения';
COMMENT ON COLUMN public.pg_parallel_task.name 
  IS 'Уникальный идентификатор';
COMMENT ON COLUMN public.pg_parallel_task.threads_count 
  IS 'Количество одновременных потоков обработки. По умолчанию 10';
COMMENT ON COLUMN public.pg_parallel_task.comment 
  IS 'Комментарий';

  
CREATE TABLE IF NOT EXISTS public.pg_parallel_task_statements (
  statement_id bigserial primary key, 
  task_name text not null references public.pg_parallel_task (name), 
  sql_statement text not null, 
  status text not null check (
    status in (
      'new', 'in progress', 'ok', 'error'
    )
  ) DEFAULT 'new', 
  start_time timestamp without time zone, 
  elapsed_sec float(8), 
  rows_affected bigint, 
  err text
);
COMMENT ON table public.pg_parallel_task_statements 
  IS 'Операторы параллельного выполнения';
COMMENT ON COLUMN public.pg_parallel_task_statements.sql_statement 
  IS 'Полный текст выполняемого запроса';
COMMENT ON COLUMN public.pg_parallel_task_statements.status 
  IS 'Статус обработки текущего оператора. Один из new|in progress|ok|error';
COMMENT ON COLUMN public.pg_parallel_task_statements.start_time 
  IS 'Время начала выполнения текущего оператора';
COMMENT ON COLUMN public.pg_parallel_task_statements.elapsed_sec 
  IS 'Для выполненных операторов, затраченное время в секундах';
COMMENT ON COLUMN public.pg_parallel_task_statements.rows_affected 
  IS 'Для выполненных операторов, количество затронутных строк';
COMMENT ON COLUMN public.pg_parallel_task_statements.err 
  IS 'Для выполненных операторов, текст ошибки. NULL, если выполнение успешно.';


-- Основной скрипт
INSERT INTO PUBLIC.pg_parallel_task (NAME, threads_count) 
VALUES ('JIRA-001', 10);

INSERT INTO PUBLIC.pg_parallel_task_statements (task_name, sql_statement) 
SELECT 
  'JIRA-001' task_name, 
  FORMAT(
    'UPDATE FOO SET level = 42 where id >= ''%s'' and id <= ''%s''', 
    MIN(d.id), 
    MAX(d.id)
  ) sql_statement 
FROM 
  (
    SELECT 
      id, 
      NTILE(10) OVER (
        ORDER BY 
          id
      ) part 
    FROM 
      foo
  ) d 
GROUP BY 
  d.part;
-- Конец основного скрипта

При деплое происходит вызов приложения на Go, которое считывает конфигурацию задачи и скрипты по этой задаче из вспомогательных таблиц и автоматически запускает скрипты с заданным числом параллельных процессов (worker’ов). После выполнения управление передается обратно в Liquibase.

Код
<changeSet id="JIRA-001" author="soldatov">
    <executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
        <arg value="testdatabase"/><arg value="JIRA-001"/>
    </executeCommand>
</changeSet>

Приложение состоит из трех основных абстракций:

  • task – загружает в память параметры миграции, количество процессов и все диапазоны, запускает “многопоточку” и поднимает Web–сервер для отслеживания прогресса выполнения;
  • statement – представляет собой один диапазон выполняемой операции, также отвечает за изменение статуса выполнения диапазона, запись времени выполнения диапазона, количество строк в диапазоне и т.д.;
  • worker – представляет собой один поток выполнения.

В методе task.do создается канал, в который отправляются все statements операции. На этом канале запускается указанное число worker’ов. Внутри worker’ов бесконечный цикл, он мультиплексирует на двух каналах: по которому получает statements и выполняет их, и пустой канал как сигнализатор? что надо завершиться. Как только пустой канал будет закрыт, worker завершит работу – это случается при ошибке в одном из worker’ов. Т.к. каналы в Go это thread–safe структура, то закрытием одного канала мы можем отменить все worker’ы разом. Когда statement в канале закончится, worker просто выйдет из цикла, и уменьшит общий для всех worker'ов счетчик. Так как task всегда знает, сколько worker’ов по нему работает, он просто ждет, когда этот счетчик обнулится и после этого завершается сам.

Плюшки




За счет такой реализации «многопоточки» появилось несколько интересных фич:

  • Интеграция с Liquibase (вызываем с помощью тега executeCommand).
  • простой веб–интерфейс, который появляется при запуске “многопоточки” и содержит всю информацию о ходе ее выполнения.
  • Прогресс–бар (мы знаем, сколько обрабатывается один диапазон, сколько запущено параллельных процессов и сколько диапазонов еще осталось обработать – значит можем подсчитать время завершения).
  • Динамическое изменение параллельных процессов (пока это мы делаем руками, но в дальнейшем хотим автоматизировать).
  • Логирование информации по ходу выполнения многопоточных скриптов для возможности дальнейшего анализа.
  • Можно выполнять блокирующие операции типа update, почти ничего не блокируя (если разбить табличку на очень маленькие диапазоны, все скрипты будут выполняться почти мгновенно).
  • Есть обертка для вызова “многопоточки” прямо из БД.

Не плюшки


Основным недостатком является необходимость один раз пройти фулсканом по табличке для разбиения ее на диапазоны, если в качестве ключа используется текстовое поле, дата или uid. Если ключом для разбиения выбрано поле с последовательно увеличивающимися плотными значениями, то такой проблемы нет (мы заранее можем указать все диапазоны, просто задав требуемый шаг).

Ускоряемся в семь раз (тест на pgbench таблице)



Напоследок приведу пример сравнения по скорости выполнения операции UPDATE 500 000 000 строк без использования «многопоточки» и с ней. Простой UPDATE выполнялся 49 минут, тогда как «многопоточка» завершилась за семь минут.

Пример кода
SELECT count(1) FROM pgbench_accounts;
 count
-------
500000000
(1 row)

SELECT pg_size_pretty(pg_total_relation_size('pgbench_accounts'));
pg_size_pretty
----------------
 62 Gb
(1 row)

UPDATE pgbench_accounts
SET abalance = 42;
-- Время выполнения 49 минут

vacuum full analyze verbose pgbench_accounts;

INSERT INTO public.pg_parallel_tASk (name, threads_count) values ('JIRA-002', 25);

INSERT INTO public.pg_parallel_tASk_statements (tASk_name, sql_statement)
SELECT 'JIRA-002' tASk_name,
  FORMAT('UPDATE pgbench_accounts
          SET abalance = 42
          WHERE aid >= ''%s'' AND aid <= ''%s'';',
  MIN(d.aid), MAX(d.aid)) sql_statement
FROM (SELECT aid, ntile(25) over (order by aid) part
      FROM pgbench_accounts) d
GROUP BY d.part;
-- Время выполнения 10 минут

-- Можно дробить по ctid, но получится неравномерно и нужно чтобы эту таблицу никто не изменял в процесе многопоточки
INSERT INTO public.pg_parallel_tASk_statements (tASk_name, sql_statement)
SELECT 'JIRA-002-ctid' tASk_name,
  FORMAT('UPDATE pgbench_accounts
          SET abalance = 45
          WHERE (ctid::text::point)[0]::text > ''%s'' AND (ctid::text::point)[0]::text <= ''%s'';',
  (d.min_ctid), (d.max_ctid)) sql_statement
FROM (
  WITH max_ctid AS (
    SELECT MAX((ctid::text::point)[0]::int) FROM pgbench_accounts)
  SELECT generate_series - (SELECT max / 25 FROM max_ctid) AS min_ctid, generate_series AS max_ctid
  FROM generate_series((SELECT max / 25 FROM max_ctid), (SELECT max FROM max_ctid), (SELECT max / 25 FROM max_ctid))) d;
-- Время выполнения 9 мин

 ./pgpar-linux-amd64 jdbc:postgresql://localhost:5432 soldatov password testdatabase JIRA-002
 -- Время выполнения 7 минут


P.S. Вам это надо, если:



Все инструменты хороши для определенных задач, и вот несколько таких для «многопоточки».

  • UPDATE таблиц > 100 000 строк.
  • UPDATE со сложной логикой, которую можно распараллелить (например, вызов функций для вычисления чего-либо).
  • UPDATE без локов. За счет дробления на очень маленькие диапазоны и запуска небольшого числа процессов можно добиться мгновенной обработки каждого диапазона. Таким образом, блокировка тоже будет почти мгновенной.
  • Параллельное выполнение changeSet’ов в Liquibase (например, VACUUM).
  • Создание и заполнение данными новых полей в таблице.
  • Сложные отчеты.

Почти неблокирующий UPDATE (50 000 диапазонов по 10 000 строк каждый)
<changeSet author="soldatov" id="JIRA-002-01">
  <sql>
    <![CDATA[
    INSERT INTO public.pg_parallel_task (name, threads_count) 
    VALUES ('JIRA-002', 5);
    
    INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement) 
    SELECT 
    'JIRA-002' task_name, 
    FORMAT(
      'UPDATE pgbench_accounts
                  SET abalance = 42
                  WHERE filler IS NULL
                    AND aid >= ''%s'' AND aid <= ''%s'';', 
      MIN(d.aid), 
      MAX(d.aid)
    ) sql_statement 
    FROM 
    (
      SELECT 
        aid, 
        ntile(10000) over (
          order by 
            aid
        ) part 
      FROM 
        pgbench_accounts 
      WHERE 
        filler IS NULL
    ) d 
    GROUP BY 
    d.part;
    ]]>
  </sql>
</changeSet>

<changeSet author="soldatov" id="JIRA-002-02">
	<executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
			<arg value="pgconfdb"/><arg value="JIRA-002"/>
	</executeCommand>
</changeSet>



Параллельные changeSet’ы в Liquibase
<changeSet author="soldatov" id="JIRA-003-01">
    <sql>
        <![CDATA[
        INSERT INTO pg_parallel_task (name, threads_count) 
        VALUES ('JIRA-003', 2);
        
        INSERT INTO pg_parallel_task_statements (task_name, sql_statement) 
        SELECT 
          'JIRA-003' task_name, 
          'VACUUM FULL ANALYZE pgbench_accounts;' sql_statement;
          
        INSERT INTO pg_parallel_task_statements (task_name, sql_statement) 
        SELECT 
          'JIRA-003' task_name, 
          'VACUUM FULL ANALYZE pgbench_branches;' sql_statement;

        ]]>
    </sql>
</changeSet>

<changeSet author="soldatov" id="JIRA-003-02">
    <executeCommand os="Linux, Mac OS X" executable="./pgpar.sh">
        <arg value="testdatabase"/><arg value="JIRA-003"/>
    </executeCommand>
</changeSet>


Почти неблокирующее заполнение нового поля таблицы данными (50 000 диапазонов по 10 000 строк каждый) с вызовом «многопоточки» функцией из БД
-- SQL part
ALTER TABLE pgbench_accounts ADD COLUMN account_number text;

INSERT INTO public.pg_parallel_task (name, threads_count) VALUES ('JIRA-004', 5);

INSERT INTO public.pg_parallel_task_statements (task_name, sql_statement)
SELECT 'JIRA-004' task_name,
        FORMAT('UPDATE pgbench_accounts
                SET account_number = aid::text || filler
                WHERE aid >= ''%s'' AND aid <= ''%s'';',
        MIN(d.aid), MAX(d.aid)) sql_statement
FROM (SELECT aid,
      ntile(50000) over (order by device_version_guid) part
      FROM pgbench_accounts) d
GROUP BY d.part;

SELECT * FROM func_run_parallel_task('testdatabase','JIRA-004');


Кстати, у нас есть вакансия
Tags:
Hubs:
+51
Comments 13
Comments Comments 13

Articles

Information

Website
lanit.ru
Registered
Founded
Employees
over 10,000 employees
Location
Россия
Representative
katjevl