25 April 2010

Служба мгновенных собщений своими руками

Lumber room
Все мы привыкли пользоваться аськой, многие этот функционал реализуют в своих проектах, кто-то использует БД, или сервер очередей, например memcacheq. Есть готовые решения, типа eJabber.

Если интересно, как можно сделать это самому, то wellcom под каст, где будет рассмотрена серверная часть «Службы мгновенных сообщений». С клиентской, я надеюсь, разберетесь сами...

Суть «Службы коротких сообщений — это в принципе и есть сервер очередей, работающий по протоколу HTTP, который легко должен интегрироваться с любым JS фреймворком. Вса результаты выдаются в JSON. Обмен с сервером осуществляется из браузера посредством AJAX.

по методу POST записываем данные по ключу, которым является url (или его часть). По методу GET извлекаем из очереди (по ключу, которым является url) то, что было записано. Итак имеем: одну Хештаблицу, ключом в которой являются uri, а данными — очереди. Очередь представляет строку собщения. При желании можно добавить и время. Хотя есть много идей по развитию.

итак ближе к теме:
  1. #include <sys/types.h>
  2. #include <sys/time.h>
  3. #include <sys/queue.h>
  4. #include <stdlib.h>
  5. #include <err.h>
  6. #include <event.h>
  7. #include <evhttp.h>
  8. #include <map>
  9. #include <string>
  10. #include <queue>
  11. using namespace std;
  12. std::map<string,queue<string> > ht;
  13. void generic_handler(struct evhttp_request *req, void *arg)
  14. {
  15. struct evbuffer *buf;
  16. buf = evbuffer_new();
  17. if (buf == NULL)
  18. err(1, "failed to create response buffer");
  19. string key = evhttp_request_uri(req);
  20. string out;
  21. if (req->type == EVHTTP_REQ_POST) {
  22. const char * str_len = evhttp_find_header(req->input_headers, "Content-Length");
  23. int len = atoi(str_len);
  24. out.assign((const char *)EVBUFFER_DATA(req->input_buffer),len);
  25. if ( ht.find(key) == ht.end() ) {
  26. queue<string> q;
  27. q.push(out);
  28. ht.insert( pair<string,queue<string> >(key,q));
  29. } else
  30. ht[key].push(out);
  31. evbuffer_add_printf(buf, "{\"result\": \"Ok\"}\r\n");
  32. } else {
  33. if ( ht.find(key) == ht.end() ) {
  34. evbuffer_add_printf(buf, "{\"result\": null}\r\n" );
  35. } else {
  36. queue<string> q = ht[key];
  37. if ( q.size() ) {
  38. out = q.front();
  39. q.pop();
  40. ht[key]=q;
  41. evbuffer_add_printf(buf, "{\"result\": \"%s\"}" , out.c_str() );
  42. } else {
  43. evbuffer_add_printf(buf, "{\"result\": null }" )
  44. }
  45. }
  46. }
  47. evhttp_send_reply(req, HTTP_OK, "OK", buf);
  48. }
  49. int main(int argc, char **argv)
  50. {
  51. struct evhttp *httpd;
  52. event_init();
  53. httpd = evhttp_start("0.0.0.0", 8080);
  54. evhttp_set_gencb(httpd, generic_handler, NULL);
  55. event_dispatch();
  56. evhttp_free(httpd);
  57. return 0;
  58. }

немного пояснений по коду:
строки 58-63 инициализация WEB сервера. За основу взять WEB сервер основанный на libevent. У него отличная производительность. На моем ноуте 2.3ГГц он дает производительность 2к qps. Осуществляется обработка любого урла.
стр 20-22 — инициализирем буфер
стр 23 получаем REQUEST_IRI и используем для ключа. Есть много предложений по оптимизации.
стр 26 проверяем на POST. Несомненно можно еще проверить на HEAD (другие методы evhttp не поддерживает). Пока не будем усложнять жизнь.
стр 28-30 формируем переменную, в которой хранятся данные. Так как в буфере накапливается мусор, то мы записывает столько байт, сколько указано в заголовке Content-Length
стр 30-35 Если ключ такой не существует, то заводим новую очередь и вставляем в очередь элемент данных
стр 36 иначе просто вставляем в очередь элемент данных
стр 38 — отрабатывает метод GET
стр 39 проверяет существует ли ключ
стр 40 — нет — выводим сообщение о пустом результате
стр 42- получаем данные по ключу
стр 43 — проверяем пустая ли очередь,
стр 44-47 — нет, выбираем сообщение из очереди и выводим его, очередь уменьшается на одно сообщение
Можно немного пофлеймить, что надо сделать эскейпинг. Да, обязательно это добавлю.
стр 49 да, очередь пуста, сообщаем об этом.
стр 53 финализируем запрос, отправляем код ответа 200 OK

Необходимо отметить, что модель однопоточная, по этому ни каких блокировок на запись делать не надо. Хотя этот вопрос еще мной будет прорабатываться.

Результаты ab
Concurrency Level: 3
Time taken for tests: 0.415 seconds
Complete requests: 1000
Failed requests: 0
Write errors: 0
Total transferred: 83000 bytes
HTML transferred: 19000 bytes
Requests per second: 2409.31 [#/sec] (mean)
Time per request: 1.245 [ms] (mean)
Time per request: 0.415 [ms] (mean, across all concurrent requests)
Transfer rate: 195.29 [Kbytes/sec] received

объем потребляемой память на 10 000 сообщений — чуть более 600К

Вообще-то планируется поставить за nginx, пусть он отвечает за безопасность (ngx_http_accesskey_module)
кусок конфига:
location /test {
proxy_pass 127.0.0.1:8080;
## и еще директивы ngx_http_accesskey_module
}

но через nginx производительность доходит до 800 rps

Есть много идей — как и куда двигаться дальше. Например, отображение статусов активности, антиспам.
Любые иные идеи приветствуются. Пока сижу без работы, по этому нет проекта, на котором могу это опробовать. По моим расчетам одновременно около 10 тыс клиентов спокойно потянет, если делать запросы с частотой 1 -1.5 мин ( 130-200 rps).
Tags:социальные сетисообщенияпередача сообщенийlibevent
Hubs: Lumber room
0
711 19
Comments 17
Top of the last 24 hours