21 июня 2019

Освобождаем руки нескольким аналитикам: API Livy для автоматизации типовых банковских задач

Блог компании ООО «Хоум Кредит Энд Финанс Банк»PythonBig DataМашинное обучениеHadoop
Привет, Хабр!

Не секрет, что для оценки платежеспособности клиентов банки используют данные из различных источников (кредитное бюро, мобильные операторы и т.д.). Количество внешних партнёров может достигать нескольких десятков, а аналитиков в нашей команде наберётся лишь несколько человек. Возникает задача оптимизации работы небольшой команды и передачи рутинных задач вычислительным системам.

Как данные попадают в банк, и как команда аналитиков следит за этим процессом, разберём в данной статье.



Начнём по порядку.

Нашу распределённую систему на основе Hadoop, и все процессы, связанные с ней, мы коротко называем SmartData. SmartData получает данные по API от внешних агентов. (Причём агентами для неё являются как внешние партнёры, так и внутренние системы банка). Безусловно, было бы полезно собирать некий «актуальный профиль» по каждому клиенту, что мы и делаем. Обновлённые данные от источников попадают в Оперпрофиль. Оперпрофиль реализует идею Customer 360 и хранится в виде таблиц Hbase. Это удобно для дальнейшей работы с клиентом.

Customer 360
Customer 360 — подход реализации операционного хранилища с всевозможными атрибутами клиентских данных, используемых во всех процессах в организации, которые работают с клиентом и его данными, доступных по ключу клиента.

Работа с агентами осуществляется непрерывно, и её нужно контролировать. Для быстрой проверки качества взаимодействия и hit rate, а также простоты передачи этой информации другим командам, мы используем визуализацию, например, отчёты в Tableau.

Исходные данные поступают в Kafka, проходят предварительную обработку и помещаются в DataLake, построенный на основе HDFS. Потребовалось придумать решение, как организовать парсинг файлов с логами из HDFS, их обработку и ежедневную выгрузку в аналитические системы и системы визуализации. А ещё совместить это с любовью аналитиков к Python ноутбукам.

Закончим с внутренней кухней и перейдём к практике.

Нашим решением стало использование API Livy. Livy позволяет сабмитить код на кластер прямо из Jupyter. HTTP запрос, содержащий код, написанный на Python (или Scala), и мета-данные, отправляется в Livy. Livy инициирует запуск Spark сессии на кластере, которая управляется менеджером ресурсов Yarn. Для отправки HTTP запросов подойдёт модуль requests. Любители парсить сайты наверняка с ним уже знакомы (а если нет – вот шанс немного узнать про него).

Импортируем необходимые модули и создадим сессию. (Также сразу узнаем адрес нашей сессии, в будущем это пригодится). В параметры передаем данные для авторизации пользователя и название языка скрипта, который будет исполнять кластер.

import json, requests, schedule, time
 
host = 'http://***:8998'
data = {'kind': 'spark', 'proxyUser': 'user'}
headers = {'Content-Type': 'application/json'}
r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
session_id = r.json().get('id')
print("session_id: " + str(session_id))
session_url = host + r.headers['location']
r = requests.get(session_url, headers=headers)

Ждём, когда статус сессии перейдёт в idle. В случае, если время ожидания превысит установленный timeout – отправляем сообщение об ошибке.

timeout = time.time() + wait_time
sess_state = ['starting', 'success', 'idle']
 
while(True):
    time.sleep(7)
    req_st = requests.get(session_url, headers=headers).json().get('state')
    if req_st != 'idle' and time.time() > timeout:
        requests.delete(session_url, headers=headers)
        send_message("Scheduler_error", req_st)
        break
    if req_st == 'idle':
        break
    if req_st not in sess_state:
        send_message("Scheduler_error", req_st)
        break
print("Session_state: ", req_st) 

Теперь можно отправлять код в Livy.

statements_url = session_url + '/statements'
data = {'code': '1 + 1'}
 
r = requests.post(statements_url, data=json.dumps(data), headers=headers)
statement_url = host + r.headers['location']
r = requests.get(statement_url, headers=headers)
while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
    time.sleep(15)
r = requests.get(statement_url, headers=headers).json()['output']
session_url = 'http://***:8998/sessions/' + str(session_id)

В цикле ждём окончания исполнения кода, получаем результат обработки:

r.get('data').get('text/plain')

Метод delete удалит сессию.

requests.delete(session_url, headers=headers) 

Для ежедневной выгрузки можно использовать несколько вариантов, про cron на хабре уже писали, а вот про user-friendly модуль schedule – нет. Просто добавлю его в код, объяснений он не потребует. И, для удобства, все выкладки соберу в одном месте.

Код
import json, requests, schedule, time
 
schedule.every().day.at("16:05").do(job, 300)
while True:
    schedule.run_pending()
 
def job(wait_time):
    host = 'http://***:8998'
    data = {'kind': 'spark', 'proxyUser': 'user'}
    headers = {'Content-Type': 'application/json'}
    r = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
 
    session_id = r.json().get('id')
    print("session_id: " + str(session_id))
    session_url = host + r.headers['location']
    r = requests.get(session_url, headers=headers)
 
    timeout = time.time() + wait_time
    sess_state = ['starting', 'success', 'idle']
    while(True):
        time.sleep(7)
        req_st = requests.get(session_url, headers=headers).json().get('state')
        if req_st != 'idle' and time.time() > timeout:
            requests.delete(session_url, headers=headers)
            break
        if req_st == 'idle':
            break
        if req_st not in sess_state:
            send_message("Scheduler_error", req_st)
            break
    print("Session_state: ", req_st)   
    statements_url = session_url + '/statements'
    data = {'code': '1 + 1'}
    r = requests.post(statements_url, data=json.dumps(data),headers=headers)
    statement_url = host + r.headers['location']
    r = requests.get(statement_url, headers=headers)
    while (requests.get(statement_url, headers=headers).json()['progress'] != 1):
        time.sleep(15)
 
    r = requests.get(statement_url, headers=headers).json()['output']
    session_url = 'http://***:8998/sessions/' + str(session_id)
    print(r.get('data').get('text/plain'))
    #requests.delete(session_url, headers=headers)
 
def send_message(subject, text):
    import smtplib
    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    me = "my_email_adress"
    you = "email_adress"
    msg = MIMEMultipart('alternative')
    msg['Subject'] = subject
    msg['From'] = me
    msg['To'] = you
    text = text
    part1 = MIMEText(text, 'plain')
    msg.attach(part1)
    s = smtplib.SMTP('domain.org')
    s.ehlo()
    s.starttls()
 
    s.login("user", "password")
    s.sendmail(me, you, msg.as_string())
    s.quit()


Заключение:


Быть может, это решение не претендует на лучшее, но оно прозрачно для команды аналитиков. Плюсы, которые в нём вижу я:

  • возможность использовать для автоматизации привычный Jupyter
  • наглядное взаимодействие
  • участник команды в праве сам выбрать, каким образом он будет работать с файлами (spark-зоопарк), как следствие, нет необходимости переписывать существующие скрипты

Конечно, при запуске большого количества заданий, придётся следить за освобождающимися ресурсами, настраивать коммуникацию между выгрузками. Эти вопросы решаются в индивидуальном порядке и согласовываются с коллегами.

Будет замечательно, если хотя бы одна команда возьмет это решение на заметку.

Ссылки


Документация Livy
Теги:bigdatalivyautomatizationsparkanalyticsmachine learning
Хабы: Блог компании ООО «Хоум Кредит Энд Финанс Банк» Python Big Data Машинное обучение Hadoop
+8
2,3k 28
Комментировать
Лучшие публикации за сутки