Java
17 October 2011

Пишем простой UDP BitTorrent-трекер на Netty + MongoDB

From Sandbox

Введение


В это статье освещается работа UDP Tracker Protocol. Все примеры, приведенные в статье, будут на Java с использованием NIO-фреймворка Netty. В качестве БД взята MongoDB.

Обычно торрент-трекеры работают через протокол HTTP, передавая данные посредством GET-запросов. Работа трекера по протоколу UDP позволяет существенно сократить траффик (более чем в 2 раза), а так же избавиться от ограничения на количество одновременных соединений, которое накладывает протокол TCP.

Ссылка на UDP-трекер в клиенте может выглядеть так: udp://tracker.openbittorrent.com:80/announce, где на месте announce может быть что угодно (либо вообще ничего). А вот указание порта обязательно, в отличие от HTTP трекера.


Общие принципы протокола


Теперь о том, как в общих чертах работает UDP-трекер.
1. Сначала клиент посылает трекеру запрос на соединение (пакет 0x00 Connect). В этом запросе поле connection ID равно 0x41727101980 — это идентификатор протокола. Кроме того, клиент передает ID транзакции, выбранный им случайно.
2. Далее сервер создает клиенту уникальный connection ID, который и передает в ответном пакете. При этом сервер обязан передать ID транзакции, который он получил от клиента.
3. Клиент теперь имеет уникальный ID (который, впрочем, не особо и нужен, если это открытый трекер без регистрации пользователей и учета траффика.) и может слать нам пакеты с анонсами.
4. В ответ на анонс сервер отдает список пиров торрента, интервал обращений клиента к серверу и статистику сидов/пиров.
5. Ещё клиент может отправлять нам Scrape-запросы, где передается несколько хешей торрентов, к которым он хочет получить статистику. Количество запрашиваемых торрентов за 1 запрос не может превышать 74 ввиду ограничений протокола UDP.

Разработка сервера


На данном этапе я советую вам скачать исходники трекера, т.к. в статье я опишу только ключевые моменты. Скачать исходники и используемые библиотеки можно здесь: github.com/lafayette/udp-torrent-tracker

Инициализация Netty.

Executor threadPool = Executors.newCachedThreadPool();
DatagramChannelFactory factory = new NioDatagramChannelFactory(threadPool); // Создаем фабрику для UDP.

bootstrap = new ConnectionlessBootstrap(factory);
bootstrap.getPipeline().addLast("handler", new Handler()); // Handler будет принимать все сообщения от клиентов.

bootstrap.setOption("reuseAddress", true); // При нештатном завершении работы сервера остается открытым порт. К сожалению, ничего лучше reuseAddress придумать не удалось. Буду благодарен, если кто-то подскажет более правильное решение.

// В ShutdowHook мы можем задать действия, которые будут произведены по заверешении работы Netty. Например, закрыть канал и освободить ресурсы.
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
	public void run() {
		channel.close();
		bootstrap.releaseExternalResources();
	}
}));

String host = Config.getInstance().getString("listen.host", "127.0.0.1");
Integer port = Config.getInstance().getInt("listen.port", 8080);
InetSocketAddress address = new InetSocketAddress(host, port); // Создаем объект, который указывает, на каком адресе и порте слушать. Можно указать только порт и тогда Netty будет слушать на всех интерфейсах.

logger.info("Listening on " + host + ":" + port);

// И в заключение мы вызываем метод bind, который запускает позволяет начать слушать заданный порт.
bootstrap.bind(address);


Получение сообщений от клиентов.

public class Handler extends SimpleChannelUpstreamHandler {
	private static final Logger logger = Logger.getLogger(Handler.class);

	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
		ChannelBuffer channelBuffer = (ChannelBuffer)e.getMessage(); // Из ChannelBuffer мы будем читать пришедший нам udp-пакет.

		// Каждый пакет, полученный от клиента, содержит как минимум connection ID (long), action ID (int) и transaction ID (int) и не может быть меньше 16 байт.
		if (channelBuffer.readableBytes() < 16) {
			logger.debug("Incorrect packet received from " + e.getRemoteAddress());
		}

		long connectionId = channelBuffer.readLong(); // Здесь можно проверять connectionId, но у нас открытый трекер без учета пользователей и мы можем игнорировать это.
		int actionId = channelBuffer.readInt(); // ID действия. Может быть: 0x00 Connect; 0x01 Announce; 0x02 Scrape; 0x03: Error. Последний (ошибку) отправляет только сервер.
		int transactionId = channelBuffer.readInt(); // ID транзакции. В ответе мы обязаны отправить принятый ID транзакции, иначе клиент нас проигнорирует.

		Action action = Action.byId(actionId);

		ClientRequest request;

		switch (action) {
			case CONNECT:
				request = new ConnectionRequest();
				break;
			case ANNOUNCE:
				request = new AnnounceRequest();
				break;
			case SCRAPE:
				request = new ScrapeRequest();
				break;
			default:
				logger.debug("Incorrect action supplied");
				ErrorResponse.send(e, transactionId, "Incorrect action");
				return;
		}


		// Здесь мы передадим в обработчик запроса все необходимые данные, включая заголовок из идентификаторов соединения, действия и транзакции.
		request.setContext(ctx);
		request.setMessageEvent(e);
		request.setChannelBuffer(channelBuffer);
		request.setConnectionId(connectionId);
		request.setAction(action);
		request.setTransactionId(transactionId);

		// Остается лишь вызвать чтение оставшихся данных для притятого пакета.
		request.read();
	}
}


MongoDB

Для работы с MongoDB я воспользовался замечательной библиотекой для маппинга — Morphia.

Вот как я описал класс для хранения пира:
@Entity("peers")
public class Peer {
	public @Id ObjectId id;
	public @Indexed byte[] infoHash;
	public byte[] peerId;
	public long downloaded;
	public long left;
	public long uploaded;
	public @Transient int event;
	public int ip;
	public short port;
	public @Transient int key;
	public @Transient int numWant;
	public @Transient short extensions;
	public long lastUpdate;

	@PrePersist
	private void prePersist() {
		this.lastUpdate = System.currentTimeMillis();
	}
}

Аннотация Transient означает, что мы не сохраняем это поле в таблицу. Эти поля нам будут нужны только для обработки запроса. Поле infoHash помечено аннотацией Indexed, т.к. мы будем искать подходящих пиров именно по хэшу торрента.

Так же нам нужно создать подключение к БД. Делается это довольно просто:
morphia = new Morphia();
morphia.map(Peer.class); // Говорим Морфии, что хотим маппить этот класс.

mongo = new Mongo(host, port);
datastore = morphia.createDatastore(mongo, "udptracker");


И пример поиска пиров по info_hash
Query<Peer> peersQuery = Datastore.instance().find(Peer.class);
peersQuery.field("infoHash").equal(peer.infoHash);
peersQuery.field("peerId").notEqual(peer.peerId); // Исключаем себя из списка.
peersQuery.limit(peer.numWant).offset(randomOffset); // Ограничение по numWant и случайный набор пиров.


Для лучшего понимания лучше будет заглянуть в документацию Morphia.

В остальном все довольно просто: из полученного СhannelBuffer читаем данные от клиента, а затем в e.getChannel() отправляем ответ. Реализацию всех пакетов вы можете посмотреть в исходниках.

Кроме того для лучшего понимания протокола советую изучить xbtt.sourceforge.net/udp_tracker_protocol.html

Исходники вышеописанного сервера: github.com/lafayette/udp-torrent-tracker

P.S. Сразу хочу сказать, что это первый мой опыт работы как с Netty, так и с MongoDB. По сути я на этом проекте изучал обе этих замечательных вещи. Поэтому очень приветствуются советы как можно было сделать лучше/красивее/по-джедайски.

+20
10.8k 72
Comments 12
Top of the day