// rqbit/crates/peer_binary_protocol/src/lib.rs
pub mod extended;
2021-07-01 23:37:57 +01:00
2021-06-25 13:47:51 +01:00
use bincode::Options;
2021-07-03 19:10:59 +01:00
use buffers::{ByteBuf, ByteString};
use byteorder::{ByteOrder, BE};
2021-07-03 19:10:59 +01:00
use clone_to_owned::CloneToOwned;
2021-07-12 21:59:08 +01:00
use librqbit_core::{constants::CHUNK_SIZE, id20::Id20, lengths::ChunkInfo};
2021-07-02 22:32:55 +01:00
use serde::{Deserialize, Serialize};
2021-06-25 13:47:51 +01:00
2021-07-02 22:32:55 +01:00
use self::extended::{handshake::ExtendedHandshake, ExtendedMessage};
const INTEGER_LEN: usize = 4;
const MSGID_LEN: usize = 1;
const PREAMBLE_LEN: usize = INTEGER_LEN + MSGID_LEN;
const PIECE_MESSAGE_PREAMBLE_LEN: usize = PREAMBLE_LEN + INTEGER_LEN * 2;
pub const PIECE_MESSAGE_DEFAULT_LEN: usize = PIECE_MESSAGE_PREAMBLE_LEN + CHUNK_SIZE as usize;
2021-06-25 13:47:51 +01:00
const NO_PAYLOAD_MSG_LEN: usize = PREAMBLE_LEN;
const PSTR_BT1: &str = "BitTorrent protocol";
const LEN_PREFIX_KEEPALIVE: u32 = 0;
const LEN_PREFIX_CHOKE: u32 = 1;
const LEN_PREFIX_UNCHOKE: u32 = 1;
const LEN_PREFIX_INTERESTED: u32 = 1;
const LEN_PREFIX_NOT_INTERESTED: u32 = 1;
const LEN_PREFIX_HAVE: u32 = 5;
const LEN_PREFIX_PIECE: u32 = 9;
2021-06-25 13:47:51 +01:00
const LEN_PREFIX_REQUEST: u32 = 13;
const MSGID_CHOKE: u8 = 0;
const MSGID_UNCHOKE: u8 = 1;
const MSGID_INTERESTED: u8 = 2;
const MSGID_NOT_INTERESTED: u8 = 3;
const MSGID_HAVE: u8 = 4;
const MSGID_BITFIELD: u8 = 5;
const MSGID_REQUEST: u8 = 6;
const MSGID_PIECE: u8 = 7;
2021-07-01 23:37:57 +01:00
const MSGID_EXTENDED: u8 = 20;
2021-06-25 13:47:51 +01:00
2021-07-02 22:32:55 +01:00
pub const MY_EXTENDED_UT_METADATA: u8 = 3;
2021-07-02 13:00:46 +01:00
2021-06-25 13:47:51 +01:00
#[derive(Debug)]
pub enum MessageDeserializeError {
NotEnoughData(usize, &'static str),
UnsupportedMessageId(u8),
IncorrectLenPrefix {
received: u32,
expected: u32,
msg_id: u8,
},
OtherBincode {
error: bincode::Error,
msg_id: u8,
len_prefix: u32,
name: &'static str,
},
2021-07-02 01:38:07 +01:00
Other(anyhow::Error),
2021-06-25 13:47:51 +01:00
}
/// Writes the header of a piece message for `chunk` into the start of `buf`:
/// length prefix, message ID, piece index and offset. The block itself is not
/// written.
///
/// Returns `PIECE_MESSAGE_PREAMBLE_LEN`, the number of bytes written.
///
/// # Panics
///
/// Panics if `buf` is shorter than `PIECE_MESSAGE_PREAMBLE_LEN`.
pub fn serialize_piece_preamble(chunk: &ChunkInfo, buf: &mut [u8]) -> usize {
    // Common preamble: length prefix (covers the block as well) and message ID.
    BE::write_u32(&mut buf[..4], LEN_PREFIX_PIECE + chunk.size);
    buf[4] = MSGID_PIECE;

    // Piece-specific fields directly after the preamble.
    let fields = &mut buf[PREAMBLE_LEN..];
    let piece_index = chunk.piece_index.get();
    BE::write_u32(&mut fields[..4], piece_index);
    BE::write_u32(&mut fields[4..8], chunk.offset);

    PIECE_MESSAGE_PREAMBLE_LEN
}
2021-06-25 13:47:51 +01:00
/// Payload of a piece message: two big-endian `u32` fields followed by the
/// block bytes.
#[derive(Debug)]
pub struct Piece<ByteBuf> {
    /// First integer of the payload.
    pub index: u32,
    /// Second integer of the payload.
    pub begin: u32,
    /// The data that follows the two integers.
    pub block: ByteBuf,
}

impl<ByteBuf> Piece<ByteBuf>
where
    ByteBuf: AsRef<[u8]>,
{
    /// Builds a piece from anything convertible into the buffer type.
    pub fn from_data<T>(index: u32, begin: u32, block: T) -> Piece<ByteBuf>
    where
        ByteBuf: From<T>,
    {
        Piece {
            index,
            begin,
            block: ByteBuf::from(block),
        }
    }

    /// Writes `index`, `begin` and the block into the start of `buf`.
    ///
    /// Returns the number of bytes written (`8 + block.len()`). Bytes of `buf`
    /// past that point are left untouched, so `buf` may be larger than needed.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is shorter than `8 + block.len()`.
    pub fn serialize(&self, buf: &mut [u8]) -> usize {
        let block = self.block.as_ref();
        let written = 8 + block.len();
        buf[0..4].copy_from_slice(&self.index.to_be_bytes());
        buf[4..8].copy_from_slice(&self.begin.to_be_bytes());
        // Slice to the exact block length: `copy_from_slice` requires equal
        // lengths and would otherwise panic on an oversized buffer.
        buf[8..written].copy_from_slice(block);
        written
    }

    /// Parses a piece payload; everything after the first 8 bytes is the block.
    ///
    /// # Panics
    ///
    /// Panics if `buf` is shorter than 8 bytes.
    pub fn deserialize<'a>(buf: &'a [u8]) -> Piece<ByteBuf>
    where
        ByteBuf: From<&'a [u8]> + 'a,
    {
        let index = u32::from_be_bytes([buf[0], buf[1], buf[2], buf[3]]);
        let begin = u32::from_be_bytes([buf[4], buf[5], buf[6], buf[7]]);
        let block = ByteBuf::from(&buf[8..]);
        Piece {
            index,
            begin,
            block,
        }
    }
}
/// Human-readable rendering of each decoding error.
impl std::fmt::Display for MessageDeserializeError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            MessageDeserializeError::NotEnoughData(b, name) => write!(
                f,
                "not enough data to deserialize {}: expected at least {} more bytes",
                name, b
            ),
            MessageDeserializeError::UnsupportedMessageId(msg_id) => {
                write!(f, "unsupported message id {}", msg_id)
            }
            MessageDeserializeError::IncorrectLenPrefix {
                received,
                expected,
                msg_id,
            } => write!(
                f,
                "incorrect len prefix for message id {}, expected {}, received {}",
                msg_id, expected, received
            ),
            MessageDeserializeError::OtherBincode {
                error,
                msg_id,
                name,
                len_prefix,
            } => write!(
                f,
                "error deserializing {} (msg_id={}, len_prefix={}): {:?}",
                name, msg_id, len_prefix, error
            ),
            // Only the outermost message of the wrapped error is printed.
            MessageDeserializeError::Other(e) => write!(f, "{}", e),
        }
    }
}
impl std::error::Error for MessageDeserializeError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
MessageDeserializeError::OtherBincode { error, .. } => Some(error),
_ => None,
}
}
}
2021-07-02 01:38:07 +01:00
impl From<anyhow::Error> for MessageDeserializeError {
fn from(e: anyhow::Error) -> Self {
MessageDeserializeError::Other(e)
}
}
2021-06-25 13:47:51 +01:00
#[derive(Debug)]
2021-07-01 23:37:57 +01:00
pub enum Message<ByteBuf: std::hash::Hash + Eq> {
2021-06-25 13:47:51 +01:00
Request(Request),
Bitfield(ByteBuf),
KeepAlive,
Have(u32),
Choke,
Unchoke,
Interested,
NotInterested,
Piece(Piece<ByteBuf>),
2021-07-01 23:37:57 +01:00
Extended(ExtendedMessage<ByteBuf>),
2021-06-25 13:47:51 +01:00
}
pub type MessageBorrowed<'a> = Message<ByteBuf<'a>>;
pub type MessageOwned = Message<ByteString>;
2022-12-04 13:11:40 +00:00
pub type BitfieldBorrowed<'a> = &'a bitvec::slice::BitSlice<u8, bitvec::order::Lsb0>;
pub type BitfieldOwned = bitvec::vec::BitVec<u8, bitvec::order::Lsb0>;
2021-06-25 13:47:51 +01:00
pub struct Bitfield<'a> {
pub data: BitfieldBorrowed<'a>,
}
2021-07-01 23:37:57 +01:00
impl<ByteBuf> CloneToOwned for Message<ByteBuf>
where
ByteBuf: CloneToOwned + std::hash::Hash + Eq,
<ByteBuf as CloneToOwned>::Target: std::hash::Hash + Eq,
{
2021-06-25 13:47:51 +01:00
type Target = Message<<ByteBuf as CloneToOwned>::Target>;
fn clone_to_owned(&self) -> Self::Target {
match self {
Message::Request(req) => Message::Request(*req),
Message::Bitfield(b) => Message::Bitfield(b.clone_to_owned()),
Message::Choke => Message::Choke,
Message::Unchoke => Message::Unchoke,
Message::Interested => Message::Interested,
Message::Piece(piece) => Message::Piece(Piece {
index: piece.index,
begin: piece.begin,
block: piece.block.clone_to_owned(),
}),
Message::KeepAlive => Message::KeepAlive,
Message::Have(v) => Message::Have(*v),
Message::NotInterested => Message::NotInterested,
Message::Extended(e) => Message::Extended(e.clone_to_owned()),
2021-06-25 13:47:51 +01:00
}
}
}
impl<'a> Bitfield<'a> {
pub fn new_from_slice(buf: &'a [u8]) -> anyhow::Result<Self> {
Ok(Self {
2022-12-04 13:11:40 +00:00
data: bitvec::slice::BitSlice::from_slice(buf),
2021-06-25 13:47:51 +01:00
})
}
}
/// Prints a summary (set-bit count and total bit count) rather than every bit.
impl<'a> std::fmt::Debug for Bitfield<'a> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let ones = self.data.count_ones();
        let len = self.data.len();

        let mut summary = f.debug_struct("Bitfield");
        summary.field("_ones", &ones);
        summary.field("_len", &len);
        summary.finish()
    }
}
impl<ByteBuf> Message<ByteBuf>
where
2021-07-02 01:38:07 +01:00
ByteBuf: AsRef<[u8]> + std::hash::Hash + Eq + Serialize,
2021-06-25 13:47:51 +01:00
{
pub fn len_prefix_and_msg_id(&self) -> (u32, u8) {
match self {
Message::Request(_) => (LEN_PREFIX_REQUEST, MSGID_REQUEST),
Message::Bitfield(b) => (1 + b.as_ref().len() as u32, MSGID_BITFIELD),
Message::Choke => (LEN_PREFIX_CHOKE, MSGID_CHOKE),
Message::Unchoke => (LEN_PREFIX_UNCHOKE, MSGID_UNCHOKE),
Message::Interested => (LEN_PREFIX_INTERESTED, MSGID_INTERESTED),
Message::NotInterested => (LEN_PREFIX_NOT_INTERESTED, MSGID_NOT_INTERESTED),
Message::Piece(p) => (
LEN_PREFIX_PIECE + p.block.as_ref().len() as u32,
MSGID_PIECE,
),
2021-06-25 13:47:51 +01:00
Message::KeepAlive => (LEN_PREFIX_KEEPALIVE, 0),
Message::Have(_) => (LEN_PREFIX_HAVE, MSGID_HAVE),
2021-07-02 01:38:07 +01:00
Message::Extended(_) => (0, MSGID_EXTENDED),
2021-06-25 13:47:51 +01:00
}
}
2021-07-02 13:00:46 +01:00
pub fn serialize(
&self,
out: &mut Vec<u8>,
peer_extended_handshake: Option<&ExtendedHandshake<ByteString>>,
) -> anyhow::Result<usize> {
2021-06-25 13:47:51 +01:00
let (lp, msg_id) = self.len_prefix_and_msg_id();
out.resize(PREAMBLE_LEN, 0);
byteorder::BigEndian::write_u32(&mut out[..4], lp);
out[4] = msg_id;
let ser = bopts();
match self {
Message::Request(request) => {
const MSG_LEN: usize = PREAMBLE_LEN + 12;
out.resize(MSG_LEN, 0);
debug_assert_eq!((&out[PREAMBLE_LEN..]).len(), 12);
ser.serialize_into(&mut out[PREAMBLE_LEN..], request)
.unwrap();
2021-07-02 13:00:46 +01:00
Ok(MSG_LEN)
2021-06-25 13:47:51 +01:00
}
Message::Bitfield(b) => {
let block_len = b.as_ref().len();
let msg_len = PREAMBLE_LEN + block_len;
out.resize(msg_len, 0);
(&mut out[PREAMBLE_LEN..PREAMBLE_LEN + block_len]).copy_from_slice(b.as_ref());
2021-07-02 13:00:46 +01:00
Ok(msg_len)
}
2021-06-27 14:49:41 +01:00
Message::Choke | Message::Unchoke | Message::Interested | Message::NotInterested => {
2021-07-02 13:00:46 +01:00
Ok(PREAMBLE_LEN)
2021-06-27 14:49:41 +01:00
}
2021-06-25 13:47:51 +01:00
Message::Piece(p) => {
let block_len = p.block.as_ref().len();
let payload_len = 8 + block_len;
2021-06-26 17:29:59 +01:00
let msg_len = PREAMBLE_LEN + payload_len;
2021-06-25 13:47:51 +01:00
out.resize(msg_len, 0);
2021-06-26 17:29:59 +01:00
let tmp = &mut out[PREAMBLE_LEN..];
p.serialize(&mut tmp[..payload_len]);
2021-07-02 13:00:46 +01:00
Ok(msg_len)
2021-06-25 13:47:51 +01:00
}
2021-06-26 17:29:59 +01:00
Message::KeepAlive => {
// the len prefix was already written out to buf
2021-07-02 13:00:46 +01:00
Ok(4)
2021-06-26 17:29:59 +01:00
}
2021-06-25 13:47:51 +01:00
Message::Have(v) => {
let msg_len = PREAMBLE_LEN + 4;
out.resize(msg_len, 0);
BE::write_u32(&mut out[PREAMBLE_LEN..], *v);
2021-07-02 13:00:46 +01:00
Ok(msg_len)
2021-06-25 13:47:51 +01:00
}
2021-07-02 01:38:07 +01:00
Message::Extended(e) => {
e.serialize(out, peer_extended_handshake)?;
2021-07-02 01:38:07 +01:00
let msg_size = out.len();
// no fucking idea why +1, but I tweaked that for it all to match up
// with real messages.
BE::write_u32(&mut out[..4], (msg_size - PREAMBLE_LEN + 1) as u32);
2021-07-02 13:00:46 +01:00
Ok(msg_size)
2021-07-02 01:38:07 +01:00
}
2021-06-25 13:47:51 +01:00
}
}
pub fn deserialize<'a>(
buf: &'a [u8],
) -> Result<(Message<ByteBuf>, usize), MessageDeserializeError>
where
2021-07-02 01:38:07 +01:00
ByteBuf: From<&'a [u8]> + 'a + Deserialize<'a>,
2021-06-25 13:47:51 +01:00
{
let len_prefix = match buf.get(0..4) {
Some(bytes) => byteorder::BigEndian::read_u32(bytes),
None => return Err(MessageDeserializeError::NotEnoughData(4, "message")),
};
if len_prefix == 0 {
return Ok((Message::KeepAlive, 4));
}
let msg_id = match buf.get(4) {
Some(msg_id) => *msg_id,
None => return Err(MessageDeserializeError::NotEnoughData(1, "message")),
};
let rest = &buf[5..];
let decoder_config = bincode::DefaultOptions::new()
.with_fixint_encoding()
.with_big_endian();
match msg_id {
MSGID_CHOKE => {
if len_prefix != LEN_PREFIX_CHOKE {
return Err(MessageDeserializeError::IncorrectLenPrefix {
received: len_prefix,
expected: LEN_PREFIX_CHOKE,
msg_id,
});
}
Ok((Message::Choke, NO_PAYLOAD_MSG_LEN))
}
MSGID_UNCHOKE => {
if len_prefix != LEN_PREFIX_UNCHOKE {
return Err(MessageDeserializeError::IncorrectLenPrefix {
received: len_prefix,
expected: LEN_PREFIX_UNCHOKE,
msg_id,
});
}
Ok((Message::Unchoke, NO_PAYLOAD_MSG_LEN))
}
MSGID_INTERESTED => {
if len_prefix != LEN_PREFIX_INTERESTED {
return Err(MessageDeserializeError::IncorrectLenPrefix {
received: len_prefix,
expected: LEN_PREFIX_INTERESTED,
msg_id,
});
}
Ok((Message::Interested, NO_PAYLOAD_MSG_LEN))
}
MSGID_NOT_INTERESTED => {
if len_prefix != LEN_PREFIX_NOT_INTERESTED {
return Err(MessageDeserializeError::IncorrectLenPrefix {
received: len_prefix,
expected: LEN_PREFIX_NOT_INTERESTED,
msg_id,
});
}
Ok((Message::NotInterested, NO_PAYLOAD_MSG_LEN))
}
MSGID_HAVE => {
let expected_len = 4;
match rest.get(..expected_len as usize) {
2021-10-18 13:11:45 +01:00
Some(h) => Ok((Message::Have(BE::read_u32(h)), PREAMBLE_LEN + expected_len)),
2021-06-25 13:47:51 +01:00
None => {
let missing = expected_len - rest.len();
Err(MessageDeserializeError::NotEnoughData(missing, "have"))
}
}
}
MSGID_BITFIELD => {
if len_prefix <= 1 {
return Err(MessageDeserializeError::IncorrectLenPrefix {
expected: 2,
received: len_prefix,
msg_id,
});
}
let expected_len = len_prefix as usize - 1;
match rest.get(..expected_len as usize) {
Some(bitfield) => Ok((
Message::Bitfield(ByteBuf::from(bitfield)),
PREAMBLE_LEN + expected_len,
)),
None => {
let missing = expected_len - rest.len();
Err(MessageDeserializeError::NotEnoughData(missing, "bitfield"))
}
}
}
MSGID_REQUEST => {
let expected_len = 12;
match rest.get(..expected_len as usize) {
Some(b) => {
2021-10-18 13:11:45 +01:00
let request = decoder_config.deserialize::<Request>(b).unwrap();
2021-06-25 13:47:51 +01:00
Ok((Message::Request(request), PREAMBLE_LEN + expected_len))
}
None => {
let missing = expected_len - rest.len();
Err(MessageDeserializeError::NotEnoughData(missing, "request"))
}
}
}
MSGID_PIECE => {
if len_prefix <= 9 {
return Err(MessageDeserializeError::IncorrectLenPrefix {
expected: 10,
received: len_prefix,
msg_id,
});
}
// <len=0009+X> is for "9", "8" is for 2 integer fields in the piece.
let expected_len = len_prefix as usize - 9 + 8;
match rest.get(..expected_len) {
Some(b) => Ok((
2021-10-18 13:11:45 +01:00
Message::Piece(Piece::deserialize(b)),
2021-06-25 13:47:51 +01:00
PREAMBLE_LEN + expected_len,
)),
None => Err(MessageDeserializeError::NotEnoughData(
expected_len - rest.len(),
"piece",
)),
}
}
2021-07-02 01:38:07 +01:00
MSGID_EXTENDED => {
if len_prefix <= 6 {
return Err(MessageDeserializeError::IncorrectLenPrefix {
expected: 6,
received: len_prefix,
msg_id,
});
}
// TODO: NO clue why - 1 here. Empirically figured out.
let expected_len = len_prefix as usize - 1;
match rest.get(..expected_len) {
Some(b) => Ok((
2021-10-18 13:11:45 +01:00
Message::Extended(ExtendedMessage::deserialize(b)?),
2021-07-02 01:38:07 +01:00
PREAMBLE_LEN + expected_len,
)),
None => Err(MessageDeserializeError::NotEnoughData(
expected_len - rest.len(),
"extended",
)),
}
}
2021-06-25 13:47:51 +01:00
msg_id => Err(MessageDeserializeError::UnsupportedMessageId(msg_id)),
}
}
}
/// The connection handshake. Field order is the order in which the fields are
/// serialized.
#[derive(Serialize, Deserialize, Debug)]
pub struct Handshake<'a> {
    /// Protocol string; serialized with a length prefix in front of it.
    pub pstr: &'a str,
    /// Reserved bytes used as capability flags (see `supports_extended`).
    pub reserved: [u8; 8],
    pub info_hash: [u8; 20],
    pub peer_id: [u8; 20],
}
/// Bincode configuration used for message payloads: fixed-width integers in
/// big-endian byte order.
fn bopts() -> impl bincode::Options {
    let defaults = bincode::DefaultOptions::new();
    let fixed_width = defaults.with_fixint_encoding();
    fixed_width.with_big_endian()
}
impl<'a> Handshake<'a> {
2021-07-12 21:59:08 +01:00
pub fn new(info_hash: Id20, peer_id: Id20) -> Handshake<'static> {
2021-06-25 13:47:51 +01:00
debug_assert_eq!(PSTR_BT1.len(), 19);
2021-07-02 01:38:07 +01:00
let mut reserved: u64 = 0;
// supports extended messaging
reserved |= 1 << 20;
let mut reserved_arr = [0u8; 8];
BE::write_u64(&mut reserved_arr, reserved);
2021-06-25 13:47:51 +01:00
Handshake {
pstr: PSTR_BT1,
2021-07-02 01:38:07 +01:00
reserved: reserved_arr,
2021-07-13 13:16:59 +01:00
info_hash: info_hash.0,
peer_id: peer_id.0,
2021-06-25 13:47:51 +01:00
}
}
2021-07-02 13:00:46 +01:00
pub fn supports_extended(&self) -> bool {
self.reserved[5] & 0x10 > 0
}
2021-06-25 13:47:51 +01:00
fn bopts() -> impl bincode::Options {
bincode::DefaultOptions::new()
}
pub fn deserialize(b: &[u8]) -> Result<(Handshake<'_>, usize), MessageDeserializeError> {
let pstr_len = *b
.get(0)
.ok_or(MessageDeserializeError::NotEnoughData(1, "handshake"))?;
let expected_len = 1usize + pstr_len as usize + 48;
let hbuf = b
.get(..expected_len)
.ok_or(MessageDeserializeError::NotEnoughData(
expected_len,
"handshake",
))?;
2021-07-04 14:38:44 +01:00
Ok((
Self::bopts()
2021-10-18 13:11:45 +01:00
.deserialize(hbuf)
2021-07-04 14:38:44 +01:00
.map_err(|e| MessageDeserializeError::Other(e.into()))?,
expected_len,
))
2021-06-25 13:47:51 +01:00
}
pub fn serialize(&self, buf: &mut Vec<u8>) {
Self::bopts().serialize_into(buf, &self).unwrap()
2021-06-25 13:47:51 +01:00
}
}
/// Payload of a request message: three `u32` fields, serialized in
/// declaration order.
#[derive(Serialize, Deserialize, Debug, Clone, Copy)]
pub struct Request {
    pub index: u32,
    pub begin: u32,
    pub length: u32,
}

impl Request {
    /// Builds a request from its three fields.
    pub fn new(index: u32, begin: u32, length: u32) -> Self {
        Request {
            index,
            begin,
            length,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A handshake serializes to length byte + pstr + reserved + two ids.
    #[test]
    fn test_handshake_serialize() {
        let info_hash = Id20([
            1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
        ]);
        let peer_id = Id20([
            1u8, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
        ]);
        let mut buf = Vec::new();
        Handshake::new(info_hash, peer_id).serialize(&mut buf);
        assert_eq!(buf.len(), 20 + 20 + 8 + 19 + 1);
    }

    /// The extended handshake serializes with a consistent preamble.
    #[test]
    fn test_extended_serialize() {
        let msg = Message::Extended(ExtendedMessage::Handshake(ExtendedHandshake::new()));
        let mut out = Vec::new();
        let size = msg.serialize(&mut out, None).unwrap();
        assert_eq!(size, out.len());
        assert_eq!(out[4], MSGID_EXTENDED);
        // The length prefix covers everything after its own four bytes.
        assert_eq!(BE::read_u32(&out[..4]) as usize, out.len() - 4);
    }

    /// Decoding a captured extended handshake and re-encoding it must
    /// reproduce the original bytes.
    #[test]
    fn test_deserialize_serialize_extended_is_same() {
        let buf = std::fs::read("../librqbit/resources/test/extended-handshake.bin").unwrap();
        let (msg, size) = MessageBorrowed::deserialize(&buf).unwrap();
        assert_eq!(size, buf.len());

        let mut write_buf = Vec::new();
        msg.serialize(&mut write_buf, None).unwrap();

        if buf != write_buf {
            // `fs::write` truncates, so a previous longer dump cannot leave
            // stale bytes behind.
            std::fs::write(
                "/tmp/test_deserialize_serialize_extended_is_same",
                &write_buf,
            )
            .unwrap();
            panic!("resources/test/extended-handshake.bin did not serialize exactly the same. Dumped to /tmp/test_deserialize_serialize_extended_is_same, you can compare with resources/test/extended-handshake.bin")
        }
    }
}