data_conversion.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426
  1. use std::fmt;
  2. use std::error::Error;
  3. use swarm_consensus::BlockID;
  4. use swarm_consensus::CastID;
  5. use swarm_consensus::Data;
  6. use swarm_consensus::GnomeId;
  7. use swarm_consensus::Header;
  8. use swarm_consensus::Message;
  9. use swarm_consensus::Nat;
  10. use swarm_consensus::NeighborRequest;
  11. use swarm_consensus::NeighborResponse;
  12. use swarm_consensus::Neighborhood;
  13. use swarm_consensus::NetworkSettings;
  14. // use swarm_consensus::NetworkSettings;
  15. use swarm_consensus::Payload;
  16. use swarm_consensus::PortAllocationRule;
  17. use swarm_consensus::SwarmID;
  18. use swarm_consensus::SwarmTime;
  19. use bytes::BufMut;
  20. use bytes::Bytes;
  21. use bytes::BytesMut;
  22. use std::net::IpAddr;
  23. // 1234567890123456789012345678901234567890|
  24. // _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ |
  25. // HPPPNNNNSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS|
  26. // H = header: 0 - Sync
  27. // 1 - Block
  28. // PPP = payload: 000 - KeepAlive
  29. // 100 - Block
  30. // 010 - NeighborResponse
  31. // 001 - NeighborRequest
  32. // 101 - Unicast TODO
  33. // 110 - Multicast TODO
  34. // 011 - Broadcast TODO
  35. // 111 - Bye
  36. // NNNN = Neighborhood value
  37. // SS... = SwarmTime value
  38. pub fn bytes_to_message(bytes: &Bytes) -> Result<Message, ConversionError> {
  39. // println!("Bytes to message: {:?}", bytes);
  40. // println!("decoding: {:#08b} {:?}", bytes[0], bytes);
  41. let bytes_len = bytes.len();
  42. let swarm_time: SwarmTime = SwarmTime(as_u32_be(&[bytes[1], bytes[2], bytes[3], bytes[4]]));
  43. let neighborhood: Neighborhood = Neighborhood(bytes[0] & 0b0_000_1111);
  44. let (header, data_idx) = if bytes[0] & 0b1_000_0000 == 128 {
  45. let block_id: u32 = as_u32_be(&[bytes[5], bytes[6], bytes[7], bytes[8]]);
  46. (Header::Block(BlockID(block_id)), 9)
  47. } else {
  48. (Header::Sync, 5)
  49. };
  50. let payload: Payload = if bytes[0] & 0b0_111_0000 == 112 {
  51. println!("bytes[0]: {:#b}", bytes[0]);
  52. Payload::Bye
  53. } else if bytes[0] & 0b0_111_0000 == 80 {
  54. // println!("UNICAST!!");
  55. let cid: CastID = CastID(bytes[data_idx]);
  56. let data: Data = Data(as_u32_be(&[
  57. bytes[data_idx + 1],
  58. bytes[data_idx + 2],
  59. bytes[data_idx + 3],
  60. bytes[data_idx + 4],
  61. ]));
  62. Payload::Unicast(cid, data)
  63. } else if bytes[0] & 0b0_111_0000 == 16 {
  64. println!("len: {}", bytes_len);
  65. let request_type: u8 = bytes[data_idx];
  66. let nr = match request_type {
  67. 255 => {
  68. let st_value: u32 = as_u32_be(&[
  69. bytes[data_idx + 1],
  70. bytes[data_idx + 2],
  71. bytes[data_idx + 3],
  72. bytes[data_idx + 4],
  73. ]);
  74. NeighborRequest::ListingRequest(SwarmTime(st_value))
  75. }
  76. 254 => {
  77. let swarm_id = SwarmID(bytes[data_idx + 1]);
  78. let mut cast_ids = [CastID(0); 256];
  79. let mut inserted = 0;
  80. for c_id in &bytes[data_idx + 2..bytes_len] {
  81. cast_ids[inserted] = CastID(*c_id);
  82. inserted += 1;
  83. }
  84. NeighborRequest::UnicastRequest(swarm_id, cast_ids)
  85. }
  86. 253 => {
  87. let count = bytes[data_idx + 2];
  88. let mut data = [BlockID(0); 128];
  89. for i in 0..count as usize {
  90. let bid = as_u32_be(&[
  91. bytes[4 * i + data_idx + 3],
  92. bytes[4 * i + data_idx + 4],
  93. bytes[4 * i + data_idx + 5],
  94. bytes[4 * i + data_idx + 6],
  95. ]);
  96. data[i as usize] = BlockID(bid);
  97. }
  98. NeighborRequest::PayloadRequest(count, data)
  99. }
  100. 252 => {
  101. let net_set = parse_network_settings(bytes.slice(data_idx + 1..bytes_len));
  102. NeighborRequest::ForwardConnectRequest(net_set)
  103. }
  104. 251 => {
  105. let id = bytes[data_idx + 1];
  106. let mut g_id: u64 = ((bytes[data_idx + 2]) as u64) << 56;
  107. g_id += ((bytes[data_idx + 3]) as u64) << 48;
  108. g_id += ((bytes[data_idx + 4]) as u64) << 40;
  109. g_id += ((bytes[data_idx + 5]) as u64) << 32;
  110. g_id += ((bytes[data_idx + 6]) as u64) << 24;
  111. g_id += ((bytes[data_idx + 7]) as u64) << 16;
  112. g_id += ((bytes[data_idx + 8]) as u64) << 8;
  113. g_id += (bytes[data_idx + 9]) as u64;
  114. let net_set = parse_network_settings(bytes.slice(data_idx + 10..bytes_len));
  115. NeighborRequest::ConnectRequest(id, GnomeId(g_id), net_set)
  116. }
  117. other => {
  118. // TODO
  119. let data: u32 = as_u32_be(&[
  120. bytes[data_idx + 2],
  121. bytes[data_idx + 3],
  122. bytes[data_idx + 4],
  123. bytes[data_idx + 5],
  124. ]);
  125. NeighborRequest::CustomRequest(other, Data(data))
  126. }
  127. };
  128. Payload::Request(nr)
  129. } else if bytes[0] & 0b0_111_0000 == 64 {
  130. // let bid: u32 = as_u32_be(&[bytes[6], bytes[7], bytes[8], bytes[9]]);
  131. // let data = Data(as_u32_be(&[bytes[10], bytes[11], bytes[12], bytes[13]]));
  132. let bid: u32 = as_u32_be(&[bytes[5], bytes[6], bytes[7], bytes[8]]);
  133. let data = Data(as_u32_be(&[
  134. bytes[data_idx],
  135. bytes[data_idx + 1],
  136. bytes[data_idx + 2],
  137. bytes[data_idx + 3],
  138. ]));
  139. Payload::Block(BlockID(bid), data)
  140. } else if bytes[0] & 0b0_111_0000 == 32 {
  141. let response_type = bytes[data_idx];
  142. let nr = match response_type {
  143. 255 => {
  144. let count = bytes[data_idx + 1];
  145. let mut data = [BlockID(0); 128];
  146. for i in 0..count as usize {
  147. let bid: u32 = as_u32_be(&[
  148. bytes[4 * i + data_idx + 2],
  149. bytes[4 * i + data_idx + 3],
  150. bytes[4 * i + data_idx + 4],
  151. bytes[4 * i + data_idx + 5],
  152. ]);
  153. data[i] = BlockID(bid);
  154. }
  155. NeighborResponse::Listing(count, data)
  156. }
  157. 254 => {
  158. NeighborResponse::Unicast(SwarmID(bytes[data_idx + 1]), CastID(bytes[data_idx + 2]))
  159. }
  160. 253 => {
  161. let b_id: u32 = as_u32_be(&[
  162. bytes[data_idx + 1],
  163. bytes[data_idx + 2],
  164. bytes[data_idx + 3],
  165. bytes[data_idx + 4],
  166. ]);
  167. let data: u32 = as_u32_be(&[
  168. bytes[data_idx + 5],
  169. bytes[data_idx + 6],
  170. bytes[data_idx + 7],
  171. bytes[data_idx + 8],
  172. ]);
  173. NeighborResponse::Block(BlockID(b_id), Data(data))
  174. }
  175. 252 => {
  176. let net_set = parse_network_settings(bytes.slice(data_idx + 1..bytes_len));
  177. NeighborResponse::ForwardConnectResponse(net_set)
  178. }
  179. 251 => NeighborResponse::ForwardConnectFailed,
  180. 250 => {
  181. let id = bytes[data_idx + 1];
  182. NeighborResponse::AlreadyConnected(id)
  183. }
  184. 249 => {
  185. let id = bytes[data_idx + 1];
  186. let net_set = parse_network_settings(bytes.slice(data_idx + 2..bytes_len));
  187. NeighborResponse::ConnectResponse(id, net_set)
  188. }
  189. _other => {
  190. // TODO
  191. NeighborResponse::CustomResponse(bytes[data_idx + 1], Data(0))
  192. }
  193. };
  194. Payload::Response(nr)
  195. // TODO: handle Unicast/Multicast/Broadcast messages
  196. } else {
  197. Payload::KeepAlive
  198. };
  199. Ok(Message {
  200. swarm_time,
  201. neighborhood,
  202. header,
  203. payload,
  204. })
  205. }
  /// Interprets four bytes as a big-endian (most significant byte first) `u32`.
  fn as_u32_be(array: &[u8; 4]) -> u32 {
      u32::from_be_bytes(*array)
  }
  /// Error type of the `Result` returned by `bytes_to_message`.
  ///
  /// It is a unit struct and carries no detail about what went wrong.
  #[derive(Debug)]
  pub struct ConversionError;

  impl fmt::Display for ConversionError {
      /// Writes the fixed text `ConversionError`.
      fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
          formatter.write_str("ConversionError")
      }
  }

  impl Error for ConversionError {}
  225. pub fn message_to_bytes(msg: Message) -> Bytes {
  226. // println!("Message to bytes: {:?}", msg);
  227. let mut bytes = BytesMut::with_capacity(1033);
  228. let nhood = msg.neighborhood.0;
  229. if nhood > 15 {
  230. panic!("Can't handle this!");
  231. }
  232. bytes.put_u8(nhood);
  233. bytes.put_u32(msg.swarm_time.0);
  234. let mut block_id_inserted = false;
  235. if let Header::Block(block_id) = msg.header {
  236. bytes[0] |= 0b1_000_0000;
  237. bytes.put_u32(block_id.0);
  238. block_id_inserted = true;
  239. }
  240. bytes[0] |= match msg.payload {
  241. Payload::KeepAlive => 0b0_000_0000,
  242. Payload::Unicast(cid, data) => {
  243. println!("Unicast into bytes");
  244. bytes.put_u8(cid.0);
  245. bytes.put_u32(data.0);
  246. 0b0_101_0000
  247. }
  248. Payload::Multicast(_mid, _data) => 0b0_110_0000,
  249. Payload::Broadcast(_bid, _data) => 0b0_011_0000,
  250. Payload::Bye => 0b0_111_0000,
  251. Payload::Block(block_id, data) => {
  252. if !block_id_inserted {
  253. bytes.put_u32(block_id.0);
  254. };
  255. bytes.put_u32(data.0);
  256. 0b0_100_0000
  257. }
  258. Payload::Response(neighbor_response) => {
  259. match neighbor_response {
  260. NeighborResponse::Listing(count, data) => {
  261. bytes.put_u8(255);
  262. bytes.put_u8(count);
  263. for chunk in data {
  264. bytes.put_u32(chunk.0);
  265. }
  266. }
  267. NeighborResponse::Unicast(swarm_id, cast_id) => {
  268. bytes.put_u8(254);
  269. bytes.put_u8(swarm_id.0);
  270. bytes.put_u8(cast_id.0);
  271. }
  272. NeighborResponse::Block(b_id, data) => {
  273. bytes.put_u8(253);
  274. bytes.put_u32(b_id.0);
  275. bytes.put_u32(data.0);
  276. }
  277. NeighborResponse::ForwardConnectResponse(network_settings) => {
  278. bytes.put_u8(252);
  279. insert_network_settings(&mut bytes, network_settings);
  280. }
  281. NeighborResponse::ForwardConnectFailed => {
  282. bytes.put_u8(251);
  283. }
  284. NeighborResponse::AlreadyConnected(id) => {
  285. bytes.put_u8(250);
  286. bytes.put_u8(id);
  287. }
  288. NeighborResponse::ConnectResponse(id, network_settings) => {
  289. bytes.put_u8(249);
  290. bytes.put_u8(id);
  291. insert_network_settings(&mut bytes, network_settings);
  292. }
  293. NeighborResponse::CustomResponse(id, data) => {
  294. bytes.put_u8(id);
  295. bytes.put_u32(data.0);
  296. } // _ => todo!(),
  297. }
  298. 0b0_010_0000
  299. }
  300. Payload::Request(nr) => {
  301. match nr {
  302. NeighborRequest::ListingRequest(st) => {
  303. bytes.put_u8(255);
  304. bytes.put_u32(st.0);
  305. }
  306. NeighborRequest::UnicastRequest(swarm_id, cast_ids) => {
  307. bytes.put_u8(254);
  308. bytes.put_u8(swarm_id.0);
  309. for c_id in cast_ids {
  310. bytes.put_u8(c_id.0);
  311. }
  312. }
  313. NeighborRequest::PayloadRequest(count, data) => {
  314. bytes.put_u8(253);
  315. bytes.put_u8(count);
  316. for chunk in data {
  317. bytes.put_u32(chunk.0);
  318. }
  319. }
  320. NeighborRequest::ForwardConnectRequest(network_settings) => {
  321. bytes.put_u8(252);
  322. insert_network_settings(&mut bytes, network_settings);
  323. }
  324. NeighborRequest::ConnectRequest(id, gnome_id, network_settings) => {
  325. bytes.put_u8(251);
  326. bytes.put_u8(id);
  327. bytes.put_u64(gnome_id.0);
  328. insert_network_settings(&mut bytes, network_settings);
  329. }
  330. NeighborRequest::CustomRequest(id, data) => {
  331. bytes.put_u8(id);
  332. bytes.put_u32(data.0);
  333. }
  334. }
  335. 0b0_001_0000
  336. }
  337. };
  338. // println!("encoded: {:#08b} {:?}", bytes[0], bytes);
  339. bytes.split().into()
  340. }
  341. fn insert_network_settings(bytes: &mut BytesMut, network_settings: NetworkSettings) {
  342. bytes.put_u8(network_settings.nat_type as u8);
  343. bytes.put_u16(network_settings.pub_port);
  344. bytes.put_u8(network_settings.port_allocation.0 as u8);
  345. bytes.put_i8(network_settings.port_allocation.1);
  346. let pub_ip = network_settings.pub_ip;
  347. match pub_ip {
  348. std::net::IpAddr::V4(ip4) => {
  349. for b in ip4.octets() {
  350. bytes.put_u8(b);
  351. }
  352. }
  353. std::net::IpAddr::V6(ip4) => {
  354. for b in ip4.octets() {
  355. bytes.put_u8(b);
  356. }
  357. }
  358. }
  359. }
  360. fn parse_network_settings(bytes: Bytes) -> NetworkSettings {
  361. let mut bytes_iter = bytes.into_iter();
  362. let raw_nat_type = bytes_iter.next().unwrap();
  363. let nat_type = match raw_nat_type {
  364. 0 => Nat::Unknown,
  365. 1 => Nat::None,
  366. 2 => Nat::FullCone,
  367. 4 => Nat::AddressRestrictedCone,
  368. 8 => Nat::PortRestrictedCone,
  369. 16 => Nat::SymmetricWithPortControl,
  370. 32 => Nat::Symmetric,
  371. _ => {
  372. println!("Unrecognized NatType while parsing: {}", raw_nat_type);
  373. Nat::Unknown
  374. }
  375. };
  376. let mut port_bytes: [u8; 2] = [0, 0];
  377. port_bytes[0] = bytes_iter.next().unwrap();
  378. port_bytes[1] = bytes_iter.next().unwrap();
  379. let pub_port: u16 = ((port_bytes[0]) as u16) << 8 | port_bytes[1] as u16;
  380. let port_allocation_rule = match bytes_iter.next().unwrap() {
  381. 0 => PortAllocationRule::Random,
  382. 1 => PortAllocationRule::FullCone,
  383. 2 => PortAllocationRule::AddressSensitive,
  384. 4 => PortAllocationRule::PortSensitive,
  385. _ => PortAllocationRule::Random,
  386. };
  387. let delta_port = bytes_iter.next().unwrap() as i8;
  388. // port_bytes[0] = bytes_iter.next().unwrap();
  389. // port_bytes[1] = bytes_iter.next().unwrap();
  390. // let port_range_min: u16 = ((port_bytes[0]) as u16) << 8 | port_bytes[1] as u16;
  391. // port_bytes[0] = bytes_iter.next().unwrap();
  392. // port_bytes[1] = bytes_iter.next().unwrap();
  393. // let port_range_max: u16 = ((port_bytes[0]) as u16) << 8 | port_bytes[1] as u16;
  394. let ip_bytes: Vec<u8> = bytes_iter.collect();
  395. let bytes_len = ip_bytes.len();
  396. let pub_ip = if bytes_len == 4 {
  397. let array: [u8; 4] = ip_bytes.try_into().unwrap();
  398. IpAddr::from(array)
  399. } else if bytes_len == 16 {
  400. let array: [u8; 16] = ip_bytes.try_into().unwrap();
  401. IpAddr::from(array)
  402. } else {
  403. println!("Unable to parse IP addr from: {:?}", ip_bytes);
  404. IpAddr::from([0, 0, 0, 0])
  405. };
  406. NetworkSettings {
  407. pub_ip,
  408. pub_port,
  409. nat_type,
  410. port_allocation: (port_allocation_rule, delta_port),
  411. }
  412. }