Pull to refresh
0
ex-Wargaming
Издатель и разработчик free-to-play MMO

Impala для Python-разработчика на примере определения фрода при анализе трафика в маркетинговой платформе

Reading time9 min
Views3K

Всем привет.

Как известно, есть множество различных систем хранения. Большинство из них рассчитаны на определенный объем данных. Если данных больше, то система хранения начинает вести себя непредсказуемо. Этих проблем лишены системы на базе Hadoop, основанные на файловой системе HDFS. Подобные не слишком часто используются в веб-разработке, но незаменимы для анализа данных и построения отчетов.

Если нужно предоставить пользователям (менеджерам компании) аналитику за несколько лет относительно кликов пользователей на рекламных объявлениях, измеряющихся в сотнях миллионов в месяц, отговорка в духе “нуу... наш postgres не умеет нормально работать с такими объемами” не работает. Без правильной системы хранения тут не обойтись. Можно было использовать, в том числе, Clickhouse, Snowflake и т.д., но в компании уже была инфраструктура и готовые решения на Impala и Hive. Первая быстрее и работает с памятью. Второй – медленнее и работает с диском. Никто не любит ждать, поэтому была выбрана Impala.

Недостатков подобных систем не избежать:

  • Доступ только на добавление данных и чтение. Никаких update, delete, etc (за исключением Kudu, о чем речь чуть ниже).

  • Гигабайты… нет, терабайты памяти.

  • Как в python нету ORM для Impala? Какое еще ODBC?

  • Запросы к таблицам из 1000 и 1 000 000 строк занимают сравнимое время.

  • Никаких индексов и прочих способов оптимизации производительности.

  • Тормозит? То, что время ответа измеряется в секундах и десятках секунд, хорошо для построения аналитических отчетов, но так себе для получения информации в real-time’е.

  • Чтобы хватало памяти, необходимо партиционировать данные по какому-то параметру, благо что всегда есть дата. Перезапись данных так же возможна или в рамках всей таблицы, или в рамках партиции.

В нашем распоряжении оказался кластер из 30 нод с ограничениями в 90 ГБ памяти на ноде и 1 ТБ суммарно. Для работы с данными существует хороший инструмент – Hue ("Хью") – довольно удобный веб-интерфейс для запросов.

Работа с Impala для python-разработчика сильно отличается от работы с традиционными БД. Драйверов всего 2 – impyla и cloudera odbc. С первым когда-то давно был плохой опыт, плюс у него куча открытых issue на гитхабе. Решено было использовать второй. Для этого нужно качать сам коннектор с сайта, настраивать его, и затем использовать pyodbc. Т.к. наше приложение асинхронное, вместо pyodbc у нас aioodbc – обертка вокруг pyodbc.

def _convert_timestamp(value):
    unpacked = struct.unpack('6Hxxxx', value)
    if unpacked == (1970, 1, 1, 0, 0, 0):
        return None
    return datetime(*unpacked)
 
 
class Impala:
    def __init__(self):
        self.dsn = (
            f'DRIVER={{{cfg.impala_driver}}};'
            f'HOST={{{cfg.impala_host}}};'
            f'PORT={{{cfg.impala_port}}};'
            f'SCHEMA={{{cfg.impala_default_schema}}};'
            f'UID={{{cfg.impala_uid}}};'
            f'PWD={{{cfg.impala_pwd}}};'
            f'AUTHMECH=3;'
            f'USESASL=1;'
            f'SSL=0;'
        )
 
        self.pool: Optional[Pool] = None
 
    async def connect(self):
        self.pool: Pool = await create_pool(
            minsize=cfg.impala_min_poolsize,
            maxsize=cfg.impala_max_poolsize,
            dsn=self.dsn,
            after_created=self.after_created,
            autocommit=True,
            pool_recycle=55,
        )
 
        async with self.pool.acquire() as connection:
            async with connection.cursor() as cursor:
                cursor: Cursor
 
                result = await cursor.execute('select 1')
                await result.fetchone()
 
    async def disconnect(self):
        if self.pool is not None:
            self.pool.close()
            await self.pool.wait_closed()
 
    @staticmethod
    async def after_created(connection):
        # Driver return zero date values incorrectly
        connection.add_output_converter(SQL_TYPE_TIMESTAMP, _convert_timestamp)
 
        connection.setdecoding(SQL_CHAR, encoding='utf-8')
        connection.setdecoding(SQL_WCHAR, encoding='utf-8')
        connection.setdecoding(SQL_WMETADATA, encoding='utf-16')
        connection.setencoding(encoding='utf-8')
 
    async def _run_with_retries(
        self,
        query,
        *params,
        retries=0,
        max_retries=cfg.impala_max_retries,
    ):
        try:
            async with self.pool.acquire() as connection:
                async with connection.cursor() as cursor:
                    result = await cursor.execute(query, *params)
                    return await getattr(result, 'execute').__call__()
        except Error as e:
            if retries < max_retries:
                retries += 1
                return await self._run_with_retries(
                    query,
                    *params,
                    retries=retries,
                    max_retries=max_retries,
                )
            raise
 
    async def execute(self, query, *params):
        return await self._run_with_retries(query, *params)
 
    @staticmethod
    def to_python(data: Union[List[Row], Row]) -> Union[List[dict], dict]:
        if isinstance(data, List):
            return list(map(Impala.to_python, data))
 
        return {info[0]: data[i] for i, info in enumerate(data.cursor_description)}

Связка из самой Impala и ее драйвера по стабильности сильно уступает обычным БД, поэтому retry-и – наше все. Только после этого всё заработало более-менее корректно.

Про ОРМ говорить не будем – его нет. Может, это и к лучшему, т.к. сложные запросы в ОРМ выглядят очень нечитаемо, писать raw-sql гораздо приятнее, и код лучше читается. Но отсюда следует и другой недостаток – отсутствие инструментария для миграций схемы таблиц. Пока что мы меняем таблицы вручную, но планируем написать свой маленький инструмент (или кто-то подскажет в комментариях уже готовое решение?)

SELECT-запросы к Impala немного отличаются от запросов к обычным реляционным БД:

  • Любой запрос делает full scan, т.к. индексов нету, и выгружает запрашиваемые данные в память. Поэтому желательно всегда использовать партиции и фильтры по ним. По сути, одна партиция – один каталог с данными на диске. Меньше партиций выгружено – меньше памяти использовано.

  • Есть несколько форматов хранения данных в Impala, каждый со своими преимуществами и недостатками. Т.к. у нас таблицы с большим количеством колонок, одинаковых для всех записей, мы используем Parquet – он позволяет читать только необходимые колонки и экономить память. Подробнее про Parquet можно почитать тут: https://www.bigdataschool.ru/blog/apache-parquet-avro-spark-big-data.html

  • При добавлении или изменений данных в таблицах Impala перекладывает данные на диске, но при этом, чтобы она впоследствии знала, где и какие данные лежат, нужно обязательно собирать статистику (COMPUTE STATS), иначе анализатор будет работать очень медленно.

Иногда без обновления существующих данных либо их удаления (привет, GDPR) не обойтись. Impala в базовом варианте так не умеет, кроме перезаписи партиции целиком. Тут нам поможет KUDU. Это движок хранения данных, который совмещает в себе возможность перезаписи данных по primary key и удаления отдельных строк, а также быстрый поиск по столбцам. Для использования KUDU обязательно нужно указать primary key (одно или несколько полей), по которым можно будет делать DELETE и UPSERT, а сами данные должны иметь четкую структуру – жестко заданы поля в таблице и их типы. При этом, KUDU не поддерживает некоторые типы полей (CHAR, DATE, etc.), и, как оказалось на практике, не очень хорошо работает с большими объемами данных. Мы решили мириться со всеми недостатками, чтобы использовать его возможности. Что важно: делать запросы к KUDU можно напрямую из Impala. После этого возможности Impala уже приближаются к возможностям реляционных БД, и пользоваться ими гораздо удобнее.

Имея эти возможности, попробуем реализовать следующее. Компания платит за рекламу поставщикам трафика (Google, Yandex и др.). Есть несколько моделей оплаты: за клики пользователей по баннеру, за регистрации пользователей, за показы баннеров и т.д. Естественно, всякие недоброжелатели могут делать (и делают) накрутки событий, иначе говоря, фрод. Наша задача – определить такие накрученные события и не платить за них.

Основной датасет, click_event – данные по рекламным кликам и событиям, к которым привели эти клики (регистрации, логины в игру, платежи и т.д.).

Есть много правил, определяющих фродовые события, у нас их сейчас несколько десятков. Общая архитектура фичи такова: раз в сутки собирается датасет с фродом для каждого правила. Затем запускается запрос, который аггрегирует всех фродовых пользователей в один датасет. И потом в отдельный датасет складываются все события всех фродовых пользователей – это и есть нужные данные для анализа.

Пример правила определения фрода: если несколько пользователей зарегистрировались с одного IP-адреса, то это фрод.

UPSERT INTO {self.model.get_tablename()} (
    id,
    rule_run_id,
    player_id,
    event_id,
    ip_hash,
    regs_cnt,
    analysed_start_day,
    analysed_end_day,
    created_at
)
WITH same_ip_regs AS (
    SELECT
        CE.campaign_id,
        CE.campaign_sub_id,
        CE.player_id,
        CE.event_id,
        CE.ip_hash ip_address,
        count(CE.event_id) OVER (PARTITION BY campaign_id, campaign_sub_id, CE.ip_hash) cnt
    FROM click_event CE
    WHERE
        TO_DATE(CE.event_dt) = '{self.config.process_from}' AND
        CE.event_name='registration' AND
        CE.ip_hash != 'unknown'
)
SELECT UUID() id, '{rule_run_id}', player_id, event_id, ip_address, cnt, '{self.config.process_from}', '{self.config.process_until}', now() created_at
FROM same_ip_regs
WHERE cnt>1

Далее аггрегируем всех найденных отдельными правилами фрод-пользователей:

def generate_rules_select_sql(self):
    rules_select_sql = []

    for rule in all_rules:
        rule_sql = f'''
            SELECT player_id, '{rule["slug"]}' rule_slug, rule_run_id
            FROM {rule["tablename"]}
            WHERE created_at >= date_add(now(), -{self.config.process_lookback})
            GROUP BY player_id, rule_slug, rule_run_id
        '''
        rules_select_sql.append(rule_sql)

    return ' UNION ALL '.join(rules_select_sql)
 
async def run_etl(self):
    rules_data_sql = self.generate_rules_select_sql()

    insert_sql = f'''
        INSERT INTO fraud_user
        (
            id,
            player_id,
            rule_slug,
            rule_run_id,
            created_at,
            load_day
        )
        WITH all_users AS (
            {rules_data_sql}
        )
        SELECT UUID() id, au.player_id, au.rule_slug, au.rule_run_id,
            now() created_at, to_date(now()) load_day
        FROM all_users au
        LEFT ANTI JOIN fraud_user fu
            USING(rule_run_id)
    '''

    await impala.execute(insert_sql)

И, наконец, складываем все фродовые события:

INSERT INTO fraud_event
(
    id,
    event_id,
    player_id,
    rule_slug,
    rule_run_id,
    created_at,
    load_day
)
SELECT UUID() id,
    fn.event_id,
    fn.player_id,
    fu.rule_slug,
    fu.rule_run_id,
    now() created_at,
    to_date(now()) load_day
FROM fraud_user fu
INNER JOIN raw_event fn
    ON fn.player_id = fu.player_id
LEFT ANTI JOIN fraud_event fe
    ON fe.event_id = fn.event_id AND fe.rule_run_id = fu.rule_run_id
WHERE fn.dt>= date_add(to_date(now()), -{self.config.process_lookback_for_events})
    AND fu.load_day>=date_add(to_date(now()), -{self.config.process_lookback_for_users})

Итого: проанализировав сотни тысяч событий за последние несколько дней, за несколько минут получили список фродовых событий, за которые не стоит платить поставщикам трафика. Часто эти данные анализируются вручную: аналитик смотрит, какие фрод-события найдены, какие их параметры, и все ли отработало так, как ожидалось. Если что-то не так – корректирует настройки. Impala позволяет отобразить данные в каких угодно разрезах, что позволяет сделать конструктор запросов. Пример получения списка событий с разбивкой по кампаниям и странам:

WITH
// Список фрод-эвентов, отфильтрованный по нужным правилам
filtered_fraud_event AS (
    SELECT * FROM fraud_event
    WHERE fraud_event.rule_slug IN (<list_of_required_rules>)
),
// Список фрод-эвентов, отфильтрованный по нужным правилам, без дублей
fraud_events_wo_dups AS (
    SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id ORDER BY event_id) AS rn FROM filtered_fraud_event
    ) AS t
    WHERE t.rn=1
),
// Список фрод-эвентов, отфильтрованный по нужным правилам, без дублей в рамках каждого правила
fraud_events_wo_dups_by_rules AS (
    SELECT * FROM (
        SELECT *, ROW_NUMBER() OVER (PARTITION BY event_id, rule_slug ORDER BY event_id) AS rn FROM filtered_fraud_event
    ) AS t
    WHERE t.rn=1
),
// Список событий с кампаниями, отфильтрованный по датам, эвентам и кампаниям
augmented_click_event AS (
    SELECT campaign.full_name, click_event.country, click_event.event_id, click_event.event_name
    FROM click_event
    LEFT JOIN campaign ON campaign.id=click_event.campaign_id
    WHERE 
        click_event.event_dt <= '{self.config.report_to}' 
        AND click_event.event_dt >= '{self.config.report_from}' 
        AND click_event.event_name IN ('registration') 
        AND click_event.campaign_id IN (<list_of_required_campaigns>)
)
 
// Считаются нужные метрики. Записи с layer=1 - сами эвенты, по типам. С layer=2 - фродовые эвенты по типам правил. С layer=3 - фродовые эвенты по типам эвентов.
// В следующем запросе отдельно выбираются данные по каждому из layer-ов и объединяются в один датасет для удобства оперирования.
SELECT
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 1 THEN raw_data.event_count ELSE 0 END)) AS 'event_count',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 1 and raw_data.event_name='registration' THEN raw_data.event_count ELSE 0 END)) AS 'event_count.registration',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 3 THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_event_count',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 3 and raw_data.event_name='registration' THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_event_count.registration',
    ZEROIFNULL(SUM(CASE WHEN raw_data.layer = 2 and raw_data.rule_slug='<rule_name>' THEN raw_data.fraud_event_count ELSE 0 END)) AS 'fraud_rule_count.<rule_name>',
    ...<the_same_block_for_each_fraud_rule>...
 
    raw_data.country, raw_data.full_name
FROM (
    SELECT 
        augmented_click_event.country, 
        augmented_click_event.event_name,
        augmented_click_event.full_name,
        null AS rule_slug,
        COUNT(augmented_click_event.event_id) AS event_count,
        0 AS fraud_event_count,
        1 AS layer
    FROM augmented_click_event
    GROUP BY augmented_click_event.country, augmented_click_event.event_name, augmented_click_event.full_name
 
    UNION ALL
 
    SELECT 
        augmented_click_event.country, 
        augmented_click_event.event_name, 
        augmented_click_event.full_name,
        null AS rule_slug,
        0 AS event_count,
        count(fraud_events_wo_dups.event_id) AS fraud_event_count,
        3 AS layer
    FROM augmented_click_event
    LEFT JOIN fraud_events_wo_dups
    ON fraud_events_wo_dups.event_id=augmented_click_event.event_id
    WHERE fraud_events_wo_dups.event_id IS NOT NULL
    GROUP BY augmented_click_event.country, augmented_click_event.event_name, augmented_click_event.full_name
 
    UNION ALL
 
    SELECT 
        augmented_click_event.country, 
        augmented_click_event.event_name, 
        augmented_click_event.full_name,
        fraud_events_wo_dups_by_rules.rule_slug AS rule_slug,
        0 AS event_count,
        count(fraud_events_wo_dups_by_rules.event_id) AS fraud_event_count,
        2 AS layer
    FROM augmented_click_event
    LEFT JOIN fraud_events_wo_dups_by_rules
    ON fraud_events_wo_dups_by_rules.event_id=augmented_click_event.event_id
    WHERE 
        fraud_events_wo_dups_by_rules.event_id IS NOT NULL 
        AND fraud_events_wo_dups_by_rules.rule_slug IN (<list_of_required_rules>)
    GROUP BY 
        augmented_click_event.country, 
        augmented_click_event.event_name, 
        augmented_click_event.full_name, 
        fraud_events_wo_dups_by_rules.rule_slug
) AS raw_data
 
GROUP BY raw_data.country, raw_data.full_name
ORDER BY raw_data.country, raw_data.full_name

В результате получаем красивую табличку, понятную менеджеру:

Tags:
Hubs:
+1
Comments2

Articles

Information

Website
lesta.ru
Registered
Founded
Employees
501–1,000 employees
Location
Россия