Pull to refresh

Кроссплатформенный многопоточный TCP/IP сервер на C++

Reading time 16 min
Views 84K

Решил задаться целью написать простой в использовании и при этом быстрый многопоточного TCP/IP сервера на C++ и при этом кроссплатформенный — как минимум чтобы работал на платформах Windows и Linux без требования как-либо изменять код за пределами самописной библиотеки. Ранее, на чистом C++ без библиотек вроде Qt, сетевым программировнием не занимался, и предвещал себе долгое время мучений с платформо-зависимостью. Но как оказалось всё гораздо проще чем казалось на первый взгляд, ведь в основном интерфейсы сокетов обоих систем похожи как две капли воды и различаются лишь в мелких деталях.


Для начала определим общий для клиента и сервера заголовок:


general.h


#ifndef GENERAL_H
#define GENERAL_H

#ifdef _WIN32
#else
#define SD_BOTH 0
#endif

#include <cstdint>
#include <cstring>
#include <cinttypes>
#include <malloc.h> 

// IP 127.0.0.1
uint32_t LOCALHOST_IP = 0x0100007f;

// Код состояния сокета
enum class SocketStatus : uint8_t {
  connected = 0,
  err_socket_init = 1,
  err_socket_bind = 2,
  err_socket_connect = 3,
  disconnected = 4
};

// Буффер данных куда у нас будет приниматься данные от другой стороны
struct DataBuffer {
  int size = 0;
  void* data_ptr = nullptr;

  DataBuffer() = default;
  DataBuffer(int size, void* data_ptr) : size(size), data_ptr(data_ptr) {}
  DataBuffer(const DataBuffer& other) : size(other.size), data_ptr(malloc(size)) {memcpy(data_ptr, other.data_ptr, size);}
  DataBuffer(DataBuffer&& other) : size(other.size), data_ptr(other.data_ptr) {other.data_ptr = nullptr;}
  ~DataBuffer() {if(data_ptr) free(data_ptr); data_ptr = nullptr;}

  bool isEmpty() {return !data_ptr || !size;}
  operator bool() {return data_ptr && size;}
};
// в последней версии библиотеки typedef от std::vector<uint8_t>

// Тип сокета
enum class SocketType : uint8_t {
  client_socket = 0,
  server_socket = 1
};

// Базовый класс TCP клиента
class TcpClientBase {
public:
  typedef SocketStatus status;
  virtual ~TcpClientBase() {};
  virtual status disconnect() = 0;
  virtual status getStatus() const = 0;
  virtual bool sendData(const void* buffer, const size_t size) const = 0;
  virtual DataBuffer loadData() = 0;
  virtual uint32_t getHost() const = 0;
  virtual uint16_t getPort() const = 0;
  virtual SocketType getType() const = 0;
};

#endif // GENERAL_H 

Итак интерфейсы классов сервера и клиента(со стороны сервера) выглядят следующим образом:


TcpServer.h


#ifndef TCPSERVER_H
#define TCPSERVER_H

#include <functional>
#include <list>

#include <thread>
#include <mutex>
#include <shared_mutex>

#ifdef _WIN32 // Windows NT

#include <WinSock2.h>
#include <mstcpip.h>

#else // *nix

#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>

#endif

#include "general.h"

#ifdef _WIN32 // Windows NT
typedef int SockLen_t;
typedef SOCKADDR_IN SocketAddr_in;
typedef SOCKET Socket;
typedef u_long ka_prop_t;
#else // POSIX
typedef socklen_t SockLen_t;
typedef struct sockaddr_in SocketAddr_in;
typedef int Socket;
typedef int ka_prop_t;
#endif

// Конфигурация Keep-Alive соединения
struct KeepAliveConfig{
  ka_prop_t ka_idle = 120;
  ka_prop_t ka_intvl = 3;
  ka_prop_t ka_cnt = 5;
};

// Класс Tcp сервера
struct TcpServer {
  // Класс клиента сервера (реализация определена ниже)
  struct Client;
  // Тип обработчик данных клиента
  typedef std::function<void(DataBuffer, Client&)> handler_function_t;
  // Тип обработчика подключения/отсоединения клиента
  typedef std::function<void(Client&)> con_handler_function_t;

  // Коды статуса сервера
  enum class status : uint8_t {
    up = 0,
    err_socket_init = 1,
    err_socket_bind = 2,
    err_scoket_keep_alive = 3,
    err_socket_listening = 4,
    close = 5
  };

private:
  // Сокет сервера
  Socket serv_socket;
  // Порт сервера
  uint16_t port;
  // Код статуса
  status _status = status::close;
  // Обработчик данных от клиента
  handler_function_t handler;
  // Обработчик подключения клиента
  con_handler_function_t connect_hndl = [](Client&){};
  // Обработчик отсоединения клиента
  con_handler_function_t disconnect_hndl = [](Client&){};
  // Поток-обработчик подключений
  std::thread accept_handler_thread;
  // Поток ожидания данных
  std::thread data_waiter_thread;
  // Тип итератора клиента
  typedef std::list<std::unique_ptr<Client>>::iterator ClientIterator;

  // Keep-Alive конфигурация
  KeepAliveConfig ka_conf;

  // Список клиентов
  std::list<std::unique_ptr<Client>> client_list;
  // Мьютекс для синзронизации потоков подключения и ожидания данных
  std::mutex client_mutex; 

  // Для систем Windows так же требуется
  // структура определяющая версию WinSocket
#ifdef _WIN32 // Windows NT
  WSAData w_data;
#endif

  // Включить Keep-Alive для сокета
  bool enableKeepAlive(Socket socket);
  // Метод обработчика подключений
  void handlingAcceptLoop();
  // Метод ожидания данных
  void waitingDataLoop();

public:
  // Упрощённый конструктор с указанием:
  // * порта
  // * обработчика данных
  // * конфигурации Keep-Alive
  TcpServer(const uint16_t port,
            handler_function_t handler,
            KeepAliveConfig ka_conf = {});
  // Конструктор с указанием:
  // * порта
  // * обработчика данных
  // * обработчика подключений
  // * обработчика отключений
  // * конфигурации Keep-Alive
  TcpServer(const uint16_t port,
            handler_function_t handler,
            con_handler_function_t connect_hndl,
            con_handler_function_t disconnect_hndl,
            KeepAliveConfig ka_conf = {});

  // Деструктор
  ~TcpServer();

  // Заменить обработчик данных
  void setHandler(handler_function_t handler);
  // Getter порта
  uint16_t getPort() const;
  // Setter порта
  uint16_t setPort(const uint16_t port);
  // Getter кода статуса сервера
  status getStatus() const {return _status;}
  // Метод запуска сервера
  status start();
  // Метод остановки сервера
  void stop();
  // Метод для входа присоединения циклических потоков сервера
  void joinLoop();

  // Исходящее подключение от сервера к другому серверу
  bool connectTo(uint32_t host, uint16_t port, con_handler_function_t connect_hndl);

  // Отправить данные всем клиентам сервера
  void sendData(const void* buffer, const size_t size);
  // Отправить данные клиенту по порту и хосту
  bool sendDataBy(uint32_t host, uint16_t port, const void* buffer, const size_t size);
  // Отключить клиента по порту и хосту
  bool disconnectBy(uint32_t host, uint16_t port);
  // Отключить всех клиентов
  void disconnectAll();
};

// Класс клиента (со стороны сервера)
struct TcpServer::Client : public TcpClientBase {
  friend struct TcpServer;

  // Мьютекс для синхронизации обработки данныз
  std::mutex access_mtx;
  // Адрес клиента
  SocketAddr_in address;
  // Сокет слиента
  Socket socket;
  // Код статуса клиента
  status _status = status::connected;

public:
  // Конструктор с указанием:
  // * сокета клиента
  // * адреса клиента
  Client(Socket socket, SocketAddr_in address);
  // Деструктор
  virtual ~Client() override;
  // Getter хоста
  virtual uint32_t getHost() const override;
  // Getter порта
  virtual uint16_t getPort() const override;
  // Getter кода статуса подключения
  virtual status getStatus() const override {return _status;}
  // Отключить клиента
  virtual status disconnect() override;
  // Получить данные от клиента
  virtual DataBuffer loadData() override;
  // Отправить данные клиенту
  virtual bool sendData(const void* buffer, const size_t size) const override;
  // Определить "сторону" клиента
  virtual SocketType getType() const override {return SocketType::server_socket;}
};

#endif // TCPSERVER_H

Как можно заметить на данном этапи зависимости операционных систем без особых проблем решаются при помощи макросов и псевдонимов типов данных. Так же в Windows части TcpServer-хедера присутствует структура для обозначения используемой версии WinSocket — WSAData w_data;(см. WSAData)


Перейдём к реализации сервера:


TcpServer.cpp


#include "../include/TcpServer.h"
#include <chrono>
#include <cstring>
#include <mutex>

#ifdef _WIN32
// Макросы для выражений зависимых от OS
#define WIN(exp) exp
#define NIX(exp)

// Конвертировать WinSocket код ошибки в Posix код ошибки
inline int convertError() {
    switch (WSAGetLastError()) {
    case 0:
        return 0;
    case WSAEINTR:
        return EINTR;
    case WSAEINVAL:
        return EINVAL;
    case WSA_INVALID_HANDLE:
        return EBADF;
    case WSA_NOT_ENOUGH_MEMORY:
        return ENOMEM;
    case WSA_INVALID_PARAMETER:
        return EINVAL;
    case WSAENAMETOOLONG:
        return ENAMETOOLONG;
    case WSAENOTEMPTY:
        return ENOTEMPTY;
    case WSAEWOULDBLOCK:
        return EAGAIN;
    case WSAEINPROGRESS:
        return EINPROGRESS;
    case WSAEALREADY:
        return EALREADY;
    case WSAENOTSOCK:
        return ENOTSOCK;
    case WSAEDESTADDRREQ:
        return EDESTADDRREQ;
    case WSAEMSGSIZE:
        return EMSGSIZE;
    case WSAEPROTOTYPE:
        return EPROTOTYPE;
    case WSAENOPROTOOPT:
        return ENOPROTOOPT;
    case WSAEPROTONOSUPPORT:
        return EPROTONOSUPPORT;
    case WSAEOPNOTSUPP:
        return EOPNOTSUPP;
    case WSAEAFNOSUPPORT:
        return EAFNOSUPPORT;
    case WSAEADDRINUSE:
        return EADDRINUSE;
    case WSAEADDRNOTAVAIL:
        return EADDRNOTAVAIL;
    case WSAENETDOWN:
        return ENETDOWN;
    case WSAENETUNREACH:
        return ENETUNREACH;
    case WSAENETRESET:
        return ENETRESET;
    case WSAECONNABORTED:
        return ECONNABORTED;
    case WSAECONNRESET:
        return ECONNRESET;
    case WSAENOBUFS:
        return ENOBUFS;
    case WSAEISCONN:
        return EISCONN;
    case WSAENOTCONN:
        return ENOTCONN;
    case WSAETIMEDOUT:
        return ETIMEDOUT;
    case WSAECONNREFUSED:
        return ECONNREFUSED;
    case WSAELOOP:
        return ELOOP;
    case WSAEHOSTUNREACH:
        return EHOSTUNREACH;
    default:
        return EIO;
    }
}

#else
// Макросы для выражений зависимых от OS
#define WIN(exp)
#define NIX(exp) exp
#endif

// Реализация конструктора сервера с указанием
// * порта
// * обработчика данных
// * Keep-Alive конфигурации
TcpServer::TcpServer(const uint16_t port,
                     handler_function_t handler,
                     KeepAliveConfig ka_conf)
  : TcpServer(port, handler, [](Client&){}, [](Client&){}, ka_conf) {}

// Реализация конструктора сервера с указанием
// * порта
// * обработчика данных
// * обработчика подключения
// * обработчика отключения
// * Keep-Alive конфигурации
TcpServer::TcpServer(const uint16_t port,
                     handler_function_t handler,
                     con_handler_function_t connect_hndl,
                     con_handler_function_t disconnect_hndl,
                     KeepAliveConfig ka_conf)
  : port(port), handler(handler), connect_hndl(connect_hndl), disconnect_hndl(disconnect_hndl), ka_conf(ka_conf) {}

// Деструктор сервера
// автоматически закрывает сокет сервера 
TcpServer::~TcpServer() {
  if(_status == status::up)
    stop();
    WIN(WSACleanup());
}

// Setter обработчика данных
void TcpServer::setHandler(TcpServer::handler_function_t handler) {this->handler = handler;}

// Getter порта
uint16_t TcpServer::getPort() const {return port;}
// Setter порта
uint16_t TcpServer::setPort( const uint16_t port) {
    this->port = port;
    start();
    return port;
}

// Реализация запуска сервера
TcpServer::status TcpServer::start() {
  int flag;
  // Если сервер запущен, то отключаем его
  if(_status == status::up) stop();

  // Для Windows указываем версию WinSocket
  WIN(if(WSAStartup(MAKEWORD(2, 2), &w_data) == 0) {})

  // Задаём адрес сервера
  SocketAddr_in address;
  // INADDR_ANY - любой IP адрес
  address.sin_addr
      WIN(.S_un.S_addr)NIX(.s_addr) = INADDR_ANY;
  // Задаём порт сервера
  address.sin_port = htons(port);
  // Семейство сети AF_INET - IPv4 (AF_INET6 - IPv6)
  address.sin_family = AF_INET;

  // Создаём TCP сокет
  if((serv_socket = socket(AF_INET, SOCK_STREAM, 0)) WIN(== INVALID_SOCKET)NIX(== -1))
     return _status = status::err_socket_init;

  flag = true;
  // Устанавливаем параметр сокета SO_REUSEADDR в true (подробнее https://it.wikireading.ru/7093)
  if((setsockopt(serv_socket, SOL_SOCKET, SO_REUSEADDR, WIN((char*))&flag, sizeof(flag)) == -1) ||
     // Привязываем к сокету адрес и порт
     (bind(serv_socket, (struct sockaddr*)&address, sizeof(address)) WIN(== SOCKET_ERROR)NIX(< 0)))
     return _status = status::err_socket_bind;

  // Активируем ожидание фходящих соединений
  if(listen(serv_socket, SOMAXCONN) WIN(== SOCKET_ERROR)NIX(< 0))
    return _status = status::err_socket_listening;

  _status = status::up;
  // Запускаем поток ожидания соединений
  accept_handler_thread = std::thread([this]{handlingAcceptLoop();});
  // Запускаем поток ожидания данных
  data_waiter_thread = std::thread([this]{waitingDataLoop();});
  return _status;
}

// Реализация остановки сервера
void TcpServer::stop() {
  _status = status::close;
  // Закрываем сокет
  WIN(closesocket)NIX(close)(serv_socket);
  // Ожидаем завершения потоков
  joinLoop();
  // Вычищаем список клиентов
  client_list.clear();
}

// "Вхождение" в потоки ожидания
void TcpServer::joinLoop() {accept_handler_thread.join(); data_waiter_thread.join();}

// Создание подключение со стороны сервера
// (подключение аналогично клиентоскому, но обрабатывается
// тем же обработчиком, что и входящие соединения)
bool TcpServer::connectTo(uint32_t host, uint16_t port, con_handler_function_t connect_hndl) {
  Socket client_socket;
  SocketAddr_in address;
  // Создание TCP сокета
  if((client_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_IP)) WIN(== INVALID_SOCKET) NIX(< 0)) return false;

  new(&address) SocketAddr_in;
  address.sin_family = AF_INET;
  address.sin_addr.s_addr = host;
  WIN(address.sin_addr.S_un.S_addr = host;)
  NIX(address.sin_addr.s_addr = host;)

  address.sin_port = htons(port);

  // Установка соединения
  if(connect(client_socket, (sockaddr *)&address, sizeof(address))
     WIN(== SOCKET_ERROR)NIX(!= 0)
     ) {
    WIN(closesocket(client_socket);)NIX(close(client_socket);)
    return false;
  }

  // Активация Keep-Alive
  if(!enableKeepAlive(client_socket)) {
    shutdown(client_socket, 0);
    WIN(closesocket)NIX(close)(client_socket)
  }

  std::unique_ptr<Client> client(new Client(client_socket, address));
  // Запуск обработчика подключения
  connect_hndl(*client);
  // Добавление клиента в список клиентов
  client_mutex.lock();
  client_list.emplace_back(std::move(client));
  client_mutex.unlock();
  return true;
}

// Отправка данных всем клиентам
void TcpServer::sendData(const void* buffer, const size_t size) {
  for(std::unique_ptr<Client>& client : client_list)
    client->sendData(buffer, size);
}

// Отправка данных по конкретному хосту и порту
bool TcpServer::sendDataBy(uint32_t host, uint16_t port, const void* buffer, const size_t size) {
  bool data_is_sended = false;
  for(std::unique_ptr<Client>& client : client_list)
    if(client->getHost() == host &&
       client->getPort() == port) {
      client->sendData(buffer, size);
      data_is_sended = true;
    }
  return data_is_sended;
}

// Отключение клиента по конкретному хосту и порту
bool TcpServer::disconnectBy(uint32_t host, uint16_t port) {
  bool client_is_disconnected = false;
  for(std::unique_ptr<Client>& client : client_list)
    if(client->getHost() == host &&
       client->getPort() == port) {
      client->disconnect();
      client_is_disconnected = true;
    }
  return client_is_disconnected;
}

// Отключение всех клиентов
void TcpServer::disconnectAll() {
  for(std::unique_ptr<Client>& client : client_list)
    client->disconnect();
}

// Цикл обработки входящих подключений
// (исполняется в отдельном потоке)
void TcpServer::handlingAcceptLoop() {
  SockLen_t addrlen = sizeof(SocketAddr_in);
  // Пока сервер запущен
  while (_status == status::up) {
    SocketAddr_in client_addr;
    // Принятеи новго подключения (блокирующи вызов)
    if (Socket client_socket = accept(serv_socket, (struct sockaddr*)&client_addr, &addrlen);
        client_socket WIN(!= 0)NIX(>= 0) && _status == status::up) {
      // Если получен сокет с ошибкой продолжить ожидание
      if(client_socket == WIN(INVALID_SOCKET)NIX(-1)) continue;

      // Активировать Keep-Alive для клиента
      if(!enableKeepAlive(client_socket)) {
        shutdown(client_socket, 0);
        WIN(closesocket)NIX(close)(client_socket);
      }

      std::unique_ptr<Client> client(new Client(client_socket, client_addr));
      // Запустить обработчик подключений
      connect_hndl(*client);
      // Добавить клиента в список клиентов
      client_mutex.lock();
      client_list.emplace_back(std::move(client));
      client_mutex.unlock();
    }
  }

}

// Цикл ожидания данных
void TcpServer::waitingDataLoop() {
  using namespace std::chrono_literals;
  while (true) {
    client_mutex.lock();
    // Перебрать всех клиентов
    for(auto it = client_list.begin(), end = client_list.end(); it != end; ++it) {
      auto& client = *it;
      // Если unique_ptr содержит объект клиента
      if(client){
        if(DataBuffer data = client->loadData(); data.size) {
          // При наличии данных запустить обработку входящих данных в отдельном потоке
          std::thread([this, _data = std::move(data), &client]{
            client->access_mtx.lock();
            handler(_data, *client);
            client->access_mtx.unlock();
          }).detach();
        } else if(client->_status == SocketStatus::disconnected) {
          // При отключении клиента запустить обработку в отдельном потоке
          std::thread([this, &client, it]{
            // Извлечь объект клиента из unique_ptr в списке
            client->access_mtx.lock();
            Client* pointer = client.release();
            client = nullptr;
            pointer->access_mtx.unlock();
            // Запуск обработчика отключения
            disconnect_hndl(*pointer);
            // Удалить элемент клиента из списка
            client_list.erase(it);
            // Удалить объект клиента
            delete pointer;
          }).detach();
        }
      }
    }
    client_mutex.unlock();
    // Ожидание 50 млисекунд так как в данном потоке
    // не содержится блокирующих вызовов и данный
    // цикл сильно повышает загруженность CPU
    std::this_thread::sleep_for(50ms);
  }
}

// Функция запуска и конфигурации Keep-Alive для сокета
bool TcpServer::enableKeepAlive(Socket socket) {
  int flag = 1;
#ifdef _WIN32
  tcp_keepalive ka {1, ka_conf.ka_idle * 1000, ka_conf.ka_intvl * 1000};
  if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, (const char *) &flag, sizeof(flag)) != 0) return false;
  unsigned long numBytesReturned = 0;
  if(WSAIoctl(socket, SIO_KEEPALIVE_VALS, &ka, sizeof (ka), nullptr, 0, &numBytesReturned, 0, nullptr) != 0) return false;
#else //POSIX
  if(setsockopt(socket, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) == -1) return false;
  if(setsockopt(socket, IPPROTO_TCP, TCP_KEEPIDLE, &ka_conf.ka_idle, sizeof(ka_conf.ka_idle)) == -1) return false;
  if(setsockopt(socket, IPPROTO_TCP, TCP_KEEPINTVL, &ka_conf.ka_intvl, sizeof(ka_conf.ka_intvl)) == -1) return false;
  if(setsockopt(socket, IPPROTO_TCP, TCP_KEEPCNT, &ka_conf.ka_cnt, sizeof(ka_conf.ka_cnt)) == -1) return false;
#endif
  return true;
}

Реализация для Linux и Windows практически идентична за исключением некотрых мест который без проблем обкладываются макросами. Теперь же мы перейдём непосредственно к реализации класса клиента:


TcpServerClient.cpp


#include "../include/TcpServer.h"

#ifdef _WIN32
// Макросы для выражений зависимых от OS
#define WIN(exp) exp
#define NIX(exp)

// Конвертировать WinSocket код ошибки в Posix код ошибки
inline int convertError() {
    switch (WSAGetLastError()) {
    case 0:
        return 0;
    case WSAEINTR:
        return EINTR;
    case WSAEINVAL:
        return EINVAL;
    case WSA_INVALID_HANDLE:
        return EBADF;
    case WSA_NOT_ENOUGH_MEMORY:
        return ENOMEM;
    case WSA_INVALID_PARAMETER:
        return EINVAL;
    case WSAENAMETOOLONG:
        return ENAMETOOLONG;
    case WSAENOTEMPTY:
        return ENOTEMPTY;
    case WSAEWOULDBLOCK:
        return EAGAIN;
    case WSAEINPROGRESS:
        return EINPROGRESS;
    case WSAEALREADY:
        return EALREADY;
    case WSAENOTSOCK:
        return ENOTSOCK;
    case WSAEDESTADDRREQ:
        return EDESTADDRREQ;
    case WSAEMSGSIZE:
        return EMSGSIZE;
    case WSAEPROTOTYPE:
        return EPROTOTYPE;
    case WSAENOPROTOOPT:
        return ENOPROTOOPT;
    case WSAEPROTONOSUPPORT:
        return EPROTONOSUPPORT;
    case WSAEOPNOTSUPP:
        return EOPNOTSUPP;
    case WSAEAFNOSUPPORT:
        return EAFNOSUPPORT;
    case WSAEADDRINUSE:
        return EADDRINUSE;
    case WSAEADDRNOTAVAIL:
        return EADDRNOTAVAIL;
    case WSAENETDOWN:
        return ENETDOWN;
    case WSAENETUNREACH:
        return ENETUNREACH;
    case WSAENETRESET:
        return ENETRESET;
    case WSAECONNABORTED:
        return ECONNABORTED;
    case WSAECONNRESET:
        return ECONNRESET;
    case WSAENOBUFS:
        return ENOBUFS;
    case WSAEISCONN:
        return EISCONN;
    case WSAENOTCONN:
        return ENOTCONN;
    case WSAETIMEDOUT:
        return ETIMEDOUT;
    case WSAECONNREFUSED:
        return ECONNREFUSED;
    case WSAELOOP:
        return ELOOP;
    case WSAEHOSTUNREACH:
        return EHOSTUNREACH;
    default:
        return EIO;
    }
}

#else
// Макросы для выражений зависимых от OS
#define WIN(exp)
#define NIX(exp) exp
#endif

#include <iostream>

// Реализация загрузки данных
DataBuffer TcpServer::Client::loadData() {
  // Если клиент не подключён вернуть пустой буффер
  if(_status != SocketStatus::connected) return DataBuffer();
  using namespace std::chrono_literals;
  DataBuffer buffer;
  int err;

  // Чтение длинный данных в неблокирующем режиме
  // MSG_DONTWAIT - Unix флаг неблокирующего режима для recv
  // FIONBIO - Windows-флаг неблокирующего режима для ioctlsocket
  WIN(if(u_long t = true; SOCKET_ERROR == ioctlsocket(socket, FIONBIO, &t)) return DataBuffer();)
  int answ = recv(socket, (char*)&buffer.size, sizeof (buffer.size), NIX(MSG_DONTWAIT)WIN(0));

  // Обработка отключения
  if(!answ) {
    disconnect();
    return DataBuffer();
  } else if(answ == -1) {
    // Чтение кода ошибки
    WIN(
      err = convertError();
      if(!err) {
        SockLen_t len = sizeof (err);
        getsockopt (socket, SOL_SOCKET, SO_ERROR, WIN((char*))&err, &len);
      }
    )NIX(
      SockLen_t len = sizeof (err);
      getsockopt (socket, SOL_SOCKET, SO_ERROR, WIN((char*))&err, &len);
      if(!err) err = errno;
    )

    // Отключение неблокирующего режима для Windows
    WIN(if(u_long t = false; SOCKET_ERROR == ioctlsocket(socket, FIONBIO, &t)) return DataBuffer();) 

    // Обработка ошибки при наличии
    switch (err) {
      case 0: break;
        // Keep alive timeout
      case ETIMEDOUT:
      case ECONNRESET:
      case EPIPE:
        disconnect();
        [[fallthrough]];
        // No data
      case EAGAIN: return DataBuffer();
      default:
        disconnect();
        std::cerr << "Unhandled error!\n"
                    << "Code: " << err << " Err: " << std::strerror(err) << '\n';
      return DataBuffer();
    }
  }

  // Если прочитанный размер нулевой, то вернуть пустой буффер
  if(!buffer.size) return DataBuffer();
  // Если размер не нулевой выделить буффер в куче для чтения данных
  buffer.data_ptr = (char*)malloc(buffer.size);
  // Чтение данных в блокирующем режиме
  recv(socket, (char*)buffer.data_ptr, buffer.size, 0);
  // Возврат буффера с прочитанными данными
  return buffer;
}

// Обработка отключения клиента
TcpClientBase::status TcpServer::Client::disconnect() {
  _status = status::disconnected;
  // Если сокет не валидный прекратить обработку
  if(socket == WIN(INVALID_SOCKET)NIX(-1)) return _status;
  // Отключение сокета
  shutdown(socket, SD_BOTH)
  // Закрытие сокета
  WIN(closesocket)NIX(close)(socket);
  // Установление в сокета не валидного значения
  socket = WIN(INVALID_SOCKET)NIX(-1);
  return _status;
}

// Отправка данных
bool TcpServer::Client::sendData(const void* buffer, const size_t size) const {
  // Если сокет закрыт вернуть false
  if(_status != SocketStatus::connected) return false;
  // Сформировать сообщение с длинной в начале
  void* send_buffer = malloc(size + sizeof (int));
  memcpy(reinterpret_cast<char*>(send_buffer) + sizeof(int), buffer, size);
  *reinterpret_cast<int*>(send_buffer) = size;

  // Отправить сообщение
  if(send(socket, reinterpret_cast<char*>(send_buffer), size + sizeof (int), 0) < 0) return false;
  // Вычистить буффер сообщения
  free(send_buffer);
  return true;
}

// Конструктор клиента
TcpServer::Client::Client(Socket socket, SocketAddr_in address)
  : address(address), socket(socket) {}

// Деструктор клиента с закрытием сокета
TcpServer::Client::~Client() {
  if(socket == WIN(INVALID_SOCKET)NIX(-1)) return;
  shutdown(socket, SD_BOTH);
  WIN(closesocket(socket);)
  NIX(close(socket);)
}

// Получить хост клиента
uint32_t TcpServer::Client::getHost() const {return NIX(address.sin_addr.s_addr) WIN(address.sin_addr.S_un.S_addr);}
// Получить порт клиента
uint16_t TcpServer::Client::getPort() const {return address.sin_port;}

Пример использования:


main.cpp


#include <iostream>

// Парсинр IPv4-адреса и порта в std::string
std::string getHostStr(const TcpServer::Client& client) {
uint32_t ip = client.getHost ();
return std::string() + std::to_string(int(reinterpret_cast<char*>(&ip)[0])) + '.' +
        std::to_string(int(reinterpret_cast<char*>(&ip)[1])) + '.' +
        std::to_string(int(reinterpret_cast<char*>(&ip)[2])) + '.' +
        std::to_string(int(reinterpret_cast<char*>(&ip)[3])) + ':' +
        std::to_string( client.getPort ());
}

int main() {
    // Создание экземпляра сервера
    TcpServer server( 8080, [](DataBuffer data, TcpServer::Client& client){
        std::cout << "("<<getHostStr(client)<<")[ " << data.size << " bytes ]: " << (char*)data.data_ptr << '\n';
        client.sendData("Hello, client!", sizeof("Hello, client!"));
        }, {1, 1, 1}); // Keep alive{ожидание:1s, интервал: 1s, кол-во пакетов: 1};

    // Запуск сервера
    if(server.start() == TcpServer::status::up) {
        std::cout << "Server is up!" << std::endl;
        server.joinLoop(); //Joing to the client handling loop
    } else {
        std::cout << "Server start error! Error code:" << int(server.getStatus()) << std::endl;
        return -1;
    }

}

Источники:



UPDATE: Реализация многопоточного TCP-сервера представленная в данной статье имеет такой недостаток как "потокове голодание" при слишком большом колличестве подключённых к серверу клиентов, по скольку для обработки каждого клиента создаётся отдельный поток, который находится в состоянии активного ожидания данных от клиента. Для уменьшения влияния данного недостатка было принято решение изменить модель организации многопоточности приложения. Новая модель представлена на следующей схеме:


image


Как можно заметить, при данной модели организации многопоточности имеется всего один поток-обходчик, который проходит все клиенты и проверяет наличие пришедших от них данных. При условии найденых данных поток-обходчик переходит в состояние обработки данных клиента предварительно создав новый поток-обходчик, который идёт дальше. После обработки данных поток-обработчик переносит объект клиента в конец очереди ожидания и самоуничтожается.


Реализацию с применением данного исправления можно увидеть в данном репозитории GitHub. Данный сервер рассчитан только для отправки бинарных данных по протоколу TCP, поскольку отправляет вместе с данными заголовок с их размером в байтах, так что для реализации HTTP-сервера данная реализация не подходит. Реализованно это именно так по причине того что TCP/IP — потоковый протокол, то есть данные идущие через TCP/IP никак не терминируются, что осложняет работу именно в случаях работы с сырыми данными, когда поток нельзя терминировать "завершающим символом", как это принято в текстовых потоках.


UPDATE 2: На текущий момент, реализация представленная в репозитории GitHub лишена недостатка с "потоковым голоданием". Операции чтения префикса размера бинарного пакета при получении его от клиента, а так же приём новых соединений переведены в неблокирующий режим, а многопоточность реализована с помощью простой реализации пула потоков.

Tags:
Hubs:
+5
Comments 40
Comments Comments 40

Articles