/ crates / iroh-automerge / src / codec.rs
codec.rs
 1  //! From https://github.com/n0-computer/iroh-examples/blob/6af4d24151b53b93e1d97061c792f77b33917ec2/iroh-automerge-repo/src/codec.rs
 2  //!
 3  //! A simple implementation of a tokio-util codec (the [`Encoder`] and [`Decoder`] traits)
 4  //! that wraps [`LengthDelimitedCodec`] and works on `Vec<u8>` instead of [`Bytes`].
 5  //!
 6  //! Also adds a bit of tracing to the encoding and decoding.
 7  //!
 8  //! This codec allows turning things that implement `AsyncRead` or `AsyncWrite`
 9  //! into `Stream`s and `Sink`s that work on whole messages instead of individual bytes.
10  use bytes::Bytes;
11  use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
12  
13  #[derive(Clone)]
14  pub(crate) struct Codec {
15      remote_endpoint_id: iroh::EndpointId, // used for tracing
16      inner: LengthDelimitedCodec,
17  }
18  
19  impl Codec {
20      pub(crate) fn new(remote_endpoint_id: iroh::EndpointId) -> Self {
21          Self {
22              remote_endpoint_id,
23              inner: LengthDelimitedCodec::new(), // using default values
24          }
25      }
26  }
27  
28  impl Encoder<Vec<u8>> for Codec {
29      type Error = std::io::Error;
30  
31      fn encode(&mut self, bytes: Vec<u8>, dst: &mut bytes::BytesMut) -> Result<(), Self::Error> {
32          let len = bytes.len();
33          let result = self.inner.encode(Bytes::from(bytes), dst);
34          if result.is_ok() {
35              tracing::trace!(len, %self.remote_endpoint_id, "encoded msg");
36          }
37          result
38      }
39  }
40  
41  impl Decoder for Codec {
42      type Item = Vec<u8>;
43  
44      type Error = std::io::Error;
45  
46      fn decode(&mut self, src: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
47          let Some(bytes) = self.inner.decode(src)? else {
48              return Ok(None);
49          };
50  
51          tracing::trace!(len = bytes.len(), %self.remote_endpoint_id, "decoded msg");
52  
53          Ok(Some(Vec::from(bytes)))
54      }
55  
56      fn decode_eof(&mut self, buf: &mut bytes::BytesMut) -> Result<Option<Self::Item>, Self::Error> {
57          let Some(bytes) = self.inner.decode_eof(buf)? else {
58              return Ok(None);
59          };
60  
61          tracing::trace!(len = bytes.len(), %self.remote_endpoint_id, "decoded msg");
62  
63          Ok(Some(Vec::from(bytes)))
64      }
65  }