Java
17 January 2012

Высокопроизводительный NIO-сервер на Netty

From Sandbox
Преамбула

Здравствуйте. Я являюсь главным разработчиком крупнейшего в СНГ сервера Minecraft (не буду рекламировать, кому надо, те знают). Уже почти год мы пишем свою реализацию сервера, рассчитанную на больше чем 40 человек (мы хотим видеть цифру в 500 хотя бы). Пока всё было удачно, но последнее время система начала упираться в то, что из-за не самой удачной реализации сети (1 поток на ввод, 1 на вывод + 1 на обработку), при 300 игроках онлайн работает более 980 потоков (+ системные), что в сочетании с производительностью дефолтного io Явы даёт огромное падение производительности, и уже при 100 игроках сервер в основном занимается тем, что пишет/читает в/из сети.

Поэтому я решила переходить на NIO. В руки совершенно случайно попала библиотека Netty, структура которой показалась просто идеально подходящей для того, чтобы встроить её в уже готовое работающее решение. К сожалению, мануалов по Netty мало не только на русском, но и на английском языках, поэтому приходилось много экспериментировать и лазить в код библиотеки, чтобы найти лучший способ.

Здесь я постараюсь расписать серверную часть работы с сетью через Netty, может быть это кому-то будет полезно.

Создание сервера


ExecutorService bossExec = new OrderedMemoryAwareThreadPoolExecutor(1, 400000000, 2000000000, 60, TimeUnit.SECONDS);
ExecutorService ioExec = new OrderedMemoryAwareThreadPoolExecutor(4 /* число рабочих потоков */, 400000000, 2000000000, 60, TimeUnit.SECONDS);
ServerBootstrap networkServer = new ServerBootstrap(new NioServerSocketChannelFactory(bossExec, ioExec,  4 /* то же самое число рабочих потоков */));
networkServer.setOption("backlog", 500);
networkServer.setOption("connectTimeoutMillis", 10000);
networkServer.setPipelineFactory(new ServerPipelineFactory());
Channel channel = networkServer.bind(new InetSocketAddress(address, port));

Используется OrderedMemoryAwareThreadPoolExecutor для выполнения задач Netty, по опыту французских коллег они самые эффективные. Можно использовать другие Executor'ы, например Executors.newFixedThreadPool(n). Ни в коем случае не используйте Executors.newCachedThreadPool(), он создаёт неоправданно много потоков и ни какого выигрыша от Netty почти нет. Использовать более 4 рабочих потоков нет смысла, т.к. они более чем справляются с огромной нагрузкой (программисты из Xebia-France на 4 потоках тянули более 100 000 одновременных подключений). Босс-потоки должны быть по одному на каждый слушаемый порт. Channel, который возвращает функция bind, а так же ServerBootsrap необходимо сохранить, чтобы потом можно было остановить сервер.

PipelineFactory


То, как будут обрабатываться подключения и пакеты клиента, определяет PipelineFactory, которая при открытии канала с клиентом создаёт для него pipeline, в котором определены обработчики событий, которые происходят на канале. В нашем случае, это ServerPipelineFactory:
public class ServerPipelineFactory implements ChannelPipelineFactory {
	@Override
	public ChannelPipeline getPipeline() throws Exception {
		PacketFrameDecoder decoder = new PacketFrameDecoder();
		PacketFrameEncoder encoder = new PacketFrameEncoder();
		return Channels.pipeline(decoder, encoder, new PlayerHandler(decoder, encoder));
	}
}

В данном коде PacketFrameDecoder, PacketFrameEncoder и PlayerHandler — обработчки событий, которые мы определяем. Функция Channels.pipeline() создаёт новый pipeline с переданными ей обработчиками. Будьте внимательны: события проходят обработчики в том порядке, в котором Вы передали из функции pipeline!

Протокол


Немного опишу протокол, чтобы дальше было понятно.

Обмен данными происходит с помощью объектов классов, расширяющих класс Packet, в которых определены две функции, get(ChannelBuffer input) и send(ChannelBuffer output). Соответственно, первая функция читает необходимые данные из канала, вторая — пишет данные пакета в канал.
public abstract class Packet {
	public static Packet read(ChannelBuffer buffer) throws IOException {
		int id = buffer.readUnsignedShort(); // Получаем ID пришедшего пакета, чтобы определить, каким классом его читать
		Packet packet = getPacket(id); // Получаем инстанс пакета с этим ID
		if(packet == null)
			throw new IOException("Bad packet ID: " + id); // Если произошла ошибка и такого пакета не может быть, генерируем исключение
		packet.get(buffer); // Читаем в пакет данные из буфера
		return packet;
	}

	public statuc Packet write(Packet packet, ChannelBuffer buffer) {
		buffer.writeChar(packet.getId()); // Отправляем ID пакета
		packet.send(buffer); // Отправляем данные пакета
	}

	// Функции, которые должен реализовать каждый класс пакета
	public abstract void get(ChannelBuffer buffer);
	public abstract void send(ChannelBuffer buffer);
}

Пример пары пакетов для наглядности:
// Пакет, которым клиент передаёт серверу свой логин
public class Packet1Login extends Packet {
	public String login;

	public void get(ChannelBuffer buffer) {
		int length = buffer.readShort();
		StringBuilder builder = new StringBuilder();
		for(int i = 0; i < length ++i)
			builder.append(buffer.readChar());
		login = builder.toString();
	}

	public void send(ChannelBuffer buffer) {
		// Тело отправки пустое, т.к. сервер не посылает этот пакет
	}
}

// Пакет, которым сервер выкидывает клиента с указаной причиной, или клиент отключается от сервера
public class Packet255KickDisconnect extends Packet {
	public String reason;

	public void get(ChannelBuffer buffer) {
		int length = buffer.readShort();
		StringBuilder builder = new StringBuilder();
		for(int i = 0; i < length ++i)
			builder.append(buffer.readChar());
		reason = builder.toString();
	}

	public void send(ChannelBuffer buffer) {
		buffer.writeShort(reason.length());
		for(int i = 0; i < reason.length(); ++i) {
			buffer.writeChar(reason.getCharAt(i));
		}
	}
}

ChannelBuffer очень похож на DataInputStream и DataOutputStream в одном лице. Большинство функций если не такие же, то очень похожи. Заметьте, что я не забочусь о проверке того, хватает ли в буфере байт для чтения, как будто я работаю с блокирующим IO. Об этом далее…

Работа с клиентом


Работа с клиентом в основном определяется классом PlayerHandler:
public class PlayerHandler extends SimpleChannelUpstreamHandler {
	
	private PlayerWorkerThread worker;
	
	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
                // Событие вызывается при подключении клиента. Я создаю здесь Worker игрока — объект, который занимается обработкой данных игрока непостредственно.
                // Я передаю ему канал игрока (функция e.getChannel()), чтобы он мог в него посылать пакеты
		worker = new PlayerWorkerThread(this, e.getChannel());
	}
	@Override
	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
                // Событие закрытия канала. Используется в основном, чтобы освободить ресурсы, или выполнить другие действия, которые происходят при отключении пользователя. Если его не обработать, Вы можете и не заметить, что пользователь отключился, если он напрямую не сказал этого серверу, а просто оборвался канал.
		worker.disconnectedFromChannel();
	}
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
                // Функция принимает уже готовые Packet'ы от игрока, поэтому их можно сразу посылать в worker. За их формирование отвечает другой обработчик.
		if(e.getChannel().isOpen())
			worker.acceptPacket((Packet) e.getMessage());
	}
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
                // На канале произошло исключение. Выводим ошибку, закрываем канал.
		Server.logger.log(Level.WARNING, "Exception from downstream", e.getCause());
		ctx.getChannel().close();
	}
}

Worker может посылать игроку данные просто функцией channel.write(packet), где channel — канал игрока, который передаётся ему при подключении, а packet — объект класса Packet. За кодирование пакетов будет отвечать уже Encoder.

Decoder и Encoder


Собственно, сама важная часть системы — они отвечают за формирование пакетов Packet из потока пользователя и за отправку таких же пакетов в поток.

Encoder очень прост, он отправляет пакеты игроку:
public class PacketFrameEncoder extends OneToOneEncoder {
	@Override
	protected Object encode(ChannelHandlerContext channelhandlercontext, Channel channel, Object obj) throws Exception {
		if(!(obj instanceof Packet))
			return obj; // Если это не пакет, то просто пропускаем его дальше
		Packet p = (Packet) obj;
		
		ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); // Создаём динамический буфер для записи в него данных из пакета. Если Вы точно знаете длину пакета, Вам не обязательно использовать динамический буфер — ChannelBuffers предоставляет и буферы фиксированной длинны, они могут быть эффективнее.
		Packet.write(p, buffer); // Пишем пакет в буфер
		return buffer; // Возвращаем буфер, который и будет записан в канал
	}
}


Decoder уже гораздо сложнее. Дело в том, что в буфере, пришедшем от клиента, может просто не оказаться достаточного количества байт для чтения всего пакета. В этом случае, нам поможет класс ReplayingDecoder. Нам всего лишь нужно реализовать его функцию decode и читать в ней данные из потока, не заботясь не о чём:
public class PacketFrameDecoder extends ReplayingDecoder<VoidEnum> {
	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	@Override
	public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
		ctx.sendUpstream(e);
	}
	@Override
	protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer, VoidEnum e) throws Exception {
		return Packet.read(buffer);
	}
}

Спрашивается, как это работает? Очень просто, перед вызовом функции decode декодер помечает текущий индекс чтения, если при чтении из буфера в нём не хватит данных, будет сгенерировано исключение. При этом буфер вернётся в начальное положение и decode будет повторён, когда больше данных будет получено от пользователя. В случае успешного чтения (возвращён не null), декодер попытается вызвать функции decode ещё раз, уже на оставшихся в буфере данных, если в нём есть ещё хотя бы один байт.

Не медленно ли всё это работает, если он генерирует исключение? Медленнее, чем, например, проверка количества данных в буфере и оценка, хватит ли их для чтения пакета. Но он использует кэшированное исключение, поэтому не тратится время на заполнения stacktrace и даже создание нового объекта исключения. Подробнее об и некоторых других, повышающих эффективность, функцийя ReplayingDecoder можно почитать здесь

Вы так же можете поэкспериментировать с FrameDecoder'ом, если, например, Вы можете заранее определить размер пакета по его ID.

Кажется, это всё


Результаты получились отличными. Во-первых, сервер больше не сыпет тысячей потоков — 4 потока Netty + 4 потока обработки данных прекрасно справляются с 250+ клиентами (тестирование продолжается). Во-вторых, нагрузка на процессор стала значительно меньшей и перестала линейно расти от числа подключений. В-третьих, время отклика в некоторых случаях стало меньше.

Надеюсь кому-нибудь это будет полезно. Старалась передать как можно больше важных данных, могла переборщить. Примеров ведь много не бывает? Спрашивайте Ваши ответы и не судите строго — первый раз пишу на хабр.

Постскриптум: ещё несколько полезных вещей


У Netty есть ещё несколько интересных особенностей, которые заслуживают отдельного упоминания:

Во-первых, остановка сервера:
ChannelFuture future = channel.close();
future.awaitUninterruptibly();

Где channel — канал, который возвратила функция bind в начале. future.awaitUninterruptibly() дождётся, пока канал закроется и выполнение кода продолжится.

Самое интересное: ChannelFuture. Когда мы отправляем на канал пакет, функцией channel.write(packet), она возвращает ChannelFuture — это особый объект, который отслеживает состояние выполняемого действия. Через него можно проверить, выполнилось ли действие.

Например, мы хотим послать клиенту пакет отключения и закрыть за ним канал. Если мы сделаем
channel.write(new Packet255KickDisconnect("Пока!"));
channel.close();

то с вероятностью 99%, мы получим ChannelClosedException и пакет до клиента не дойдёт. Но можно сделать так:
ChannelFuture future = channel.write(new Packet255KickDisconnect("Пока!"));
try {
      future.await(10000); // Ждём не более 10 секунд, пока действие закончится
} catch(InterruptedException e) {}
channel.close();

То всё будет супер, кроме того, что это может заблокировать поток выполнения, пока пакет не отправится пользователю. Поэтому на ChannelFuture можно повесит listener — объект, который будет уведомлён о том, что событие совершилось и выполнит какие-либо действия. Для закрытия соединения есть уже готовый listener ChannelFutureListener.CLOSE. Пример использования:
ChannelFuture future = channel.write(new Packet255KickDisconnect("Пока!"));
furute.addListener(ChannelFutureListener.CLOSE);

Эффект тот же, блокировок нет. Разобраться в том, как создать свой листенер не сложно — там всего одна функция. Откройте любой готовый класс, здесь я не буду приводить пример.

Ещё важная информация

Как правильно было замечено в комментариях, следует предупредить о том, что в обработчиках (handler-ах, которые висят на pipeline) лучше не стоит использовать блокирующие операции или ожидание. В противном случае, Вы рискуете навсегда потерять поток обработки или просто сильно затормозить обработку событий остальных клиентов.

Так же в обработчике ни в коем случае нельзя «ждать будущего», т.е. выполнять .await() или .awaitUninterruptibly() на любом ChannelFuture. Во-первых, у Вас ничего не получится, их нельзя вызывать из обработчиков — система не даст сделать такую глупость и сгенерирует исключение. Во-вторых, если бы этого не было, Ваш поток опять же мог бы умереть оставив других клиентов без обслуживания.

Вообще, все действия, выполняемые в ChannelHandler'ах должны быть как можно более простыми и неблокирующими. Ни в коем случае не обрабатывайте данные прямо в них — кладите пакеты в очередь и обрабатывайте их в другом потоке.

+74
103.3k 439
Comments 64
Top of the day