diff --git a/src/common/frames.rs b/src/common/frames.rs index 30c1251b..9724ad8f 100644 --- a/src/common/frames.rs +++ b/src/common/frames.rs @@ -54,9 +54,8 @@ impl Frame { // Read from UDP socket, set addr to source pub async fn recv_from(&mut self, socket: &UdpSocket) -> IoResult<(usize, SocketAddr)> { - let mut buf = BytesMut::zeroed(65536); - let (size, source) = socket.recv_from(&mut buf).await?; - buf.truncate(size); + let mut buf = BytesMut::with_capacity(65536); + let (size, source) = socket.recv_buf_from(&mut buf).await?; self.body = buf.freeze(); self.addr = Some(source.into()); Ok((size, source)) @@ -228,18 +227,14 @@ where self.remaining = Some(BytesMut::with_capacity(65536 * 2)); } let mut buf = self.remaining.take().unwrap(); - let mut last = buf.split(); - buf.reserve(65536); - unsafe { - buf.set_len(65536); + if buf.remaining_mut() < 65536 { + buf.reserve(65536); } - let len = self.inner.read(&mut buf).await?; + let len = self.inner.read_buf(&mut buf).await?; if len == 0 { return Ok(None); } - buf.truncate(len); - last.unsplit(buf); - self.remaining = Some(last); + self.remaining = Some(buf); } } } @@ -397,18 +392,14 @@ where } let mut buf = self.remaining.take().unwrap(); - let mut last = buf.split(); - buf.reserve(65536); - unsafe { - buf.set_len(65536); + if buf.remaining_mut() < 65536 { + buf.reserve(65536); } - let len = self.inner.read(&mut buf).await?; + let len = self.inner.read_buf(&mut buf).await?; if len == 0 { return Ok(None); } - buf.truncate(len); - last.unsplit(buf); - self.remaining = Some(last); + self.remaining = Some(buf); } } } diff --git a/src/listeners/tproxy.rs b/src/listeners/tproxy.rs index de0bd3c8..fb989020 100644 --- a/src/listeners/tproxy.rs +++ b/src/listeners/tproxy.rs @@ -233,8 +233,12 @@ impl TProxyListener { timeouts: &Timeouts, queue: &Sender, ) -> Result<()> { - let mut buf = BytesMut::zeroed(65536); + let mut buf = BytesMut::with_capacity(65536); + unsafe { + buf.set_len(65536); + } let (size, src, dst) = listener.recv_msg(&mut buf).await.context("accept")?; + buf.truncate(size); let src = try_map_v4_addr(src); let dst = try_map_v4_addr(dst); if match dst.ip() { @@ -243,7 +247,6 @@ impl TProxyListener { } { return Ok(()); } - buf.truncate(size); let mut buf = Frame::from_body(buf.freeze()); buf.addr = Some(dst.into()); diff --git a/src/rules/mod.rs b/src/rules/mod.rs index 46135508..0966eccb 100644 --- a/src/rules/mod.rs +++ b/src/rules/mod.rs @@ -120,16 +120,16 @@ impl Rule { rules_metrics().execute_time.start_timer() }; let t = Instant::now(); - let ret = if self.filter.is_none() { - true - } else { - match self.filter.as_ref().unwrap().evaluate(request).await { + let ret = if let Some(filter) = &self.filter { + match filter.evaluate(request).await { Ok(b) => b, Err(e) => { trace!("error evaluating filter: {:?}", e); false } } + } else { + true }; let t = t.elapsed().as_nanos() as u64; #[cfg(feature = "metrics")]