/ crates / tor-async-utils / src / sinkext.rs
sinkext.rs
 1  //! Extension trait for `Sink`.
 2  
 3  use std::{
 4      marker::PhantomData,
 5      pin::Pin,
 6      task::{Context, Poll},
 7  };
 8  
 9  use futures::{ready, sink::Sink};
10  use pin_project::pin_project;
11  
12  /// Extension trait for `Sink`
13  pub trait SinkExt<Item>: Sink<Item> {
14      /// As `Sink::with`, but takes a function that returns an `Item` rather
15      /// than `Future<Output=Item>`.
16      fn with_fn<F, T, E>(self, func: F) -> WithFn<Self, F, T, E>
17      // or error?
18      where
19          Self: Sized,
20          F: FnMut(T) -> Result<Item, E>,
21          E: From<Self::Error>;
22  }
23  
24  impl<Item, S> SinkExt<Item> for S
25  where
26      S: Sink<Item>,
27  {
28      fn with_fn<F, T, E>(self, func: F) -> WithFn<Self, F, T, E>
29      where
30          Self: Sized,
31          F: FnMut(T) -> Result<Item, E>,
32          E: From<Self::Error>,
33      {
34          WithFn {
35              sink: self,
36              func,
37              _phantom: PhantomData,
38          }
39      }
40  }
41  
42  /// Sink returned by [`SinkExt::with_fn`].
43  #[pin_project]
44  pub struct WithFn<S, F, T, E> {
45      /// The underlying sink
46      #[pin]
47      sink: S,
48      /// The user-provided function.
49      func: F,
50      /// Phantom data to ensure type consistency.
51      _phantom: PhantomData<fn() -> Result<T, E>>,
52  }
53  
54  impl<S, Item, F, T, E> Sink<T> for WithFn<S, F, T, E>
55  where
56      S: Sink<Item>,
57      F: FnMut(T) -> Result<Item, E>,
58      E: From<S::Error>,
59  {
60      type Error = E;
61  
62      fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
63          ready!(self.project().sink.poll_ready(cx))?;
64          Poll::Ready(Ok(()))
65      }
66  
67      fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68          ready!(self.project().sink.poll_flush(cx))?;
69          Poll::Ready(Ok(()))
70      }
71  
72      fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
73          ready!(self.project().sink.poll_close(cx))?;
74          Poll::Ready(Ok(()))
75      }
76  
77      fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> {
78          let this = self.project();
79          let item = (this.func)(item)?;
80          this.sink.start_send(item).map_err(E::from)
81      }
82  }