Pull to refresh

Redis, hiredis, libev и multithread. Часть 1

Reading time4 min
Views15K
Появилась задача: имеется большой поток данных от множества клиентов, который нужно обработать очень быстро (желательно в реалтайме), сложить в БД и разослать другому множеству клиентов, при этом данные не структурируются ни в таблицы, ни в документы. Выбор технологий для реализации пал на Redis + C++.


Выбор библиотеки для Redis


Если заглянуть сюда, то видим 4 варианта библиотек для реализации своего клиента для Redis под данную задачу: hiredis, credis, libredis и C++ Client.
Казалось бы, последнее — это то, что нужно. Но эта библиотека находится в статусе «expiremental», что сразу же намекает на то, что нужно вооружаться напильником, энтузиазмом и большим терпением. А деньги платят не за количество труда, а за результат. К тому же функциональность этого клиента пока что оставляет желать лучшего.
Libredis. Эта штука видимо является плагином к движку PHP. Не подходит.
Credis. Очень простенькая библиотека, позволяющая выполнять только базовые команды Redis, без возможности выстроить составную команду (эта возможность стоит у них сейчас в TODO), а ведь есть модель данных, и хотелось бы делать атомарно несколько запросов одновременно.
Итак, выбор пал на hiredis — официальный клиент.

Выбор событийно-ориентированной библиотеки


Следуя свежим веяниям IT-технологий и учитывая огромное количество клиентов, я задумался о том, что необходимо использовать неблокирующее соединение с Redis, тем более эта СУБД предоставляет такую возможность. В свою очередь hiredis предоставляет адаптеры для использования с 3 библиотеками событийно-ориентированной обработки данных: это ae, libevent и libev.
Тут важно подчеркнуть, что клиент для Redis — это всего лишь часть сервиса, которую нужно отправить в отдельный поток и каким-то образом отправлять в этот поток данные из других потоков, которые обрабатывают соединения с клиентами.
Ae. Не понял, что за библиотека, гугл мне не помог, если кто знает — просьба откомментить пруфлинком.
Libevent. Нет поддержки версий 2.x.x, только 1.x.x. Этот факт делает невозможным использование функционала в многопоточном приложении.
Итак, выбор пал на libev. Далее подробнее.

Hiredis+libev+multithread


Статей, как оказалось, нет под данной теме. Есть публикации от производителей hiredis о том, как осуществить однопоточную событийно-ориентированную обработку данных (вот пруф) и есть документация по libev, в которой четко сказано: многопоточность не поддерживается по умолчанию, потому что нет однозначного алгоритма сделать потокобезопасную реализацию, но есть возможность создать отдельный watcher с типом ev_async, который будет принимать данные из других потоков.
Хочу еще сразу сделать оговорку: я попробовал несколько вариантов написания кода, но либо знаний не хватает, либо надо глубже в исходники рыть, вобщем в итоге лучшим пока что вариантом оказалось просто скопировать предоставленный адаптер для libev в свой проект и дописать необходимый функционал.
Итак. Для начала в структуру события redisLibevEvents добавляем еще один watcher:

typedef struct redisLibevEvents {
    redisAsyncContext *context;
    struct ev_loop *loop;
    int reading, writing;
    ev_io rev, wev;
    ev_async aev; // Это специально обученный watcher для получения событий из других потоков.
} redisLibevEvents;

Далее. Добавляем функцию-callback для обработки событий, приходящих из других потоков:

void redisLibevAsyncEvent(EV_P_ ev_async *watcher, int revents) { // Функция написана по образу и подобию callback-ов для watcher-ов типа ev_io.
#if EV_MULTIPLICITY
    ((void)loop);
#endif
    ((void)revents);

    redisLibevEvents *e = (redisLibevEvents*)watcher->data;
    redisAsyncHandleRead(e->context); // Отправляем событие в обработку.
    free((redisLibevEvents*)watcher->data); // Тут освобождаем память, которую будем динамически выделять для создания события.
}

Далее. В функцию привязывания контекста к циклу обработки событий, которая называется redisLibevAttach, добавляем в конец следущее:

ev_async_init(&e->aev, redisLibevAsyncEvent); // Инициализируем watcher.
ev_async_start(e->loop, &e->aev); // И запускаем.

Теперь не хватает только одного: получения указателя на созданный watcher, так как он нужен для отправления событий. Для этого обрабатываем еще разок напильником функцию redisLibevAttach:

int redisLibevAttach(EV_P_ redisAsyncContext *ac, ev_async **_pEvAsync) {
...
*_pEvAsync = &e->aev;
return REDIS_OK;
}

Собственно после уже можно использовать.
Функция инициализации для цикла событий:

m_pRedisAsyncContext = redisAsyncConnect(m_strIP, m_nDBPort); // Создаем контекст.
m_pEventLoop = ev_loop_new(EVFLAG_AUTO); // Создаем кастомный цикл событий.
redisLibevAttach(m_pEventLoop, m_pRedisAsyncContext, &m_pAEV); // Привязываем к циклу событий контекст.
redisAsyncSetConnectCallback(m_pRedisAsyncContext, redisConnectCallbackFunction); // Привязываем callback для события соединения с Redis.
redisAsyncSetDisconnectCallback(m_pRedisAsyncContext, redisDisconnectCallbackFunction); // Привязываем callback для дисконнекта.

Функция потока:

ev_loop(m_pEventLoop, 0); // Запуск цикла обработки событий.

Ну и функция добавления нового события:

// Сначала ожидаем семафор, или мьютекс (кому как больше нравится) для синхронизации потоков.
redisAsyncCommand(m_pRedisAsyncContext, redisAsyncCommandCallbackFunction, _pPrivData, _pcQuery); // Добавляем в контекст запрос (_pcQuery) с информацией для его идентификации (_pPrivData) и callback-функцией для получения результата.
redisLibevEvents * pRedisLibevEvent =  (redisLibevEvents*)malloc(sizeof(redisLibevEvents)); // Создаем структуру для события.
pRedisLibevEvent->context = m_pRedisAsyncContext; // Помещаем контекст в событие.
m_pAEV->data = pRedisLibevEvent; // Рассказываем watcher-у про созданное событие.
ev_async_send(m_pEventLoop, m_pAEV); // Отправляем событие на обработку.
// Отпускаем семафор/мьютекс.


Ну вот и все. Полученная смесь C и C++ вполне жизнеспособна, хоть и не очень красиво выглядит.
Надеюсь кому-нибудь пригодится, пока не вышли официальные годные реализации на С++.

P.S.: Читайте продолжение во второй части.

Tags:
Hubs:
+35
Comments19

Articles