9 января 2014

«Дружим» redis с nginx

Высокая производительностьNoSQL
Не секрет, что для защиты от HTTP-DDoS зачастую используют связку nginx в качестве фронтенда и некий другой web-сервер в качестве бакенда. При этом ввиду большой нагрузки возникает проблема хранения логов для дальнейшего их анализа. Можно хранить в текстовом файле, но, естественно, анализировать/ротировать его весьма неудобно. Можно гнать данные напрямую в, например, mysql через пайп, но выигрывая в удобстве анализа мы проигрываем в производительности, особенно это заметно при фрагментации. Золотой серединой, пожалуй, будет no-sql решение.
Для себя я выбрал redis.

Для хранения логов напишем небольшой модуль для nginx. Изначально модуль был реализован c использованием официального Си-шного клиента hiredis, но случайно наткнулся на аналогичный модуль от Валерия Холодкова, который уже обладал большим функционалом, и взаимодействие реализовано без использования сторонних клиентов и решил позаимствовать его идеи (на тестах модули показали практически равные значения производительности).
В модуле от Валерия исправлена работа EVALSHA, устранены небольшие баги и добавлена поддержка SETEX.
ngx_http_redislog_module.c
/*
 * Copyright (C)    2012 Valery Kholodkov
 *                  2014 Alexander V Makkoveev
 */

#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
#include <nginx.h>

#define NGX_SOCKETLOG_FACILITY_LOCAL7      23
#define NGX_SOCKETLOG_SEVERITY_INFO        6

#define NGX_REDIS_APPEND                   "*3" CRLF "$6" CRLF "APPEND" CRLF
#define NGX_REDIS_AUTH                     "*2" CRLF "$4" CRLF "AUTH" CRLF

//#define NGX_DEF_FORMAT    "combined"
#define NGX_DEF_FORMAT      "main"
#define IF_DEBUG            0  
#define IF_DEBUG_2          0
//*****************************************************************************

typedef struct ngx_http_log_op_s  ngx_http_log_op_t;

typedef u_char *(*ngx_http_log_op_run_pt) (ngx_http_request_t *r, u_char *buf,
    ngx_http_log_op_t *op);

typedef size_t (*ngx_http_log_op_getlen_pt) (ngx_http_request_t *r,
    uintptr_t data);

struct ngx_redislog_peer;

typedef void (*ngx_redislog_send_handler_pt)(struct ngx_redislog_peer*);

struct ngx_http_log_op_s {
    size_t                      len;
    ngx_http_log_op_getlen_pt   getlen;
    ngx_http_log_op_run_pt      run;
    uintptr_t                   data;
};

typedef struct {
    ngx_str_t                   name;
#if defined nginx_version && nginx_version >= 7018
    ngx_array_t                *flushes;
#endif
    ngx_array_t                *ops;        /* array of ngx_http_log_op_t */
} ngx_http_log_fmt_t;

typedef struct {
    ngx_array_t                 formats;    /* array of ngx_http_log_fmt_t */
    ngx_uint_t                  combined_used; /* unsigned  combined_used:1 */
} ngx_http_log_main_conf_t;

typedef struct {
    ngx_str_t                           name;
    struct sockaddr                     *sockaddr;
    socklen_t                           socklen;
    ngx_msec_t                          write_timeout;
    ngx_msec_t                          read_timeout;
    ngx_msec_t                          connect_timeout;
    ngx_msec_t                          reconnect_timeout;
    ngx_msec_t                          flush_timeout;
    ngx_msec_t                          ping_timeout;

    ngx_bufs_t                          bufs;
    size_t                              recv_buf_size;

    ngx_str_t                           password;

    unsigned                            authenticate:1;
} ngx_redislog_peer_conf_t;

typedef struct {
    ngx_array_t                         *peers;
} ngx_redislog_conf_t;

typedef struct ngx_redislog_peer {
    ngx_redislog_peer_conf_t           *conf;
    ngx_peer_connection_t               conn;
    ngx_event_t                         reconnect_timer;
    ngx_event_t                         flush_timer;
    ngx_event_t                         ping_timer;
    ngx_log_t                           *log;
    ngx_pool_t                          *pool;

    ngx_chain_t                         *busy;
    ngx_chain_t                         *free;

    ngx_buf_t                           *recv_buf;

    ngx_uint_t                          discarded;
    ngx_uint_t                          reconnect_timeout;

    ngx_uint_t                          num_queued;
    ngx_uint_t                          state;
    u_char                              *password_pos;

    ngx_redislog_send_handler_pt        send_handler;

    unsigned                            connecting:1;
    unsigned                            authenticated:1;
    unsigned                            flush_timer_set:1;
} ngx_redislog_peer_t;

typedef struct {
    ngx_str_t                           peer_name;
    ngx_uint_t                          peer_idx;
    ngx_http_log_fmt_t                  *format;
    ngx_http_complex_value_t            *key;
    ngx_str_t                           command;
    ngx_str_t                           arg1;
//***
    //ngx_str_t                           arg_num;
//***
    ngx_http_complex_value_t            *_if;
    ngx_http_complex_value_t            *ifnot;

    unsigned                            has_arg1;
} ngx_http_redislog_t;

typedef struct {
    ngx_array_t                  *logs;       /* array of ngx_http_redislog_t */
    unsigned                     off;
} ngx_http_redislog_conf_t;

static ngx_array_t ngx_redislog_peers;

static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p);
static void ngx_http_redislog_append(ngx_redislog_peer_t *p, u_char *buf, size_t len);
static void ngx_http_redislog_send(ngx_redislog_peer_t *p);
static void ngx_redislog_flush_handler(ngx_event_t*);
static u_char *ngx_redislog_size(u_char*, u_char*, size_t);
static size_t ngx_redislog_size_len(size_t);

static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t*, ngx_buf_t*);

static void ngx_redislog_read_handler(ngx_event_t *rev);
static void ngx_redislog_idle_read_handler(ngx_event_t *rev);

static char *ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);

static void *ngx_http_redislog_create_loc_conf(ngx_conf_t *cf);
static char *ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent,
    void *child);

static ngx_int_t ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data);
static ngx_int_t ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data);

static void *ngx_redislog_create_conf(ngx_cycle_t *cycle);
static ngx_int_t ngx_http_redislog_add_variables(ngx_conf_t *cf);
static ngx_int_t ngx_http_redislog_init(ngx_conf_t *cf);
static ngx_int_t ngx_redislog_init_process(ngx_cycle_t *cycle);

static ngx_command_t ngx_http_redislog_commands[] = {

    { ngx_string("access_redislog"),
      NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_SIF_CONF|NGX_HTTP_LIF_CONF
                        |NGX_HTTP_LMT_CONF|NGX_CONF_TAKE1234,
      ngx_http_redislog_set_log,
      NGX_HTTP_LOC_CONF_OFFSET,
      0,
      NULL },

      ngx_null_command
};

static ngx_http_module_t  ngx_http_redislog_module_ctx = {
    ngx_http_redislog_add_variables,      /* preconfiguration */
    ngx_http_redislog_init,               /* postconfiguration */

    NULL,                                  /* create main configuration */
    NULL,                                  /* init main configuration */

    NULL,                                  /* create server configuration */
    NULL,                                  /* merge server configuration */

    ngx_http_redislog_create_loc_conf,    /* create location configration */
    ngx_http_redislog_merge_loc_conf      /* merge location configration */
};

extern ngx_module_t ngx_http_log_module;

ngx_module_t  ngx_http_redislog_module = {
    NGX_MODULE_V1,
    &ngx_http_redislog_module_ctx,        /* module context */
    ngx_http_redislog_commands,           /* module directives */
    NGX_HTTP_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

static ngx_command_t  ngx_redislog_commands[] = {

    { ngx_string("redislog"),
      NGX_MAIN_CONF|NGX_CONF_TAKE23,
      ngx_http_redislog_command,
      0,
      0,
      NULL },

      ngx_null_command
};

static ngx_core_module_t  ngx_redislog_module_ctx = {
    ngx_string("redislog"),
    ngx_redislog_create_conf,
    NULL
};

ngx_module_t  ngx_core_redislog_module = {
    NGX_MODULE_V1,
    &ngx_redislog_module_ctx,             /* module context */
    ngx_redislog_commands,                /* module directives */
    NGX_CORE_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    ngx_redislog_init_process,            /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    NULL,                                  /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

static ngx_http_variable_t ngx_http_redislog_variables[] = {

    { ngx_string("redislog_yyyy"), NULL, ngx_http_redislog_yyyy_variable, 0,
      0, 0 },

    { ngx_string("redislog_yyyymm"), NULL, ngx_http_redislog_yyyymm_variable, 0,
      0, 0 },

    { ngx_string("redislog_yyyymmdd"), NULL, ngx_http_redislog_yyyymmdd_variable, 0,
      0, 0 },

    { ngx_string("redislog_yyyymmddhh"), NULL, ngx_http_redislog_yyyymmddhh_variable, 0,
      0, 0 },

    { ngx_null_string, NULL, NULL, 0, 0, 0 }
};
//-----------------------------------------------------------------------------
ngx_int_t
ngx_http_redislog_handler(ngx_http_request_t *r)
{
    
    u_char                    *line, *p;
    size_t                    len, command_size_len, arg1_size_len, record_len, key_size_len, record_size_len;
    ngx_uint_t                i, l;
    ngx_str_t                 key, _if, ifnot;
    ngx_http_redislog_t      *log;
    ngx_http_log_op_t         *op;
    ngx_http_redislog_conf_t *slcf;
    time_t                    time;
    ngx_tm_t                  tm;
    ngx_redislog_peer_t      **peer;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "http redislog handler");

    slcf = ngx_http_get_module_loc_conf(r, ngx_http_redislog_module);

    ngx_log_debug3(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
           "redislog conf=%p, off=%ud, logs=%p", slcf, slcf->off, slcf->logs);

    if(slcf->off || slcf->logs == NULL) {
        return NGX_OK;
    }

    time = ngx_time();
    ngx_gmtime(time, &tm);

    log = slcf->logs->elts;

    for (l = 0; l < slcf->logs->nelts; l++) {
#if defined nginx_version && nginx_version >= 7018
        ngx_http_script_flush_no_cacheable_variables(r, log[l].format->flushes);
#endif

        len = 0;
        op = log[l].format->ops->elts;
        for (i = 0; i < log[l].format->ops->nelts; i++) {
            if (op[i].len == 0) {
                len += op[i].getlen(r, op[i].data);

            } else {
                len += op[i].len;
            }
        }

        if(log[l].ifnot != NULL) {
            if(ngx_http_complex_value(r, log[l].ifnot, &ifnot) != NGX_OK) {
                return NGX_ERROR;
            }

            if(ifnot.len && (ifnot.len != 1 || ifnot.data[0] != '0')) {
                continue;
            }
        }

        if(log[l]._if != NULL) {
            if(ngx_http_complex_value(r, log[l]._if, &_if) != NGX_OK) {
                return NGX_ERROR;
            }

            if(!_if.len || (_if.len == 1 && _if.data[0] == '0')) {
                continue;
            }
        }

        if(ngx_http_complex_value(r, log[l].key, &key) != NGX_OK) {
            return NGX_ERROR;
        }
        
        command_size_len = ngx_redislog_size_len(log[l].command.len);
        key_size_len = ngx_redislog_size_len(key.len);

        if(log[l].arg1.len) {
            arg1_size_len = ngx_redislog_size_len(log[l].arg1.len);
        }
        else {
            arg1_size_len = 0;
        }

        len += 2 + sizeof(CRLF) - 1 + 1 + command_size_len + 1 + sizeof(CRLF) - 1
            + command_size_len + sizeof(CRLF) - 1 + log[l].command.len + sizeof(CRLF) - 1
            + key_size_len + sizeof(CRLF) - 1 + key.len + sizeof(CRLF) - 1
            + 1 + NGX_OFF_T_LEN + sizeof(CRLF) - 1 + sizeof(CRLF) - 1;

        if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
            len++;
        }

        if(log[l].has_arg1) {
            len += arg1_size_len + sizeof(CRLF) - 1 + log[l].arg1.len + sizeof(CRLF) - 1;
        }

#if defined nginx_version && nginx_version >= 7003
        line = ngx_pnalloc(r->pool, len);
#else
        line = ngx_palloc(r->pool, len);
#endif
        if (line == NULL) {
            return NGX_ERROR;
        }

        p = line;

        for(i = 0; i < log[l].format->ops->nelts; i++) {
            p = op[i].run(r, p, &op[i]);
        }

        if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
            *p++ = LF;
        }

        record_len = p - line;

        record_size_len = ngx_redislog_size_len(record_len);

        p = line;

        /*
         * Redis append to time series command
            *3
            $6
            APPEND
            $nnn
            key
            $nnn
            log record
        */
        *p++ = '*';

        //*p++ = log[l].has_arg1 ? '4' : '3';
        /* SETEX */
        if(ngx_strncmp(log[l].command.data, "SETEX", 5) == 0)
        *p++ = '4';
        else
        *p++ = log[l].has_arg1 ? '5' : '3';

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        *p++ = '$';
        //*p++ = '0';

        p = ngx_redislog_size(p, p + command_size_len, log[l].command.len);

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        p = ngx_copy(p, log[l].command.data, log[l].command.len);

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        if(log[l].has_arg1 && ngx_strncmp(log[l].command.data, "EVALSHA", 7) == 0) {
            *p++ = '$';
            p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
            p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

            *p++ = '$';
            p = ngx_redislog_size(p, p + 1, 1);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
            p = ngx_copy(p, "1", 1);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
        }
/*
*p++ = '$';
*p++ = '1';
*p++ = LF;
*p++ = '0';
*p++ = LF;
*/

        *p++ = '$';

        p = ngx_redislog_size(p, p + key_size_len, key.len);

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        p = ngx_copy(p, key.data, key.len);

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

                if(log[l].has_arg1 && ngx_strncmp(log[l].command.data, "SETEX", 5) == 0) {
            *p++ = '$';
            p = ngx_redislog_size(p, p + arg1_size_len, log[l].arg1.len);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);
            p = ngx_copy(p, log[l].arg1.data, log[l].arg1.len);
            p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        }

        *p++ = '$';

        p = ngx_redislog_size(p, p + record_size_len, record_len);

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        for(i = 0; i < log[l].format->ops->nelts; i++) {
            p = op[i].run(r, p, &op[i]);
        }

        if(ngx_strncmp(log[l].command.data, "APPEND", 6) == 0) {
            *p++ = LF;
        }

        p = ngx_copy(p, CRLF, sizeof(CRLF) - 1);

        peer = ngx_redislog_peers.elts;

        peer += log[l].peer_idx;

        ngx_http_redislog_append(*peer, line, p - line);
        //ngx_http_redislog_append(*peer, line, p - line + 5);
    }

    return NGX_OK;
}

static u_char *ngx_redislog_size(u_char *p, u_char *q, size_t sz)
{
    u_char *end = q;

    while(p != q) {
        *--q = (sz % 10 + '0');
        sz /= 10;
    }

    return end;
}

static size_t ngx_redislog_size_len(size_t sz)
{
    size_t len = 0;
    
    while(sz != 0) {
        sz /= 10;
        len++;
    }

    return len;
}

static u_char*
ngx_redislog_buf_append(ngx_buf_t *buf, u_char *p, size_t *len)
{
    size_t remaining = buf->end - buf->last;

    if(remaining > *len) {
        remaining = *len;
    }

    buf->last = ngx_copy(buf->last, p, remaining);
    *len -= remaining;

    return p + remaining;
}

static void
ngx_http_redislog_append(ngx_redislog_peer_t *peer, u_char *buf, size_t len)
{
    u_char *p;
    ngx_chain_t *last, *q;
    size_t remaining;
    ngx_uint_t num_busy = 0;

    /*
     * Find last busy buffer
     */
    last = peer->busy;
    
    while(last != NULL && last->next != NULL) {
        last = last->next;
    }

    /*
     * See if message fits into remaining space
     */
    remaining = (last != NULL ? last->buf->end - last->buf->last : 0);

    q = peer->free;

    while(remaining <= len && q != NULL) {
        remaining += (q->buf->end - q->buf->last);
        q = q->next;
    }

    /*
     * No memory for this message, discard it
     */
    if(remaining < len) {
        peer->discarded++;
        return;
    }

    /*
     * Append message to the buffers
     */
    if(last != NULL) {
        p = ngx_redislog_buf_append(last->buf, buf, &len);
    }
    else {
        p = buf;
    }

    while(peer->free != NULL && len != 0) {
        q = peer->free;

        p = ngx_redislog_buf_append(q->buf, p, &len);

        peer->free = peer->free->next;

        q->next = NULL;

        if(last == NULL) {
            peer->busy = q;
        }
        else {
            last->next = q;
        }

        last = q;
    }

    peer->num_queued++;

    q = peer->busy;

    while(q != NULL) {
        num_busy++;
        q = q->next;
    }

    if(!peer->flush_timer_set) {
        peer->flush_timer.handler = ngx_redislog_flush_handler;
        peer->flush_timer.data = peer;
        peer->flush_timer.log = peer->conn.log;

        ngx_add_timer(&peer->flush_timer, peer->conf->flush_timeout);

        peer->flush_timer_set = 1;
    }

    if(num_busy >= 2) {
        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, peer->conn.connection->log, 0,
                       "redislog num queued is now %ud, set read handler", peer->num_queued);

        peer->conn.connection->read->handler = ngx_redislog_read_handler;
        /*
         * Send it
         */
        ngx_http_redislog_send(peer);
    }
}

static void
ngx_http_redislog_send(ngx_redislog_peer_t *p)
{
    ngx_chain_t                         *written;
    ngx_connection_t                    *c;
    ngx_chain_t                         *dummy = NULL;

    c = p->conn.connection;

    if(c == NULL || c->fd == -1) {
        return;
    }

    if(!c->write->ready) {
        return;
    }

    if(p->flush_timer_set) {
        ngx_del_timer(&p->flush_timer);
        p->flush_timer_set = 0;
    }

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "redislog send handler");

    if(p->busy != NULL) {
        written = c->send_chain(c, p->busy, 0);

        if(written == NGX_CHAIN_ERROR) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "redislog write error");
            ngx_close_connection(c);
            ngx_redislog_reconnect_peer(p);
            return;
        }

        ngx_chain_update_chains(p->pool, &p->free, &p->busy, &dummy, 0);

        if(written != NULL) {

            if(!c->write->ready && !c->write->timer_set) {
                ngx_add_timer(c->write, p->conf->write_timeout);
            }

            if(ngx_handle_write_event(c->write, 0) != NGX_OK) {
                ngx_close_connection(c);
                ngx_redislog_reconnect_peer(p);
            }

            return;
        }
    }
}

static void ngx_redislog_auth_send(ngx_redislog_peer_t *peer)
{
    ngx_connection_t                    *c;
    ngx_str_t                           *password;
    ssize_t                             n;

    c = peer->conn.connection;

    if(c == NULL || c->fd == -1) {
        return;
    }

    password = &peer->conf->password;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, c->log, 0,
                   "redislog auth send handler");

    n = c->send(c, peer->password_pos, password->len - (peer->password_pos - password->data));

    if(n > 0) {
        peer->password_pos += n;

        if(peer->password_pos >= (password->data + password->len)) {
            peer->send_handler = ngx_http_redislog_send;
            ngx_http_redislog_send(peer);
        }

        return;
    }

    if(n == NGX_ERROR) {
        ngx_close_connection(c);
        ngx_redislog_reconnect_peer(peer);
        return;
    }

    if(!c->write->timer_set) {
        ngx_add_timer(c->write, peer->conf->write_timeout);
    }

    if(ngx_handle_write_event(c->write, 0) != NGX_OK) {
        ngx_close_connection(c);
        ngx_redislog_reconnect_peer(peer);
        return;
    }
}

static void ngx_redislog_flush_handler(ngx_event_t *ev)
{
    ngx_redislog_peer_t    *peer = ev->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, peer->log, 0,
                   "redislog flush handler, set read handler");

    peer->flush_timer_set = 0;
    peer->conn.connection->read->handler = ngx_redislog_read_handler;

    ngx_http_redislog_send(peer);
}

static void ngx_redislog_connected_handler(ngx_redislog_peer_t *peer)
{
    ngx_connection_t                    *c;

    c = peer->conn.connection;

    ngx_del_timer(c->read);

    /*
     * Once the connection has been established, we need to
     * reset the reconnect timeout to it's initial value
     */
    peer->reconnect_timeout = peer->conf->reconnect_timeout;

    if(peer->discarded != 0) {
        ngx_log_error(NGX_LOG_ERR, peer->log, 0,
            "redislog peer \"%V\" discarded %ui messages",
            &peer->conf->name, peer->discarded);

        peer->discarded = 0;
    }
}

static ngx_int_t ngx_redislog_process_buf(ngx_redislog_peer_t *peer, ngx_buf_t *buf)
{
    u_char                             *p, *q;

    p = buf->pos;
    q = buf->last;

    while(p != q) {
        if(!peer->state) {
            if(*p == '+' || *p == '-' || *p == ':') {
                if(peer->conf->authenticate && !peer->authenticated) {
                    if(*p == '-') {
                        ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                            "redis authentication failure");
                        return NGX_ERROR;
                    }
                    peer->authenticated = 1;
                }
                else {
                    if(peer->num_queued) {
                        peer->num_queued--;
                    }
                    else {
                        ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                            "too many responses from redis");
                        return NGX_ERROR;
                    }
                }
            }
            if(*p == '-') {
                ngx_log_error(NGX_LOG_ERR, peer->log, 0,
                    "redislog error");
            }
            peer->state++;
        }
        else {
            if(*p == LF) {
                peer->state = 0;
            }
            else if(peer->state == 1) {
                if(*p == CR) {
                    peer->state++;
                }
            }
        }

        p++;
    }

    buf->pos = p;

    return NGX_OK;
}

static void ngx_redislog_read_handler(ngx_event_t *rev)
{
    ngx_connection_t                   *c;
    ngx_redislog_peer_t                *peer;
    ngx_buf_t                          *buf;
    ssize_t                            n, size;
    ngx_int_t                          rc;

    c = rev->data;
    peer = c->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                   "redislog read handler");

    if(c->read->timer_set) {
        ngx_del_timer(c->read);
    }

    if(rev->timedout || c->error || c->close) {
        if(rev->timedout) {
            ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
                          "redislog peer timed out");
        }

        if(rev->error) {
            ngx_log_error(NGX_LOG_ERR, rev->log, 0,
                          "redislog peer connection error");
        }

        ngx_close_connection(c);

        if(!c->close) {
            ngx_redislog_reconnect_peer(peer);
        }
        return;
    }

    buf = peer->recv_buf;

    for( ;; ) {
        for( ;; ) {
            if(buf->last == buf->end) {
                break;
            }

            size = buf->end - buf->last;

            n = c->recv(c, buf->last, size);

            ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                           "redislog peer recv %z", n);

            if(n == NGX_AGAIN) {
                break;
            }

            if(n == 0) {
                if(peer->num_queued != 0) {
                    ngx_log_error(NGX_LOG_INFO, c->log, 0,
                                  "redis closed the connection prematurely");
                }
            }

            if(n == 0 || n == NGX_ERROR) {
                c->error = 1;
                goto reconnect;
            }

            buf->last += n;
        }

        rc = ngx_redislog_process_buf(peer, buf);

        if(rc != NGX_OK) {
            goto reconnect;
        }

        buf->pos = buf->last = buf->start;

        ngx_log_debug1(NGX_LOG_DEBUG_HTTP, c->log, 0,
                       "redislog num queued is now %ud", peer->num_queued);

        if(peer->num_queued == 0) {
            break;
        }

        if (!c->read->ready) {
            if(ngx_handle_read_event(c->read, 0) != NGX_OK) {
                goto reconnect;
            }

            if(!c->read->timer_set) {
                ngx_add_timer(c->read, peer->conf->read_timeout);
            }

            return;
        }
    }

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                   "redislog set idle read handler");

    c->read->handler = ngx_redislog_idle_read_handler;

    return;

reconnect:
    ngx_close_connection(c);
    ngx_redislog_reconnect_peer(peer);
}

static void ngx_redislog_idle_read_handler(ngx_event_t *rev)
{
    ngx_connection_t                    *c;
    ngx_redislog_peer_t                *peer;

    int                                 n;
    char                                buf[1];
    ngx_err_t                           err;

    c = rev->data;
    peer = c->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, rev->log, 0,
                   "redislog idle read handler");

    if(rev->timedout || c->error || c->close) {
        if(rev->timedout) {
            ngx_log_error(NGX_LOG_ERR, rev->log, NGX_ETIMEDOUT,
                          "redislog peer timed out");
        }

        if(rev->error) {
            ngx_log_error(NGX_LOG_ERR, rev->log, 0,
                          "redislog peer connection error");
        }

        ngx_close_connection(c);

        if(!c->close) {
            ngx_redislog_reconnect_peer(peer);
        }
        return;
    }

#if (NGX_HAVE_KQUEUE)
    if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {

        if(!rev->pending_eof) {
            goto no_error;
        }

        rev->eof = 1;
        c->error = 1;

        if(rev->kq_errno) {
            rev->error = 1;
        }

        goto reconnect;
    }
#endif

    n = recv(c->fd, buf, 1, MSG_PEEK);

    err = ngx_socket_errno;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, rev->log, err,
                   "redislog recv(): %d", n);

    if(n > 0) {
        goto no_error;
    }

    if(n == -1) {
        if(err == NGX_EAGAIN) {
            goto no_error;
        }

        rev->error = 1;

    }
    else {
        err = 0;
    }

    rev->eof = 1;
    c->error = 1;

    ngx_log_error(NGX_LOG_ERR, rev->log, err,
                  "redislog connection error");

#if (NGX_HAVE_KQUEUE)
reconnect:
#endif
    ngx_close_connection(c);
    ngx_redislog_reconnect_peer(peer);
    return;

no_error:
    if(peer->connecting) {
        ngx_redislog_connected_handler(peer);
        peer->connecting = 0;
    }
}

static void ngx_redislog_write_handler(ngx_event_t *wev)
{
    ngx_connection_t                    *c;
    ngx_redislog_peer_t                *peer;

    c = wev->data;
    peer = c->data;

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, wev->log, 0,
                   "redislog write handler");

    if(wev->timedout || c->error || c->close) {
        if(wev->timedout) {
            ngx_log_error(NGX_LOG_ERR, wev->log, NGX_ETIMEDOUT,
                          "redislog peer timed out");
        }

        if(wev->error) {
            ngx_log_error(NGX_LOG_ERR, wev->log, 0,
                          "redislog peer connection error");
        }

        ngx_close_connection(c);

        if(!c->close) {
            ngx_redislog_reconnect_peer(peer);
        }
        return;
    }

    if(peer->connecting) {
        ngx_redislog_connected_handler(peer);
        peer->connecting = 0;
    }

    if(c->write->timer_set) {
        ngx_del_timer(c->write);
    }

    peer->send_handler(peer);
}

static ngx_int_t ngx_redislog_connect_peer(ngx_redislog_peer_t *peer)
{
    ngx_int_t               rc;

    ngx_log_error(NGX_LOG_INFO, peer->log, 0,
                  "redislog connect peer \"%V\"", &peer->conf->name);

    peer->conn.sockaddr = peer->conf->sockaddr;
    peer->conn.socklen = peer->conf->socklen;
    peer->conn.name = &peer->conf->name;
    peer->conn.get = ngx_event_get_peer;
    peer->conn.log = peer->log;
    peer->conn.log_error = NGX_ERROR_ERR;

    rc = ngx_event_connect_peer(&peer->conn);

    if (rc == NGX_ERROR || rc == NGX_BUSY || rc == NGX_DECLINED) {
        if(peer->conn.connection) {
            ngx_close_connection(peer->conn.connection);
        }

        return NGX_ERROR;
    }

    peer->conn.connection->data = peer;
    peer->conn.connection->pool = peer->pool;
    
    peer->password_pos = peer->conf->password.data;
    peer->authenticated = 0;

    peer->conn.connection->read->handler = ngx_redislog_read_handler;
    peer->conn.connection->write->handler = ngx_redislog_write_handler;

    peer->send_handler = peer->conf->authenticate ? ngx_redislog_auth_send
        : ngx_http_redislog_send;

    ngx_add_timer(peer->conn.connection->read, peer->conf->connect_timeout);

    peer->connecting = 1;

    return NGX_OK;
}

static void ngx_redislog_connect_handler(ngx_event_t *ev)
{
    ngx_int_t               rc;
    ngx_redislog_peer_t    *peer = ev->data;

    rc = ngx_redislog_connect_peer(peer);

    if(rc != NGX_OK) {
        ngx_redislog_reconnect_peer(peer);
    }
}

static void ngx_redislog_reconnect_peer(ngx_redislog_peer_t *p)
{
    p->conn.connection = NULL;

    p->reconnect_timer.handler = ngx_redislog_connect_handler;
    p->reconnect_timer.data = p;
    p->reconnect_timer.log = p->conn.log;

    ngx_add_timer(&p->reconnect_timer, p->reconnect_timeout);

    p->reconnect_timeout *= 2;

    if(p->discarded != 0) {
        ngx_log_error(NGX_LOG_ERR, p->log, 0,
            "redislog peer \"%V\" discarded %ui messages",
            &p->conf->name, p->discarded);

        p->discarded = 0;
    }
}

static ngx_int_t
ngx_http_redislog_yyyy_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    u_char *line;

    line = ngx_palloc(r->pool, sizeof("yyyy")-1);

    if(line == NULL) {
        return NGX_ERROR;    
    }

    (void) ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);

    v->valid = 1;
    v->no_cacheable = 1;
    v->not_found = 0;

    v->data = line;
    v->len = sizeof("yyyy")-1;

    return NGX_OK;
}

static ngx_int_t
ngx_http_redislog_yyyymm_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    u_char *line, *p;

    line = ngx_palloc(r->pool, sizeof("yyyymm")-1);

    if(line == NULL) {
        return NGX_ERROR;    
    }

    p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
    (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);

    v->valid = 1;
    v->no_cacheable = 1;
    v->not_found = 0;

    v->data = line;
    v->len = sizeof("yyyymm")-1;

    return NGX_OK;
}

static ngx_int_t
ngx_http_redislog_yyyymmdd_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    u_char *line, *p;

    line = ngx_palloc(r->pool, sizeof("yyyymmdd")-1);

    if(line == NULL) {
        return NGX_ERROR;    
    }

    p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
    p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
    (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd")-1);

    v->valid = 1;
    v->no_cacheable = 1;
    v->not_found = 0;

    v->data = line;
    v->len = sizeof("yyyymmdd")-1;

    return NGX_OK;
}

static ngx_int_t
ngx_http_redislog_yyyymmddhh_variable(ngx_http_request_t *r,
    ngx_http_variable_value_t *v, uintptr_t data)
{
    u_char *line, *p;

    line = ngx_palloc(r->pool, sizeof("yyyymmddhh")-1);

    if(line == NULL) {
        return NGX_ERROR;    
    }

    p = ngx_copy(line, ngx_cached_http_log_iso8601.data, sizeof("yyyy")-1);
    p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 5, sizeof("mm")-1);
    p = ngx_copy(p, ngx_cached_http_log_iso8601.data + 8, sizeof("dd")-1);
    (void) ngx_copy(p, ngx_cached_http_log_iso8601.data + 11, sizeof("hh")-1);

    v->valid = 1;
    v->no_cacheable = 1;
    v->not_found = 0;

    v->data = line;
    v->len = sizeof("yyyymmddhh")-1;

    return NGX_OK;
}

static void *
ngx_http_redislog_create_loc_conf(ngx_conf_t *cf)
{
    ngx_http_redislog_conf_t  *conf;

    conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_redislog_conf_t));
    if (conf == NULL) {
        return NGX_CONF_ERROR;
    }

    return conf;
}

static char *
ngx_http_redislog_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
{
    ngx_http_redislog_conf_t *prev = parent;
    ngx_http_redislog_conf_t *conf = child;

    if(conf->logs || conf->off) {
        return NGX_CONF_OK;
    }

    conf->logs = prev->logs;
    conf->off = prev->off;

    return NGX_CONF_OK;
}

static void *
ngx_redislog_create_conf(ngx_cycle_t *cycle)
{
    ngx_redislog_conf_t  *slcf;

    slcf = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_conf_t));
    if(slcf == NULL) {
        return NULL;
    }

    return slcf;
}

static ngx_int_t
ngx_http_redislog_add_variables(ngx_conf_t *cf)
{
    ngx_http_variable_t  *var, *v;

    for (v = ngx_http_redislog_variables; v->name.len; v++) {

        var = ngx_http_add_variable(cf, &v->name, v->flags);
        if (var == NULL) {
            return NGX_ERROR;
        }

        var->get_handler = v->get_handler;
        var->data = v->data;
    }

    return NGX_OK;
}

static ngx_int_t
ngx_http_redislog_find_peer_by_name(ngx_conf_t *cf, ngx_str_t *name)
{
    ngx_redislog_conf_t         *slcf;
    ngx_redislog_peer_conf_t    *pc;
    ngx_uint_t                   i;

    slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module);

    pc = slcf->peers->elts;

    for(i = 0; i < slcf->peers->nelts; i++) {
        if(pc[i].name.len == name->len
            && ngx_strncmp(pc[i].name.data, name->data, name->len) == 0)
        {
            return i;
        }
    }

    return NGX_DECLINED;
}

static char *
ngx_http_redislog_set_log(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_http_redislog_conf_t            *slcf = conf;

    ngx_uint_t                          i;
    ngx_str_t                           *value, name, command, arg1, _if;
    //ngx_str_t                           arg_num;

    ngx_http_redislog_t                 *log;
    ngx_http_log_fmt_t                  *fmt;
    ngx_http_log_main_conf_t            *lmcf;
    ngx_int_t                           rc;
    ngx_http_compile_complex_value_t    ccv;
    unsigned                            format_set;

    format_set = 0;
    value = cf->args->elts;

    if (ngx_strcmp(value[1].data, "off") == 0) {
        slcf->off = 1;
        return NGX_CONF_OK;
    }
    slcf->off = 0;

    if (slcf->logs == NULL) {
        slcf->logs = ngx_array_create(cf->pool, 2, sizeof(ngx_http_redislog_t));
        if (slcf->logs == NULL) {
            return NGX_CONF_ERROR;
        }
    }

    lmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_log_module);

    if(lmcf == NULL) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "redislog module requires log module to be compiled in");
        return NGX_CONF_ERROR;
    }

    log = ngx_array_push(slcf->logs);
    if (log == NULL) {
        return NGX_CONF_ERROR;
    }

    ngx_memzero(log, sizeof(ngx_http_redislog_t));

    log->peer_name = value[1];

    rc = ngx_http_redislog_find_peer_by_name(cf, &log->peer_name);

    if(rc == NGX_DECLINED) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "redislog peer %V is not defined", &log->peer_name);
        return NGX_CONF_ERROR;
    }

    log->peer_idx = rc;

    /*
     * Create and compile key
     */
    log->key = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
    if(log->key == NULL) {
        return NGX_CONF_ERROR;
    }

    ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));

    ccv.cf = cf;
    ccv.value = &value[2];
    ccv.complex_value = log->key;

    if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
        return NGX_CONF_ERROR;
    }

    ngx_str_set(&command, "APPEND");
    //ngx_str_set(&arg1, "");
    ngx_str_set(&arg1, "test1");
    //ngx_str_set(&arg_num, "0");
    ngx_str_set(&name, "main");


    if (cf->args->nelts >= 4) {
        for (i = 3; i < cf->args->nelts; i++) {

            if (ngx_strncmp(value[i].data, "format=", 7) == 0) {

                format_set = 1;

                name = value[i];

                name.len -= 7;
                name.data += 7;

                if (ngx_strcmp(name.data, "combined") == 0) {
                    lmcf->combined_used = 1;
                }
                continue;
            }

            if (ngx_strncmp(value[i].data, "command=", 8) == 0) {

                command = value[i];

                command.len -= 8;
                command.data += 8;

                continue;
            }

            if (ngx_strncmp(value[i].data, "arg1=", 5) == 0) {

                arg1 = value[i];

                arg1.len -= 5;
                arg1.data += 5;

                log->has_arg1 = 1;

                continue;
            }

            if (ngx_strncmp(value[i].data, "if=", 3) == 0) {
                if(log->_if != NULL) {
                    continue;
                }

                _if = value[i];

                _if.len -= 3;
                _if.data += 3;

                /*
                 * Create and compile if script
                 */
                log->_if = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
                if(log->_if == NULL) {
                    return NGX_CONF_ERROR;
                }

                ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));

                ccv.cf = cf;
                ccv.value = &_if;
                ccv.complex_value = log->_if;

                if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
                    return NGX_CONF_ERROR;
                }

                continue;
            }

            if (ngx_strncmp(value[i].data, "ifnot=", 6) == 0) {
                if(log->ifnot != NULL) {
                    continue;
                }

                _if = value[i];

                _if.len -= 6;
                _if.data += 6;

                /*
                 * Create and compile if script
                 */
                log->ifnot = ngx_palloc(cf->pool, sizeof(ngx_http_complex_value_t));
                if(log->ifnot == NULL) {
                    return NGX_CONF_ERROR;
                }

                ngx_memzero(&ccv, sizeof(ngx_http_compile_complex_value_t));

                ccv.cf = cf;
                ccv.value = &_if;
                ccv.complex_value = log->ifnot;

                if(ngx_http_compile_complex_value(&ccv) != NGX_OK) {
                    return NGX_CONF_ERROR;
                }

                continue;
            }

            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "invalid parameter \"%V\"", &value[i]);
            return NGX_CONF_ERROR;
        }
    }

    if(!format_set) {
        name.len = sizeof(NGX_DEF_FORMAT) - 1;
        name.data = (u_char *) NGX_DEF_FORMAT;
        lmcf->combined_used = 1;
        
    }

    log->command = command;

    if(log->has_arg1) {
        log->arg1 = arg1;
    }

    fmt = lmcf->formats.elts;
    for (i = 0; i < lmcf->formats.nelts; i++) {
        if (fmt[i].name.len == name.len
            && ngx_strcasecmp(fmt[i].name.data, name.data) == 0)
        {
            log->format = &fmt[i];
            goto done;
        }
    }

    ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                       "unknown log format \"%V\"", &name);
    return NGX_CONF_ERROR;

done:

    return NGX_CONF_OK;
}

static char *
ngx_http_redislog_command(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
    ngx_str_t                           *value;
    ngx_redislog_conf_t                 *slcf;
    ngx_url_t                           u;
    ngx_redislog_peer_conf_t            *peer;
    u_char                              *p;
    size_t                              pass_size_len;

    slcf = (ngx_redislog_conf_t *) ngx_get_conf(cf->cycle->conf_ctx, ngx_core_redislog_module);

    value = cf->args->elts;

    ngx_memzero(&u, sizeof(ngx_url_t));

    u.url = value[2];
    u.default_port = 6379;
    u.no_resolve = 0;

    if(ngx_parse_url(cf->pool, &u) != NGX_OK) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, "%V: %s", &u.host, u.err);
        return NGX_CONF_ERROR;
    }

    if(slcf->peers == NULL) {
        slcf->peers = ngx_array_create(cf->pool, 2, sizeof(ngx_redislog_peer_conf_t));
        if (slcf->peers == NULL) {
            return NGX_CONF_ERROR;
        }
    }

    peer = ngx_array_push(slcf->peers);
    if(peer == NULL) {
        return NGX_CONF_ERROR;
    }

    peer->name = value[1];
    peer->sockaddr = u.addrs[0].sockaddr;
    peer->socklen = u.addrs[0].socklen;

    if(cf->args->nelts >= 4) {
        /*
         * Alloc space for authentication packet and create it
         */
        pass_size_len = ngx_redislog_size_len(value[3].len);

        peer->password.len = sizeof(NGX_REDIS_AUTH)-1 + 1 + pass_size_len
            + sizeof(CRLF)-1 + value[3].len + sizeof(CRLF)-1;

        peer->password.data = ngx_palloc(cf->pool, peer->password.len);
        if(peer->password.data == NULL) {
            return NGX_CONF_ERROR;
        }

        p = ngx_copy(peer->password.data, NGX_REDIS_AUTH, sizeof(NGX_REDIS_AUTH)-1);    
        
        *p++ = '$';

        p = ngx_redislog_size(p, p + pass_size_len, value[3].len);

        p = ngx_copy(p, CRLF, sizeof(CRLF)-1);

        p = ngx_copy(p, value[3].data, value[3].len);

        p = ngx_copy(p, CRLF, sizeof(CRLF)-1);

        peer->authenticate = 1;
    }

    peer->write_timeout     = 30000;
    peer->read_timeout      = 30000;
    peer->connect_timeout   = 30000;
    peer->reconnect_timeout = 5000;
    peer->flush_timeout     = 2000;
    peer->ping_timeout      = 30000;

    peer->bufs.num          = 200;
    peer->bufs.size         = 2048;
    peer->recv_buf_size     = 1024;

    return NGX_CONF_OK;
}

static ngx_int_t
ngx_http_redislog_init(ngx_conf_t *cf)
{
    ngx_http_core_main_conf_t    *cmcf;
    ngx_http_handler_pt          *h;

    cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);

    h = ngx_array_push(&cmcf->phases[NGX_HTTP_LOG_PHASE].handlers);
    if (h == NULL) {
        return NGX_ERROR;
    }

    *h = ngx_http_redislog_handler;

    return NGX_OK;
}

static ngx_int_t
ngx_redislog_init_process(ngx_cycle_t *cycle)
{
    ngx_int_t                           rc;
    ngx_redislog_conf_t                *slcf;
    ngx_uint_t                          i;
    ngx_redislog_peer_conf_t           *pc;
    ngx_redislog_peer_t                *peer, **ppeer;

    slcf = (ngx_redislog_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_redislog_module);

    if(slcf->peers == NULL || slcf->peers->nelts == 0) {
        return NGX_OK;
    }

    rc = ngx_array_init(&ngx_redislog_peers, cycle->pool,
        slcf->peers->nelts, sizeof(ngx_redislog_peer_t*));

    if(rc != NGX_OK) {
        return rc;
    }

    pc = slcf->peers->elts;

    for(i = 0; i < slcf->peers->nelts; i++) {
        ppeer = ngx_array_push(&ngx_redislog_peers);

        if(ppeer == NULL) {
            return NGX_ERROR;
        }

        peer = ngx_pcalloc(cycle->pool, sizeof(ngx_redislog_peer_t));

        if(peer == NULL) {
            return NGX_ERROR;
        }

        peer->free = ngx_create_chain_of_bufs(cycle->pool, &pc[i].bufs);

        if(peer->free == NULL) {
            return NGX_ERROR;
        }

        peer->recv_buf = ngx_create_temp_buf(cycle->pool, pc[i].recv_buf_size);

        if(peer->recv_buf == NULL) {
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }

        *ppeer = peer;

        peer->pool = cycle->pool;
        peer->conf = &pc[i];
        peer->log = cycle->log;

        peer->reconnect_timeout = pc[i].reconnect_timeout;

        ngx_redislog_connect_peer(peer);
    }
    
    return NGX_OK;
}




Пример конфигурации (пишем с ключом «$remote_addr^$status^$connection^$connection_requests^$msec» значение «$http_user_agent», которое будет «жить» 20 секунд):
root@redis:~ # cat /usr/local/etc/nginx/nginx.conf | egrep "(access_redislog|log_format)" | grep -v "#"
log_format  main  '$http_user_agent';
access_redislog test $remote_addr^$status^$connection^$connection_requests^$msec command=SETEX arg1=20;

Мониторим в redis:
root@redis:~ # redis-cli
redis 127.0.0.1:6379> monitor
OK
1389288187.176047 [0 78.72.78.56:31865] "SETEX" "78.72.78.209^200^9^1^1389288185.175" "20" "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36" 


Решаем аналогичную задачу с помощью EVALSHA (просто для примера, конечно, так делать не надо):
Создаём ключ:
# sha1 -s "return {redis.call('SET',KEYS[1],ARGV[1]); redis.call('EXPIRE',KEYS[1],20);}"
SHA1 ("return {redis.call('SET',KEYS[1],ARGV[1]); redis.call('EXPIRE',KEYS[1],20);}") = c82db723c86778baa89099e1b65ebf21cb48ce34
# cat /usr/local/etc/nginx/nginx.conf | egrep "(access_redislog|log_format)" | grep -v "#"
log_format  main  '$http_user_agent';
access_redislog test $remote_addr^$status^$connection^$connection_requests^$msec command=EVALSHA arg1=c82db723c86778baa89099e1b65ebf21cb48ce34;


Мониторим:
1389290907.814024 [0 78.72.78.56:21435] "EVALSHA" "c82db723c86778baa89099e1b65ebf21cb48ce34" "1" "78.72.78.209^200^9^2^1389290905.813" "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36"
1389290907.814091 [0 lua] "SET" "78.72.78.209^200^9^2^1389290905.813" "Mozilla/5.0 (Windows NT 6.3; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/31.0.1650.63 Safari/537.36"
1389290907.814132 [0 lua] "EXPIRE" "78.72.78.209^200^9^2^1389290905.813" "20"


Обе операции (SETEX и EVALSHA) атомарны, но, естественно, при наличии нативного SETEX дёргать LUA для таких задач неразумно.
Для наглядности пару стресс-тестов:

С использованием нативного SETEX
ab
# ab -n 100000 -c 4 -v 0 http://127.0.0.1:80/
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        nginx/1.4.3
Server Hostname:        127.0.0.1
Server Port:            80

Document Path:          /
Document Length:        612 bytes

Concurrency Level:      4
Time taken for tests:   4.338 seconds
Complete requests:      100000
Failed requests:        0
Write errors:           0
Total transferred:      84400000 bytes
HTML transferred:       61200000 bytes
Requests per second:    23050.90 [#/sec] (mean)
Time per request:       0.174 [ms] (mean)
Time per request:       0.043 [ms] (mean, across all concurrent requests)
Transfer rate:          18998.99 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.0      0       0
Processing:     0    0   0.1      0       1
Waiting:        0    0   0.1      0       1
Total:          0    0   0.1      0       1

Percentage of the requests served within a certain time (ms)
  50%      0
  66%      0
  75%      0
  80%      0
  90%      0
  95%      0
  98%      0
  99%      0
 100%      1 (longest request)



С использованием SETEX реализованного через EVAL
ab
# ab -n 100000 -c 4 -v 0 http://127.0.0.1:80/
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        nginx/1.4.3
Server Hostname:        127.0.0.1
Server Port:            80

Document Path:          /
Document Length:        612 bytes

Concurrency Level:      4
Time taken for tests:   59.520 seconds
Complete requests:      100000
Failed requests:        1413
   (Connect: 0, Receive: 0, Length: 1413, Exceptions: 0)
Write errors:           0
Total transferred:      83207428 bytes
HTML transferred:       60335244 bytes
Requests per second:    1680.12 [#/sec] (mean)
Time per request:       2.381 [ms] (mean)
Time per request:       0.595 [ms] (mean, across all concurrent requests)
Transfer rate:          1365.22 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   1.1      0     251
Processing:     0    2  13.0      0     251
Waiting:        0    2  12.9      0     251
Total:          0    2  13.0      0     253

Percentage of the requests served within a certain time (ms)
  50%      0
  66%      0
  75%      1
  80%      1
  90%      1
  95%      1
  98%     44
  99%     77
 100%    253 (longest request)



Ну и с записью просто в файл (в память)
ab
# ab -n 100000 -c 4 -v 0 http://127.0.0.1:80/
This is ApacheBench, Version 2.3 <$Revision: 655654 $>
Copyright 1996 Adam Twiss, Zeus Technology Ltd, http://www.zeustech.net/
Licensed to The Apache Software Foundation, http://www.apache.org/

Benchmarking 127.0.0.1 (be patient)
Completed 10000 requests
Completed 20000 requests
Completed 30000 requests
Completed 40000 requests
Completed 50000 requests
Completed 60000 requests
Completed 70000 requests
Completed 80000 requests
Completed 90000 requests
Completed 100000 requests
Finished 100000 requests


Server Software:        nginx/1.4.3
Server Hostname:        127.0.0.1
Server Port:            80

Document Path:          /
Document Length:        612 bytes

Concurrency Level:      4
Time taken for tests:   4.544 seconds
Complete requests:      100000
Failed requests:        0
Write errors:           0
Total transferred:      84400000 bytes
HTML transferred:       61200000 bytes
Requests per second:    22007.62 [#/sec] (mean)
Time per request:       0.182 [ms] (mean)
Time per request:       0.045 [ms] (mean, across all concurrent requests)
Transfer rate:          18139.09 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    0   0.0      0       0
Processing:     0    0   0.0      0       1
Waiting:        0    0   0.0      0       1
Total:          0    0   0.0      0       1

Percentage of the requests served within a certain time (ms)
  50%      0
  66%      0
  75%      0
  80%      0
  90%      0
  95%      0
  98%      0
  99%      0
 100%      1 (longest request)



Тем самым вышеописанная реализация хранения с SETEX позволила добиться равных результатов с хранением в файле плюс мы получили авторотацию логов без каких-либо особенных затрат.
Теги:nginxredisddos
Хабы: Высокая производительность NoSQL
+9
16,6k 93
Комментарии 40
Лучшие публикации за сутки