Обновить

AWS Athena: GEOIP lookups

Amazon Web ServicesBig Data
Tutorial
Автор оригинала: Denis Gladkikh (outcoldman)

PS. Это перевод моей статьи на английском. Давно я не писал на Хабре. Сразу прощу прощения, много на русском не пишу. Не скажу что у меня и английский шикарный. Но к сожалению проживание за рубежом ухудшает мой русский и медленно развивает английский.

Если вы пользуетесь AWS Athena для анализа логов, то часто хочется найти источник IP адресов. К сожалению AWS Athena не предоставляет этого из коробки. К счастью MaxMind предоставляет базы данных GeoIP таблиц, которые позволяют вычислить местоположение по IP адресам. Есть платная и бесплатная версия.

В этой статье я покажу как создать AWS Lambda функцию, которая каждую неделю будет скачивать последнюю базу данных с MaxMind на S3. Эту базу данных можно использовать в AWS Athena для написания SQL запросов для анализа, например, веб логов.

Создание аккаунта на MaxMind

Для скачивания даже бесплатных баз данных GeoLite 2 с MaxMind вам нужно будет создать аккаунт. После создания аккаунта, в Services можно сгенерировать Service Key. Сохраните его. Мы будем использовать формат GeoLite2-City-CSV.

При помощи Service Key мы можем попробовать скачать базу данных при помощи curl

curl -o GeoLite2-City-CSV.zip \
  'https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City-CSV&license_key={{YOUR_LICENSE_KEY}}&suffix=zip'

Последние инструкции по скачиванию GeoIP баз данных можно найти тут.

AWS Lambda функция для обновления GeoIP базы данных на S3

Для собственного проекта я создал S3 Bucket s3://app.loshadki.data, где я планирую разместить базу данных GeoIP. Размещать две таблицы я буду по путям

  • s3://app.loshadki.datadata/geoip_blocks/data.csv.gz - база IP масок и их GEO положение

  • s3://app.loshadki.datadata/geoip_locations/data.csv.gz - расшифровка GEO в адреса (Страны, Города).

Создайте новую Lambda функцию, я назвал свою GeoIP-Table-Update, и использую python:3.8.

Я использую Environment Variables для некоторых настроек функции:

  • MAXMIND_GEOIP_LICENSE - ваш Service Key от MaxMind.

  • S3_BUCKET_NAME - S3 Bucket, куда собираетесь сохранять базу данных (я использую app.loshadki.data).

  • S3_BUCKET_PREFIX - префикс для баз данных, я использую data

Эта функцию может выполняться долгое время. Выставите Timeout в 5 минут. Я так же изменил Memory в 256MB, так как с большей памятью выделяется так же больше CPU, и я часто замечал что дешевле выполнять функции с более быстрым CPU, даже если не используется вся память. Мы будем запускать эту функцию раз в неделю, так что большой разницы не будет, сколько мы будем платить.

Для автоматического выполнение этой функции раз в неделю создайте trigger. Выберете EventBridge (Cloud Watch Events), создайте новое правило upload-geoip-to-s3-weekly с правилом rate(7 days).

Для того, чтобы AWS Lambda функция могла писать на S3, мы так же должны обновить разрешения, я обычно просто обновляю Role созданную автоматически. Добавьте разрешение

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "s3:PutObject",
      "Resource": "arn:aws:s3:::app.loshadki.data/data/*"
    }
  ]
}

Исходный код функции

Исходный код функции нижу. Вы можете просто скопировать и вставить его в редактор, сделать Deploy и запустить. Функцию будет работать несколько минут. По окончании, если все настройки верны, вы должны увидеть файлы на S3.

import os
import os.path
import urllib.request
import shutil
import zipfile
import tempfile
import gzip
import boto3

def lambda_handler(event, context):
    with tempfile.TemporaryDirectory() as tmpdirname:
        zipfilename = os.path.join(tmpdirname, 'GeoLite2-City-CSV.zip')

        print('step 1 - download geolite ip database')
        download_geo_ip(tmpdirname, zipfilename)
        print('step 2 - unzip all files')
        unzip_all(tmpdirname, zipfilename)
        print('step 3 - gzip files')
        gzip_files(tmpdirname)
        print('step 4 - upload to s3')
        upload_to_s3(tmpdirname)

    return

def download_geo_ip(tmpdirname, zipfilename):
    geoip_url = 'https://download.maxmind.com/app/geoip_download?edition_id=GeoLite2-City-CSV&license_key={}&suffix=zip'.
        format(os.getenv('MAXMIND_GEOIP_LICENSE'))

    with urllib.request.urlopen(geoip_url) as response, open(zipfilename, 'wb') as output:
        shutil.copyfileobj(response, output)


def unzip_all(tmpdirname, zipfilename):
    # unzip all, but without the directories, to easily find the files
    with zipfile.ZipFile(zipfilename, 'r') as z:
        for member in z.namelist():
            filename = os.path.basename(member)

            # if a directory, skip
            if not filename:
                continue

            # copy file (taken from zipfile's extract)
            with z.open(member) as zobj:
                with open(os.path.join(tmpdirname, filename), "wb") as targetobj:
                    shutil.copyfileobj(zobj, targetobj)


def gzip_files(tmpdirname):
    for filename in ['GeoLite2-City-Blocks-IPv4.csv', 'GeoLite2-City-Locations-en.csv']:
        file_path = os.path.join(tmpdirname, filename)
        with open(file_path, 'rb') as f_in,
                gzip.open(file_path + '.gz', 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)


def upload_to_s3(tmpdirname):
    s3_bucket_name = os.getenv('S3_BUCKET_NAME')
    s3_bucket_prefix = os.getenv('S3_BUCKET_PREFIX')

    s3_client = boto3.client('s3')
    s3_client.upload_file(
        os.path.join(tmpdirname, 'GeoLite2-City-Blocks-IPv4.csv.gz'),
        s3_bucket_name,
        os.path.join(s3_bucket_prefix, 'geoip_blocks/data.csv.gz')
    )
    s3_client.upload_file(
        os.path.join(tmpdirname, 'GeoLite2-City-Locations-en.csv.gz'),
        s3_bucket_name,
        os.path.join(s3_bucket_prefix, 'geoip_locations/data.csv.gz')
    )

Создание таблиц AWS Athena

Теперь мы можем создать таблицы в AWS Athena на базе CSV файлов, которые мы только что скопировали на S3.

Первая таблица это IP адреса (не забудьте поменять расположение на S3, где вы храните CSV файлы)

CREATE EXTERNAL TABLE IF NOT EXISTS default.geoip_blocks (
  network STRING,
  geoname_id INT,
  registered_country_geoname_id INT,
  represented_country_geoname_id INT,
  is_anonymous_proxy INT,
  is_satellite_provider INT,
  postal_code STRING,
  latitude DOUBLE,
  longitude DOUBLE,
  accuracy_radius INT
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
LOCATION 's3://app.loshadki.data/data/geoip_blocks/'
TBLPROPERTIES ('skip.header.line.count'='1');

Вторая таблицы это расшифровка к странам и городам (опять не забудьте поменять S3 путь)

CREATE EXTERNAL TABLE IF NOT EXISTS default.geoip_locations (
  geoname_id INT,
  locale_code STRING,
  continent_code STRING,
  continent_name STRING,
  country_iso_code STRING,
  country_name STRING,
  subdivision_1_iso_code STRING,
  subdivision_1_name STRING,
  subdivision_2_iso_code STRING,
  subdivision_2_name STRING,
  city_name STRING,
  metro_code STRING,
  time_zone STRING,
  is_in_european_union INT
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
WITH SERDEPROPERTIES (
   'separatorChar' = ',',
   'quoteChar' = '\"',
   'escapeChar' = '\\'
)
LOCATION 's3://app.loshadki.data/data/geoip_locations/'
TBLPROPERTIES ('skip.header.line.count'='1');

Мы можем попробовать выполнить простой SQL запрос

select * 
from  default.geoip_blocks t1
  inner join default.geoip_locations t2 on t1.geoname_id = t2.geoname_id
limit 10

Использование таблиц для поиска адреса по IP адресу (CIDR lookup)

Таблица geoip_blocks определяет адреса блоками в CIDR кодировке, например 1.0.0.0/24, что определяет все адреса от 1.0.0.0 до 1.0.0.255. Текущая версия Presto поддерживает функции для проверки если IP адрес подходит для CIDR кодировки. Но к сожалению AWS Athena (даже версия 2) до сих пор не поддерживает эти функции, так как использует Presto 0.217. Но мы можем найти другой способ проверки.

Один из способов это преобразование IP адресов в Integer, чтобы была возможность выполнять запросы вроде ip_start <= ip_address <= ip_end. Преобразовать IP адрес в Integer очень легко, формула простая ipv4[1]*256*256*256 + ipv4[2]*256*256 + ipv4[3]*256 + ipv4[4]. Ну и маску /24 просто нужно преобразовать в последний IP адрес диапазона.

Самый простой вариант это просто создать View на базе таблицы geoip_blocks

CREATE OR REPLACE VIEW geoip_blocks_int AS
select
        cast(ip[1] as BIGINT)*256*256*256 + cast(ip[2] as BIGINT)*256*256 + cast(ip[3] as BIGINT)*256 + cast(ip[4] as BIGINT) as ip_start,
        (
            bitwise_or(cast(ip[1] as BIGINT), bitwise_and(255, cast(power(2, greatest(8 - range, 0)) as BIGINT)-1))
            )*256*256*256 +
        (
            bitwise_or(cast(ip[2] as BIGINT), bitwise_and(255, cast(power(2, greatest(16 - range, 0)) as BIGINT)-1))
            )*256*256 +
        (
            bitwise_or(cast(ip[3] as BIGINT), bitwise_and(255, cast(power(2, greatest(24 - range, 0)) as BIGINT)-1))
            )*256+
        (
            bitwise_or(cast(ip[4] as BIGINT), bitwise_and(255, cast(power(2, greatest(32 - range, 0)) as BIGINT)-1))
            ) as ip_end,
        network,
        geoname_id,
        registered_country_geoname_id,
        represented_country_geoname_id,
        cast(is_anonymous_proxy as BOOLEAN) as is_anonymous_proxy,
        cast(is_satellite_provider as BOOLEAN) as is_satellite_provider,
        postal_code,
        latitude,
        longitude,
        accuracy_radius
from
    (
        select
            network,
            geoname_id,
            registered_country_geoname_id,
            represented_country_geoname_id,
            is_anonymous_proxy,
            is_satellite_provider,
            postal_code,
            latitude,
            longitude,
            accuracy_radius,
            split(network_array[1], '.') as ip,
            cast(network_array[2] as BIGINT) as range
        from
            (
                select
                    network,
                    geoname_id,
                    registered_country_geoname_id,
                    represented_country_geoname_id,
                    is_anonymous_proxy,
                    is_satellite_provider,
                    postal_code,
                    latitude,
                    longitude,
                    accuracy_radius,
                    split(network, '/') as network_array
                from default.geoip_blocks
            )
    )

Пробуем результаты

Например мы можем попробовать найти местоположение IP адреса 1.1.1.1. Нам нужно только преобразовать его в Integer опять.

with ips as (
    select
        (
                cast(ip_array[1] as BIGINT)*256*256*256 +
                cast(ip_array[2] as BIGINT)*256*256 +
                cast(ip_array[3] as BIGINT)*256 +
                cast(ip_array[4] as BIGINT)
            ) as ip_int,
        ip
    from (
             select
                 '1.1.1.1' as ip,
                 split('1.1.1.1', '.') as ip_array
         ) as source
)
select
    ips.ip,
    locations.continent_name,
    locations.country_name,
    locations.city_name,
    locations.time_zone
from
    ips as ips
        left join geoip_blocks_int as blocks on blocks.ip_start <= ips.ip_int and ips.ip_int <= blocks.ip_end
        left join geoip_locations as locations on blocks.geoname_id = locations.geoname_id

Ну и немного более сложный SQL запрос, если у вас хранятся логи от CloudFront, чтобы показать самые популярные страницы с группировкой по странам и городам.

with access_logs as (
  select
    uri,
    (
      cast(split(ip, '.')[1] as BIGINT)*256*256*256 + 
      cast(split(ip, '.')[2] as BIGINT)*256*256 + 
      cast(split(ip, '.')[3] as BIGINT)*256 + 
      cast(split(ip, '.')[4] as BIGINT)
    ) as ip_int
  from (
    select  uri,
      case xforwarded_for
        when '-' then request_ip
        else xforwarded_for
      end as ip
    from access_logs_yesterday
    where 
      sc_content_type = 'text/html' 
      and status = 200 
      and method = 'GET'
      and not regexp_like(url_decode(user_agent), '(bot|spider)')
  )
)
select
    count(*) as count,
    access_logs.uri as uri,
    locations.continent_name,
    locations.country_name,
    locations.city_name,
    locations.time_zone
from
    access_logs
    left join geoip_blocks_int as blocks on 
      blocks.ip_start <= access_logs.ip_int and access_logs.ip_int <= blocks.ip_end
    left join geoip_locations as locations on blocks.geoname_id = locations.geoname_id
group by 2, 3, 4, 5, 6
order by 1

Что дальше?

Вы можете использовать колонки postal_code или city_name вместе с country_name вместе с AWS QuickSight для создания отчетов. Я так же создал для себя CloudWatch Alert, если функция упадет больше 2х раз, чтобы знать если что-то сломалось.

Теги:awsaws athenacsvgeoipmaxmindsql
Хабы: Amazon Web Services Big Data
Рейтинг +3
Количество просмотров 458 Добавить в закладки 5
Комментарии
Комментировать

Похожие публикации

Java Architect
от 4 000 до 5 000 $Virtual HealthМожно удаленно
Devops Specialis | Системный инженер
от 120 000 ₽УМСКУЛМожно удаленно
DevOps инженер
от 110 000 до 200 000 ₽Очень ИнтересноКрасноярск
Senior Software Engineer (Go/Ruby)
до 350 000 ₽New.HRМоскваМожно удаленно
DevOPS Engineer
от 3 700 до 6 500 $Coins.phМожно удаленно

Лучшие публикации за сутки