futures_lite/io.rs
1//! Tools and combinators for I/O.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::io::{self, AsyncReadExt};
7//!
8//! # spin_on::spin_on(async {
9//! let input: &[u8] = b"hello";
10//! let mut reader = io::BufReader::new(input);
11//!
12//! let mut contents = String::new();
13//! reader.read_to_string(&mut contents).await?;
14//! # std::io::Result::Ok(()) });
15//! ```
16
17#[doc(no_inline)]
18pub use std::io::{Error, ErrorKind, Result, SeekFrom};
19
20#[doc(no_inline)]
21pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
22
23use std::borrow::{Borrow, BorrowMut};
24use std::boxed::Box;
25use std::cmp;
26use std::fmt;
27use std::future::Future;
28use std::io::{IoSlice, IoSliceMut};
29use std::mem;
30use std::pin::Pin;
31use std::string::String;
32use std::sync::{Arc, Mutex};
33use std::task::{Context, Poll};
34use std::vec;
35use std::vec::Vec;
36
37use futures_core::stream::Stream;
38use pin_project_lite::pin_project;
39
40use crate::future;
41use crate::ready;
42
43const DEFAULT_BUF_SIZE: usize = 8 * 1024;
44
45/// Copies the entire contents of a reader into a writer.
46///
47/// This function will read data from `reader` and write it into `writer` in a streaming fashion
48/// until `reader` returns EOF.
49///
50/// On success, returns the total number of bytes copied.
51///
52/// # Examples
53///
54/// ```
55/// use futures_lite::io::{self, BufReader, BufWriter};
56///
57/// # spin_on::spin_on(async {
58/// let input: &[u8] = b"hello";
59/// let reader = BufReader::new(input);
60///
61/// let mut output = Vec::new();
62/// let writer = BufWriter::new(&mut output);
63///
64/// io::copy(reader, writer).await?;
65/// # std::io::Result::Ok(()) });
66/// ```
67pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
68where
69 R: AsyncRead,
70 W: AsyncWrite,
71{
72 pin_project! {
73 struct CopyFuture<R, W> {
74 #[pin]
75 reader: R,
76 #[pin]
77 writer: W,
78 amt: u64,
79 }
80 }
81
82 impl<R, W> Future for CopyFuture<R, W>
83 where
84 R: AsyncBufRead,
85 W: AsyncWrite,
86 {
87 type Output = Result<u64>;
88
89 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90 let mut this = self.project();
91 loop {
92 let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
93 if buffer.is_empty() {
94 ready!(this.writer.as_mut().poll_flush(cx))?;
95 return Poll::Ready(Ok(*this.amt));
96 }
97
98 let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
99 if i == 0 {
100 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
101 }
102 *this.amt += i as u64;
103 this.reader.as_mut().consume(i);
104 }
105 }
106 }
107
108 let future = CopyFuture {
109 reader: BufReader::new(reader),
110 writer,
111 amt: 0,
112 };
113 future.await
114}
115
116/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
117///
118/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
119/// This is usually the case for in-memory buffered I/O.
120///
121/// # Examples
122///
123/// ```
124/// use futures_lite::io::{AssertAsync, AsyncReadExt};
125///
126/// let reader: &[u8] = b"hello";
127///
128/// # spin_on::spin_on(async {
129/// let mut async_reader = AssertAsync::new(reader);
130/// let mut contents = String::new();
131///
132/// // This line works in async manner - note that there is await:
133/// async_reader.read_to_string(&mut contents).await?;
134/// # std::io::Result::Ok(()) });
135/// ```
136#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
137pub struct AssertAsync<T>(T);
138
139impl<T> Unpin for AssertAsync<T> {}
140
141impl<T> AssertAsync<T> {
142 /// Wraps an I/O handle implementing [`std::io`] traits.
143 ///
144 /// # Examples
145 ///
146 /// ```
147 /// use futures_lite::io::AssertAsync;
148 ///
149 /// let reader: &[u8] = b"hello";
150 ///
151 /// let async_reader = AssertAsync::new(reader);
152 /// ```
153 #[inline(always)]
154 pub fn new(io: T) -> Self {
155 AssertAsync(io)
156 }
157
158 /// Gets a reference to the inner I/O handle.
159 ///
160 /// # Examples
161 ///
162 /// ```
163 /// use futures_lite::io::AssertAsync;
164 ///
165 /// let reader: &[u8] = b"hello";
166 ///
167 /// let async_reader = AssertAsync::new(reader);
168 /// let r = async_reader.get_ref();
169 /// ```
170 #[inline(always)]
171 pub fn get_ref(&self) -> &T {
172 &self.0
173 }
174
175 /// Gets a mutable reference to the inner I/O handle.
176 ///
177 /// # Examples
178 ///
179 /// ```
180 /// use futures_lite::io::AssertAsync;
181 ///
182 /// let reader: &[u8] = b"hello";
183 ///
184 /// let mut async_reader = AssertAsync::new(reader);
185 /// let r = async_reader.get_mut();
186 /// ```
187 #[inline(always)]
188 pub fn get_mut(&mut self) -> &mut T {
189 &mut self.0
190 }
191
192 /// Extracts the inner I/O handle.
193 ///
194 /// # Examples
195 ///
196 /// ```
197 /// use futures_lite::io::AssertAsync;
198 ///
199 /// let reader: &[u8] = b"hello";
200 ///
201 /// let async_reader = AssertAsync::new(reader);
202 /// let inner = async_reader.into_inner();
203 /// ```
204 #[inline(always)]
205 pub fn into_inner(self) -> T {
206 self.0
207 }
208}
209
210fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>>
211where
212 F: FnMut() -> std::io::Result<T>,
213{
214 loop {
215 match f() {
216 Err(err) if err.kind() == ErrorKind::Interrupted => {}
217 res => return Poll::Ready(res),
218 }
219 }
220}
221
222impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
223 #[inline]
224 fn poll_read(
225 mut self: Pin<&mut Self>,
226 _: &mut Context<'_>,
227 buf: &mut [u8],
228 ) -> Poll<Result<usize>> {
229 assert_async_wrapio(move || self.0.read(buf))
230 }
231
232 #[inline]
233 fn poll_read_vectored(
234 mut self: Pin<&mut Self>,
235 _: &mut Context<'_>,
236 bufs: &mut [IoSliceMut<'_>],
237 ) -> Poll<Result<usize>> {
238 assert_async_wrapio(move || self.0.read_vectored(bufs))
239 }
240}
241
242impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
243 #[inline]
244 fn poll_write(
245 mut self: Pin<&mut Self>,
246 _: &mut Context<'_>,
247 buf: &[u8],
248 ) -> Poll<Result<usize>> {
249 assert_async_wrapio(move || self.0.write(buf))
250 }
251
252 #[inline]
253 fn poll_write_vectored(
254 mut self: Pin<&mut Self>,
255 _: &mut Context<'_>,
256 bufs: &[IoSlice<'_>],
257 ) -> Poll<Result<usize>> {
258 assert_async_wrapio(move || self.0.write_vectored(bufs))
259 }
260
261 #[inline]
262 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
263 assert_async_wrapio(move || self.0.flush())
264 }
265
266 #[inline]
267 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
268 self.poll_flush(cx)
269 }
270}
271
272impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
273 #[inline]
274 fn poll_seek(
275 mut self: Pin<&mut Self>,
276 _: &mut Context<'_>,
277 pos: SeekFrom,
278 ) -> Poll<Result<u64>> {
279 assert_async_wrapio(move || self.0.seek(pos))
280 }
281}
282
283/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
284/// polls to `WouldBlock` errors.
285///
286/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
287/// that take `Read` as a parameter.
288///
289/// # Examples
290///
291/// ```
292/// use std::io::Read;
293/// use std::task::{Poll, Context};
294///
295/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
296/// // Assume we have a library that's built around `Read` and `Write` traits.
297/// use cooltls::Session;
298///
299/// // We want to use it with our writer that implements `AsyncWrite`.
300/// let writer = Stream::new();
301///
302/// // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
303/// use futures_lite::io::AsyncAsSync;
304/// let writer = AsyncAsSync::new(cx, writer);
305///
306/// // Now, we can use it with `cooltls`.
307/// let mut session = Session::new(writer);
308///
309/// // Match on the result of `read()` and translate it to poll.
310/// match session.read(&mut [0; 1024]) {
311/// Ok(n) => Poll::Ready(n),
312/// Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
313/// Err(err) => panic!("unexpected error: {}", err),
314/// }
315/// }
316///
317/// // Usually, poll-based functions are best wrapped using `poll_fn`.
318/// use futures_lite::future::poll_fn;
319/// # futures_lite::future::block_on(async {
320/// poll_fn(|cx| poll_for_io(cx)).await;
321/// # });
322/// # struct Stream;
323/// # impl Stream {
324/// # fn new() -> Stream {
325/// # Stream
326/// # }
327/// # }
328/// # impl futures_lite::io::AsyncRead for Stream {
329/// # fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
330/// # Poll::Ready(Ok(0))
331/// # }
332/// # }
333/// # mod cooltls {
334/// # pub struct Session<W> {
335/// # reader: W,
336/// # }
337/// # impl<W> Session<W> {
338/// # pub fn new(reader: W) -> Session<W> {
339/// # Session { reader }
340/// # }
341/// # }
342/// # impl<W: std::io::Read> std::io::Read for Session<W> {
343/// # fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
344/// # self.reader.read(buf)
345/// # }
346/// # }
347/// # }
348/// ```
349#[derive(Debug)]
350pub struct AsyncAsSync<'r, 'ctx, T> {
351 /// The context we are using to poll the future.
352 pub context: &'r mut Context<'ctx>,
353
354 /// The actual reader/writer we are wrapping.
355 pub inner: T,
356}
357
358impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
359 /// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
360 ///
361 /// # Examples
362 ///
363 /// ```
364 /// use futures_lite::io::AsyncAsSync;
365 /// use std::task::Context;
366 /// use waker_fn::waker_fn;
367 ///
368 /// let reader: &[u8] = b"hello";
369 /// let waker = waker_fn(|| {});
370 /// let mut context = Context::from_waker(&waker);
371 ///
372 /// let async_reader = AsyncAsSync::new(&mut context, reader);
373 /// ```
374 #[inline]
375 pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
376 AsyncAsSync { context, inner }
377 }
378
379 /// Attempt to shutdown the I/O handle.
380 ///
381 /// # Examples
382 ///
383 /// ```
384 /// use futures_lite::io::AsyncAsSync;
385 /// use std::task::Context;
386 /// use waker_fn::waker_fn;
387 ///
388 /// let reader: Vec<u8> = b"hello".to_vec();
389 /// let waker = waker_fn(|| {});
390 /// let mut context = Context::from_waker(&waker);
391 ///
392 /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
393 /// async_reader.close().unwrap();
394 /// ```
395 #[inline]
396 pub fn close(&mut self) -> Result<()>
397 where
398 T: AsyncWrite + Unpin,
399 {
400 self.poll_with(|io, cx| io.poll_close(cx))
401 }
402
403 /// Poll this `AsyncAsSync` for some function.
404 ///
405 /// # Examples
406 ///
407 /// ```
408 /// use futures_lite::io::{AsyncAsSync, AsyncRead};
409 /// use std::task::Context;
410 /// use waker_fn::waker_fn;
411 ///
412 /// let reader: &[u8] = b"hello";
413 /// let waker = waker_fn(|| {});
414 /// let mut context = Context::from_waker(&waker);
415 ///
416 /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
417 /// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
418 /// assert_eq!(r.unwrap(), 5);
419 /// ```
420 #[inline]
421 pub fn poll_with<R>(
422 &mut self,
423 f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
424 ) -> Result<R>
425 where
426 T: Unpin,
427 {
428 match f(Pin::new(&mut self.inner), self.context) {
429 Poll::Ready(res) => res,
430 Poll::Pending => Err(ErrorKind::WouldBlock.into()),
431 }
432 }
433}
434
435impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
436 #[inline]
437 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
438 self.poll_with(|io, cx| io.poll_read(cx, buf))
439 }
440
441 #[inline]
442 fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
443 self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
444 }
445}
446
447impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
448 #[inline]
449 fn write(&mut self, buf: &[u8]) -> Result<usize> {
450 self.poll_with(|io, cx| io.poll_write(cx, buf))
451 }
452
453 #[inline]
454 fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
455 self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
456 }
457
458 #[inline]
459 fn flush(&mut self) -> Result<()> {
460 self.poll_with(|io, cx| io.poll_flush(cx))
461 }
462}
463
464impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
465 #[inline]
466 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
467 self.poll_with(|io, cx| io.poll_seek(cx, pos))
468 }
469}
470
471impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
472 #[inline]
473 fn as_ref(&self) -> &T {
474 &self.inner
475 }
476}
477
478impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
479 #[inline]
480 fn as_mut(&mut self) -> &mut T {
481 &mut self.inner
482 }
483}
484
485impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
486 #[inline]
487 fn borrow(&self) -> &T {
488 &self.inner
489 }
490}
491
492impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
493 #[inline]
494 fn borrow_mut(&mut self) -> &mut T {
495 &mut self.inner
496 }
497}
498
499/// Blocks on all async I/O operations and implements [`std::io`] traits.
500///
501/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
502/// manually all the time becomes too tedious, use this type for more convenient blocking on async
503/// I/O operations.
504///
505/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
506/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
507/// [`AsyncSeek`], respectively.
508///
509/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
510/// dropping the [`BlockOn`] handle or some buffered data might get lost.
511///
512/// # Examples
513///
514/// ```
515/// use futures_lite::io::BlockOn;
516/// use futures_lite::pin;
517/// use std::io::Read;
518///
519/// let reader: &[u8] = b"hello";
520/// pin!(reader);
521///
522/// let mut blocking_reader = BlockOn::new(reader);
523/// let mut contents = String::new();
524///
525/// // This line blocks - note that there is no await:
526/// blocking_reader.read_to_string(&mut contents)?;
527/// # std::io::Result::Ok(())
528/// ```
529#[derive(Debug)]
530pub struct BlockOn<T>(T);
531
532impl<T> BlockOn<T> {
533 /// Wraps an async I/O handle into a blocking interface.
534 ///
535 /// # Examples
536 ///
537 /// ```
538 /// use futures_lite::io::BlockOn;
539 /// use futures_lite::pin;
540 ///
541 /// let reader: &[u8] = b"hello";
542 /// pin!(reader);
543 ///
544 /// let blocking_reader = BlockOn::new(reader);
545 /// ```
546 pub fn new(io: T) -> BlockOn<T> {
547 BlockOn(io)
548 }
549
550 /// Gets a reference to the async I/O handle.
551 ///
552 /// # Examples
553 ///
554 /// ```
555 /// use futures_lite::io::BlockOn;
556 /// use futures_lite::pin;
557 ///
558 /// let reader: &[u8] = b"hello";
559 /// pin!(reader);
560 ///
561 /// let blocking_reader = BlockOn::new(reader);
562 /// let r = blocking_reader.get_ref();
563 /// ```
564 pub fn get_ref(&self) -> &T {
565 &self.0
566 }
567
568 /// Gets a mutable reference to the async I/O handle.
569 ///
570 /// # Examples
571 ///
572 /// ```
573 /// use futures_lite::io::BlockOn;
574 /// use futures_lite::pin;
575 ///
576 /// let reader: &[u8] = b"hello";
577 /// pin!(reader);
578 ///
579 /// let mut blocking_reader = BlockOn::new(reader);
580 /// let r = blocking_reader.get_mut();
581 /// ```
582 pub fn get_mut(&mut self) -> &mut T {
583 &mut self.0
584 }
585
586 /// Extracts the inner async I/O handle.
587 ///
588 /// # Examples
589 ///
590 /// ```
591 /// use futures_lite::io::BlockOn;
592 /// use futures_lite::pin;
593 ///
594 /// let reader: &[u8] = b"hello";
595 /// pin!(reader);
596 ///
597 /// let blocking_reader = BlockOn::new(reader);
598 /// let inner = blocking_reader.into_inner();
599 /// ```
600 pub fn into_inner(self) -> T {
601 self.0
602 }
603}
604
605impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
606 fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
607 future::block_on(self.0.read(buf))
608 }
609}
610
611impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> {
612 fn fill_buf(&mut self) -> Result<&[u8]> {
613 future::block_on(self.0.fill_buf())
614 }
615
616 fn consume(&mut self, amt: usize) {
617 Pin::new(&mut self.0).consume(amt)
618 }
619}
620
621impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
622 fn write(&mut self, buf: &[u8]) -> Result<usize> {
623 future::block_on(self.0.write(buf))
624 }
625
626 fn flush(&mut self) -> Result<()> {
627 future::block_on(self.0.flush())
628 }
629}
630
631impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
632 fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
633 future::block_on(self.0.seek(pos))
634 }
635}
636
637pin_project! {
638 /// Adds buffering to a reader.
639 ///
640 /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
641 /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
642 /// maintains an in-memory buffer of the incoming byte stream.
643 ///
644 /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
645 /// the same file or networking socket. It does not help when reading very large amounts at
646 /// once, or reading just once or a few times. It also provides no advantage when reading from
647 /// a source that is already in memory, like a `Vec<u8>`.
648 ///
649 /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
650 /// multiple instances of [`BufReader`] on the same reader can cause data loss.
651 ///
652 /// # Examples
653 ///
654 /// ```
655 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
656 ///
657 /// # spin_on::spin_on(async {
658 /// let input: &[u8] = b"hello";
659 /// let mut reader = BufReader::new(input);
660 ///
661 /// let mut line = String::new();
662 /// reader.read_line(&mut line).await?;
663 /// # std::io::Result::Ok(()) });
664 /// ```
665 pub struct BufReader<R> {
666 #[pin]
667 inner: R,
668 buf: Box<[u8]>,
669 pos: usize,
670 cap: usize,
671 }
672}
673
674impl<R: AsyncRead> BufReader<R> {
675 /// Creates a buffered reader with the default buffer capacity.
676 ///
677 /// The default capacity is currently 8 KB, but that may change in the future.
678 ///
679 /// # Examples
680 ///
681 /// ```
682 /// use futures_lite::io::BufReader;
683 ///
684 /// let input: &[u8] = b"hello";
685 /// let reader = BufReader::new(input);
686 /// ```
687 pub fn new(inner: R) -> BufReader<R> {
688 BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
689 }
690
691 /// Creates a buffered reader with the specified capacity.
692 ///
693 /// # Examples
694 ///
695 /// ```
696 /// use futures_lite::io::BufReader;
697 ///
698 /// let input: &[u8] = b"hello";
699 /// let reader = BufReader::with_capacity(1024, input);
700 /// ```
701 pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
702 BufReader {
703 inner,
704 buf: vec![0; capacity].into_boxed_slice(),
705 pos: 0,
706 cap: 0,
707 }
708 }
709}
710
711impl<R> BufReader<R> {
712 /// Gets a reference to the underlying reader.
713 ///
714 /// It is not advisable to directly read from the underlying reader.
715 ///
716 /// # Examples
717 ///
718 /// ```
719 /// use futures_lite::io::BufReader;
720 ///
721 /// let input: &[u8] = b"hello";
722 /// let reader = BufReader::new(input);
723 ///
724 /// let r = reader.get_ref();
725 /// ```
726 pub fn get_ref(&self) -> &R {
727 &self.inner
728 }
729
730 /// Gets a mutable reference to the underlying reader.
731 ///
732 /// It is not advisable to directly read from the underlying reader.
733 ///
734 /// # Examples
735 ///
736 /// ```
737 /// use futures_lite::io::BufReader;
738 ///
739 /// let input: &[u8] = b"hello";
740 /// let mut reader = BufReader::new(input);
741 ///
742 /// let r = reader.get_mut();
743 /// ```
744 pub fn get_mut(&mut self) -> &mut R {
745 &mut self.inner
746 }
747
748 /// Gets a pinned mutable reference to the underlying reader.
749 ///
750 /// It is not advisable to directly read from the underlying reader.
751 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
752 self.project().inner
753 }
754
755 /// Returns a reference to the internal buffer.
756 ///
757 /// This method will not attempt to fill the buffer if it is empty.
758 ///
759 /// # Examples
760 ///
761 /// ```
762 /// use futures_lite::io::BufReader;
763 ///
764 /// let input: &[u8] = b"hello";
765 /// let reader = BufReader::new(input);
766 ///
767 /// // The internal buffer is empty until the first read request.
768 /// assert_eq!(reader.buffer(), &[]);
769 /// ```
770 pub fn buffer(&self) -> &[u8] {
771 &self.buf[self.pos..self.cap]
772 }
773
774 /// Unwraps the buffered reader, returning the underlying reader.
775 ///
776 /// Note that any leftover data in the internal buffer will be lost.
777 ///
778 /// # Examples
779 ///
780 /// ```
781 /// use futures_lite::io::BufReader;
782 ///
783 /// let input: &[u8] = b"hello";
784 /// let reader = BufReader::new(input);
785 ///
786 /// assert_eq!(reader.into_inner(), input);
787 /// ```
788 pub fn into_inner(self) -> R {
789 self.inner
790 }
791
792 /// Invalidates all data in the internal buffer.
793 #[inline]
794 fn discard_buffer(self: Pin<&mut Self>) {
795 let this = self.project();
796 *this.pos = 0;
797 *this.cap = 0;
798 }
799}
800
801impl<R: AsyncRead> AsyncRead for BufReader<R> {
802 fn poll_read(
803 mut self: Pin<&mut Self>,
804 cx: &mut Context<'_>,
805 buf: &mut [u8],
806 ) -> Poll<Result<usize>> {
807 // If we don't have any buffered data and we're doing a massive read
808 // (larger than our internal buffer), bypass our internal buffer
809 // entirely.
810 if self.pos == self.cap && buf.len() >= self.buf.len() {
811 let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
812 self.discard_buffer();
813 return Poll::Ready(res);
814 }
815 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
816 let nread = std::io::Read::read(&mut rem, buf)?;
817 self.consume(nread);
818 Poll::Ready(Ok(nread))
819 }
820
821 fn poll_read_vectored(
822 mut self: Pin<&mut Self>,
823 cx: &mut Context<'_>,
824 bufs: &mut [IoSliceMut<'_>],
825 ) -> Poll<Result<usize>> {
826 let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
827 if self.pos == self.cap && total_len >= self.buf.len() {
828 let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
829 self.discard_buffer();
830 return Poll::Ready(res);
831 }
832 let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
833 let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
834 self.consume(nread);
835 Poll::Ready(Ok(nread))
836 }
837}
838
839impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
840 fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
841 let mut this = self.project();
842
843 // If we've reached the end of our internal buffer then we need to fetch
844 // some more data from the underlying reader.
845 // Branch using `>=` instead of the more correct `==`
846 // to tell the compiler that the pos..cap slice is always valid.
847 if *this.pos >= *this.cap {
848 debug_assert!(*this.pos == *this.cap);
849 *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
850 *this.pos = 0;
851 }
852 Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
853 }
854
855 fn consume(self: Pin<&mut Self>, amt: usize) {
856 let this = self.project();
857 *this.pos = cmp::min(*this.pos + amt, *this.cap);
858 }
859}
860
861impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
862 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
863 f.debug_struct("BufReader")
864 .field("reader", &self.inner)
865 .field(
866 "buffer",
867 &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
868 )
869 .finish()
870 }
871}
872
873impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
874 /// Seeks to an offset, in bytes, in the underlying reader.
875 ///
876 /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
877 /// reader would be at if the [`BufReader`] had no internal buffer.
878 ///
879 /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
880 /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
881 /// immediately after a seek yields the underlying reader at the same position.
882 ///
883 /// See [`AsyncSeek`] for more details.
884 ///
885 /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
886 /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
887 /// the second seek returns `Err`, the underlying reader will be left at the same position it
888 /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
889 fn poll_seek(
890 mut self: Pin<&mut Self>,
891 cx: &mut Context<'_>,
892 pos: SeekFrom,
893 ) -> Poll<Result<u64>> {
894 let result: u64;
895 if let SeekFrom::Current(n) = pos {
896 let remainder = (self.cap - self.pos) as i64;
897 // it should be safe to assume that remainder fits within an i64 as the alternative
898 // means we managed to allocate 8 exbibytes and that's absurd.
899 // But it's not out of the realm of possibility for some weird underlying reader to
900 // support seeking by i64::min_value() so we need to handle underflow when subtracting
901 // remainder.
902 if let Some(offset) = n.checked_sub(remainder) {
903 result = ready!(self
904 .as_mut()
905 .get_pin_mut()
906 .poll_seek(cx, SeekFrom::Current(offset)))?;
907 } else {
908 // seek backwards by our remainder, and then by the offset
909 ready!(self
910 .as_mut()
911 .get_pin_mut()
912 .poll_seek(cx, SeekFrom::Current(-remainder)))?;
913 self.as_mut().discard_buffer();
914 result = ready!(self
915 .as_mut()
916 .get_pin_mut()
917 .poll_seek(cx, SeekFrom::Current(n)))?;
918 }
919 } else {
920 // Seeking with Start/End doesn't care about our buffer length.
921 result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
922 }
923 self.discard_buffer();
924 Poll::Ready(Ok(result))
925 }
926}
927
928impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
929 fn poll_write(
930 mut self: Pin<&mut Self>,
931 cx: &mut Context<'_>,
932 buf: &[u8],
933 ) -> Poll<Result<usize>> {
934 self.as_mut().get_pin_mut().poll_write(cx, buf)
935 }
936
937 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
938 self.as_mut().get_pin_mut().poll_flush(cx)
939 }
940
941 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
942 self.as_mut().get_pin_mut().poll_close(cx)
943 }
944}
945
946pin_project! {
947 /// Adds buffering to a writer.
948 ///
949 /// It can be excessively inefficient to work directly with something that implements
950 /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
951 /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
952 /// writes it to the underlying writer in large, infrequent batches.
953 ///
954 /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
955 /// the same file or networking socket. It does not help when writing very large amounts at
956 /// once, or writing just once or a few times. It also provides no advantage when writing to a
957 /// destination that is in memory, like a `Vec<u8>`.
958 ///
959 /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
960 /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
961 /// dropping the [`BufWriter`].
962 ///
963 /// # Examples
964 ///
965 /// ```
966 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
967 ///
968 /// # spin_on::spin_on(async {
969 /// let mut output = Vec::new();
970 /// let mut writer = BufWriter::new(&mut output);
971 ///
972 /// writer.write_all(b"hello").await?;
973 /// writer.flush().await?;
974 /// # std::io::Result::Ok(()) });
975 /// ```
976 pub struct BufWriter<W> {
977 #[pin]
978 inner: W,
979 buf: Vec<u8>,
980 written: usize,
981 }
982}
983
984impl<W: AsyncWrite> BufWriter<W> {
985 /// Creates a buffered writer with the default buffer capacity.
986 ///
987 /// The default capacity is currently 8 KB, but that may change in the future.
988 ///
989 /// # Examples
990 ///
991 /// ```
992 /// use futures_lite::io::BufWriter;
993 ///
994 /// let mut output = Vec::new();
995 /// let writer = BufWriter::new(&mut output);
996 /// ```
997 pub fn new(inner: W) -> BufWriter<W> {
998 BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
999 }
1000
1001 /// Creates a buffered writer with the specified buffer capacity.
1002 ///
1003 /// # Examples
1004 ///
1005 /// ```
1006 /// use futures_lite::io::BufWriter;
1007 ///
1008 /// let mut output = Vec::new();
1009 /// let writer = BufWriter::with_capacity(100, &mut output);
1010 /// ```
1011 pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
1012 BufWriter {
1013 inner,
1014 buf: Vec::with_capacity(capacity),
1015 written: 0,
1016 }
1017 }
1018
1019 /// Gets a reference to the underlying writer.
1020 ///
1021 /// # Examples
1022 ///
1023 /// ```
1024 /// use futures_lite::io::BufWriter;
1025 ///
1026 /// let mut output = Vec::new();
1027 /// let writer = BufWriter::new(&mut output);
1028 ///
1029 /// let r = writer.get_ref();
1030 /// ```
1031 pub fn get_ref(&self) -> &W {
1032 &self.inner
1033 }
1034
1035 /// Gets a mutable reference to the underlying writer.
1036 ///
1037 /// It is not advisable to directly write to the underlying writer.
1038 ///
1039 /// # Examples
1040 ///
1041 /// ```
1042 /// use futures_lite::io::BufWriter;
1043 ///
1044 /// let mut output = Vec::new();
1045 /// let mut writer = BufWriter::new(&mut output);
1046 ///
1047 /// let r = writer.get_mut();
1048 /// ```
1049 pub fn get_mut(&mut self) -> &mut W {
1050 &mut self.inner
1051 }
1052
1053 /// Gets a pinned mutable reference to the underlying writer.
1054 ///
1055 /// It is not not advisable to directly write to the underlying writer.
1056 fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
1057 self.project().inner
1058 }
1059
1060 /// Unwraps the buffered writer, returning the underlying writer.
1061 ///
1062 /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
1063 /// that data, flush the buffered writer before unwrapping it.
1064 ///
1065 /// # Examples
1066 ///
1067 /// ```
1068 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
1069 ///
1070 /// # spin_on::spin_on(async {
1071 /// let mut output = vec![1, 2, 3];
1072 /// let mut writer = BufWriter::new(&mut output);
1073 ///
1074 /// writer.write_all(&[4]).await?;
1075 /// writer.flush().await?;
1076 /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
1077 /// # std::io::Result::Ok(()) });
1078 /// ```
1079 pub fn into_inner(self) -> W {
1080 self.inner
1081 }
1082
1083 /// Returns a reference to the internal buffer.
1084 ///
1085 /// # Examples
1086 ///
1087 /// ```
1088 /// use futures_lite::io::BufWriter;
1089 ///
1090 /// let mut output = Vec::new();
1091 /// let writer = BufWriter::new(&mut output);
1092 ///
1093 /// // The internal buffer is empty until the first write request.
1094 /// assert_eq!(writer.buffer(), &[]);
1095 /// ```
1096 pub fn buffer(&self) -> &[u8] {
1097 &self.buf
1098 }
1099
1100 /// Flush the buffer.
1101 fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1102 let mut this = self.project();
1103 let len = this.buf.len();
1104 let mut ret = Ok(());
1105
1106 while *this.written < len {
1107 match this
1108 .inner
1109 .as_mut()
1110 .poll_write(cx, &this.buf[*this.written..])
1111 {
1112 Poll::Ready(Ok(0)) => {
1113 ret = Err(Error::new(
1114 ErrorKind::WriteZero,
1115 "Failed to write buffered data",
1116 ));
1117 break;
1118 }
1119 Poll::Ready(Ok(n)) => *this.written += n,
1120 Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
1121 Poll::Ready(Err(e)) => {
1122 ret = Err(e);
1123 break;
1124 }
1125 Poll::Pending => return Poll::Pending,
1126 }
1127 }
1128
1129 if *this.written > 0 {
1130 this.buf.drain(..*this.written);
1131 }
1132 *this.written = 0;
1133
1134 Poll::Ready(ret)
1135 }
1136}
1137
1138impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
1139 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1140 f.debug_struct("BufWriter")
1141 .field("writer", &self.inner)
1142 .field("buf", &self.buf)
1143 .finish()
1144 }
1145}
1146
1147impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
1148 fn poll_write(
1149 mut self: Pin<&mut Self>,
1150 cx: &mut Context<'_>,
1151 buf: &[u8],
1152 ) -> Poll<Result<usize>> {
1153 if self.buf.len() + buf.len() > self.buf.capacity() {
1154 ready!(self.as_mut().poll_flush_buf(cx))?;
1155 }
1156 if buf.len() >= self.buf.capacity() {
1157 self.get_pin_mut().poll_write(cx, buf)
1158 } else {
1159 Pin::new(&mut *self.project().buf).poll_write(cx, buf)
1160 }
1161 }
1162
1163 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1164 ready!(self.as_mut().poll_flush_buf(cx))?;
1165 self.get_pin_mut().poll_flush(cx)
1166 }
1167
1168 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1169 ready!(self.as_mut().poll_flush_buf(cx))?;
1170 self.get_pin_mut().poll_close(cx)
1171 }
1172}
1173
1174impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
1175 /// Seek to the offset, in bytes, in the underlying writer.
1176 ///
1177 /// Seeking always writes out the internal buffer before seeking.
1178 fn poll_seek(
1179 mut self: Pin<&mut Self>,
1180 cx: &mut Context<'_>,
1181 pos: SeekFrom,
1182 ) -> Poll<Result<u64>> {
1183 ready!(self.as_mut().poll_flush_buf(cx))?;
1184 self.get_pin_mut().poll_seek(cx, pos)
1185 }
1186}
1187
1188/// Gives an in-memory buffer a cursor for reading and writing.
1189///
1190/// # Examples
1191///
1192/// ```
1193/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
1194///
1195/// # spin_on::spin_on(async {
1196/// let mut bytes = b"hello".to_vec();
1197/// let mut cursor = Cursor::new(&mut bytes);
1198///
1199/// // Overwrite 'h' with 'H'.
1200/// cursor.write_all(b"H").await?;
1201///
1202/// // Move the cursor one byte forward.
1203/// cursor.seek(SeekFrom::Current(1)).await?;
1204///
1205/// // Read a byte.
1206/// let mut byte = [0];
1207/// cursor.read_exact(&mut byte).await?;
1208/// assert_eq!(&byte, b"l");
1209///
1210/// // Check the final buffer.
1211/// assert_eq!(bytes, b"Hello");
1212/// # std::io::Result::Ok(()) });
1213/// ```
1214#[derive(Clone, Debug, Default)]
1215pub struct Cursor<T> {
1216 inner: std::io::Cursor<T>,
1217}
1218
1219impl<T> Cursor<T> {
1220 /// Creates a cursor for an in-memory buffer.
1221 ///
1222 /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
1223 /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
1224 /// the buffer using [`set_position()`][Cursor::set_position()`] or
1225 /// [`seek()`][`AsyncSeekExt::seek()`].
1226 ///
1227 /// # Examples
1228 ///
1229 /// ```
1230 /// use futures_lite::io::Cursor;
1231 ///
1232 /// let cursor = Cursor::new(Vec::<u8>::new());
1233 /// ```
1234 pub fn new(inner: T) -> Cursor<T> {
1235 Cursor {
1236 inner: std::io::Cursor::new(inner),
1237 }
1238 }
1239
1240 /// Gets a reference to the underlying buffer.
1241 ///
1242 /// # Examples
1243 ///
1244 /// ```
1245 /// use futures_lite::io::Cursor;
1246 ///
1247 /// let cursor = Cursor::new(Vec::<u8>::new());
1248 /// let r = cursor.get_ref();
1249 /// ```
1250 pub fn get_ref(&self) -> &T {
1251 self.inner.get_ref()
1252 }
1253
1254 /// Gets a mutable reference to the underlying buffer.
1255 ///
1256 /// # Examples
1257 ///
1258 /// ```
1259 /// use futures_lite::io::Cursor;
1260 ///
1261 /// let mut cursor = Cursor::new(Vec::<u8>::new());
1262 /// let r = cursor.get_mut();
1263 /// ```
1264 pub fn get_mut(&mut self) -> &mut T {
1265 self.inner.get_mut()
1266 }
1267
1268 /// Unwraps the cursor, returning the underlying buffer.
1269 ///
1270 /// # Examples
1271 ///
1272 /// ```
1273 /// use futures_lite::io::Cursor;
1274 ///
1275 /// let cursor = Cursor::new(vec![1, 2, 3]);
1276 /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
1277 /// ```
1278 pub fn into_inner(self) -> T {
1279 self.inner.into_inner()
1280 }
1281
1282 /// Returns the current position of this cursor.
1283 ///
1284 /// # Examples
1285 ///
1286 /// ```
1287 /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
1288 ///
1289 /// # spin_on::spin_on(async {
1290 /// let mut cursor = Cursor::new(b"hello");
1291 /// assert_eq!(cursor.position(), 0);
1292 ///
1293 /// cursor.seek(SeekFrom::Start(2)).await?;
1294 /// assert_eq!(cursor.position(), 2);
1295 /// # std::io::Result::Ok(()) });
1296 /// ```
1297 pub fn position(&self) -> u64 {
1298 self.inner.position()
1299 }
1300
1301 /// Sets the position of this cursor.
1302 ///
1303 /// # Examples
1304 ///
1305 /// ```
1306 /// use futures_lite::io::Cursor;
1307 ///
1308 /// let mut cursor = Cursor::new(b"hello");
1309 /// assert_eq!(cursor.position(), 0);
1310 ///
1311 /// cursor.set_position(2);
1312 /// assert_eq!(cursor.position(), 2);
1313 /// ```
1314 pub fn set_position(&mut self, pos: u64) {
1315 self.inner.set_position(pos)
1316 }
1317}
1318
1319impl<T> AsyncSeek for Cursor<T>
1320where
1321 T: AsRef<[u8]> + Unpin,
1322{
1323 fn poll_seek(
1324 mut self: Pin<&mut Self>,
1325 _: &mut Context<'_>,
1326 pos: SeekFrom,
1327 ) -> Poll<Result<u64>> {
1328 Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
1329 }
1330}
1331
1332impl<T> AsyncRead for Cursor<T>
1333where
1334 T: AsRef<[u8]> + Unpin,
1335{
1336 fn poll_read(
1337 mut self: Pin<&mut Self>,
1338 _cx: &mut Context<'_>,
1339 buf: &mut [u8],
1340 ) -> Poll<Result<usize>> {
1341 Poll::Ready(std::io::Read::read(&mut self.inner, buf))
1342 }
1343
1344 fn poll_read_vectored(
1345 mut self: Pin<&mut Self>,
1346 _: &mut Context<'_>,
1347 bufs: &mut [IoSliceMut<'_>],
1348 ) -> Poll<Result<usize>> {
1349 Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
1350 }
1351}
1352
1353impl<T> AsyncBufRead for Cursor<T>
1354where
1355 T: AsRef<[u8]> + Unpin,
1356{
1357 fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
1358 Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
1359 }
1360
1361 fn consume(mut self: Pin<&mut Self>, amt: usize) {
1362 std::io::BufRead::consume(&mut self.inner, amt)
1363 }
1364}
1365
1366impl AsyncWrite for Cursor<&mut [u8]> {
1367 fn poll_write(
1368 mut self: Pin<&mut Self>,
1369 _: &mut Context<'_>,
1370 buf: &[u8],
1371 ) -> Poll<Result<usize>> {
1372 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1373 }
1374
1375 fn poll_write_vectored(
1376 mut self: Pin<&mut Self>,
1377 _: &mut Context<'_>,
1378 bufs: &[IoSlice<'_>],
1379 ) -> Poll<Result<usize>> {
1380 Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
1381 }
1382
1383 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1384 Poll::Ready(std::io::Write::flush(&mut self.inner))
1385 }
1386
1387 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1388 self.poll_flush(cx)
1389 }
1390}
1391
1392impl AsyncWrite for Cursor<&mut Vec<u8>> {
1393 fn poll_write(
1394 mut self: Pin<&mut Self>,
1395 _: &mut Context<'_>,
1396 buf: &[u8],
1397 ) -> Poll<Result<usize>> {
1398 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1399 }
1400
1401 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1402 self.poll_flush(cx)
1403 }
1404
1405 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1406 Poll::Ready(std::io::Write::flush(&mut self.inner))
1407 }
1408}
1409
1410impl AsyncWrite for Cursor<Vec<u8>> {
1411 fn poll_write(
1412 mut self: Pin<&mut Self>,
1413 _: &mut Context<'_>,
1414 buf: &[u8],
1415 ) -> Poll<Result<usize>> {
1416 Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1417 }
1418
1419 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1420 self.poll_flush(cx)
1421 }
1422
1423 fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1424 Poll::Ready(std::io::Write::flush(&mut self.inner))
1425 }
1426}
1427
1428/// Creates an empty reader.
1429///
1430/// # Examples
1431///
1432/// ```
1433/// use futures_lite::io::{self, AsyncReadExt};
1434///
1435/// # spin_on::spin_on(async {
1436/// let mut reader = io::empty();
1437///
1438/// let mut contents = Vec::new();
1439/// reader.read_to_end(&mut contents).await?;
1440/// assert!(contents.is_empty());
1441/// # std::io::Result::Ok(()) });
1442/// ```
1443pub fn empty() -> Empty {
1444 Empty { _private: () }
1445}
1446
1447/// Reader for the [`empty()`] function.
1448pub struct Empty {
1449 _private: (),
1450}
1451
1452impl fmt::Debug for Empty {
1453 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1454 f.pad("Empty { .. }")
1455 }
1456}
1457
1458impl AsyncRead for Empty {
1459 #[inline]
1460 fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
1461 Poll::Ready(Ok(0))
1462 }
1463}
1464
1465impl AsyncBufRead for Empty {
1466 #[inline]
1467 fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
1468 Poll::Ready(Ok(&[]))
1469 }
1470
1471 #[inline]
1472 fn consume(self: Pin<&mut Self>, _: usize) {}
1473}
1474
1475/// Creates an infinite reader that reads the same byte repeatedly.
1476///
1477/// # Examples
1478///
1479/// ```
1480/// use futures_lite::io::{self, AsyncReadExt};
1481///
1482/// # spin_on::spin_on(async {
1483/// let mut reader = io::repeat(b'a');
1484///
1485/// let mut contents = vec![0; 5];
1486/// reader.read_exact(&mut contents).await?;
1487/// assert_eq!(contents, b"aaaaa");
1488/// # std::io::Result::Ok(()) });
1489/// ```
1490pub fn repeat(byte: u8) -> Repeat {
1491 Repeat { byte }
1492}
1493
1494/// Reader for the [`repeat()`] function.
1495#[derive(Debug)]
1496pub struct Repeat {
1497 byte: u8,
1498}
1499
1500impl AsyncRead for Repeat {
1501 #[inline]
1502 fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
1503 for b in &mut *buf {
1504 *b = self.byte;
1505 }
1506 Poll::Ready(Ok(buf.len()))
1507 }
1508}
1509
1510/// Creates a writer that consumes and drops all data.
1511///
1512/// # Examples
1513///
1514/// ```
1515/// use futures_lite::io::{self, AsyncWriteExt};
1516///
1517/// # spin_on::spin_on(async {
1518/// let mut writer = io::sink();
1519/// writer.write_all(b"hello").await?;
1520/// # std::io::Result::Ok(()) });
1521/// ```
1522pub fn sink() -> Sink {
1523 Sink { _private: () }
1524}
1525
1526/// Writer for the [`sink()`] function.
1527#[derive(Debug)]
1528pub struct Sink {
1529 _private: (),
1530}
1531
1532impl AsyncWrite for Sink {
1533 #[inline]
1534 fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
1535 Poll::Ready(Ok(buf.len()))
1536 }
1537
1538 #[inline]
1539 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1540 Poll::Ready(Ok(()))
1541 }
1542
1543 #[inline]
1544 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1545 Poll::Ready(Ok(()))
1546 }
1547}
1548
1549/// Extension trait for [`AsyncBufRead`].
1550pub trait AsyncBufReadExt: AsyncBufRead {
1551 /// Returns the contents of the internal buffer, filling it with more data if empty.
1552 ///
1553 /// If the stream has reached EOF, an empty buffer will be returned.
1554 ///
1555 /// # Examples
1556 ///
1557 /// ```
1558 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1559 /// use std::pin::Pin;
1560 ///
1561 /// # spin_on::spin_on(async {
1562 /// let input: &[u8] = b"hello world";
1563 /// let mut reader = BufReader::with_capacity(5, input);
1564 ///
1565 /// assert_eq!(reader.fill_buf().await?, b"hello");
1566 /// reader.consume(2);
1567 /// assert_eq!(reader.fill_buf().await?, b"llo");
1568 /// reader.consume(3);
1569 /// assert_eq!(reader.fill_buf().await?, b" worl");
1570 /// # std::io::Result::Ok(()) });
1571 /// ```
1572 fn fill_buf(&mut self) -> FillBuf<'_, Self>
1573 where
1574 Self: Unpin,
1575 {
1576 FillBuf { reader: Some(self) }
1577 }
1578
1579 /// Consumes `amt` buffered bytes.
1580 ///
1581 /// This method does not perform any I/O, it simply consumes some amount of bytes from the
1582 /// internal buffer.
1583 ///
1584 /// The `amt` must be <= the number of bytes in the buffer returned by
1585 /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
1586 ///
1587 /// # Examples
1588 ///
1589 /// ```
1590 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1591 /// use std::pin::Pin;
1592 ///
1593 /// # spin_on::spin_on(async {
1594 /// let input: &[u8] = b"hello";
1595 /// let mut reader = BufReader::with_capacity(4, input);
1596 ///
1597 /// assert_eq!(reader.fill_buf().await?, b"hell");
1598 /// reader.consume(2);
1599 /// assert_eq!(reader.fill_buf().await?, b"ll");
1600 /// # std::io::Result::Ok(()) });
1601 /// ```
1602 fn consume(&mut self, amt: usize)
1603 where
1604 Self: Unpin,
1605 {
1606 AsyncBufRead::consume(Pin::new(self), amt);
1607 }
1608
1609 /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
1610 ///
1611 /// This method will read bytes from the underlying stream until the delimiter or EOF is
1612 /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
1613 ///
1614 /// If successful, returns the total number of bytes read.
1615 ///
1616 /// # Examples
1617 ///
1618 /// ```
1619 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1620 ///
1621 /// # spin_on::spin_on(async {
1622 /// let input: &[u8] = b"hello";
1623 /// let mut reader = BufReader::new(input);
1624 ///
1625 /// let mut buf = Vec::new();
1626 /// let n = reader.read_until(b'\n', &mut buf).await?;
1627 /// # std::io::Result::Ok(()) });
1628 /// ```
1629 fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'a, Self>
1630 where
1631 Self: Unpin,
1632 {
1633 ReadUntilFuture {
1634 reader: self,
1635 byte,
1636 buf,
1637 read: 0,
1638 }
1639 }
1640
1641 /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
1642 ///
1643 /// This method will read bytes from the underlying stream until the newline delimiter (the
1644 /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
1645 /// will be appended to `buf`.
1646 ///
1647 /// If successful, returns the total number of bytes read.
1648 ///
1649 /// # Examples
1650 ///
1651 /// ```
1652 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1653 ///
1654 /// # spin_on::spin_on(async {
1655 /// let input: &[u8] = b"hello";
1656 /// let mut reader = BufReader::new(input);
1657 ///
1658 /// let mut line = String::new();
1659 /// let n = reader.read_line(&mut line).await?;
1660 /// # std::io::Result::Ok(()) });
1661 /// ```
1662 fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'a, Self>
1663 where
1664 Self: Unpin,
1665 {
1666 ReadLineFuture {
1667 reader: self,
1668 buf,
1669 bytes: Vec::new(),
1670 read: 0,
1671 }
1672 }
1673
1674 /// Returns a stream over the lines of this byte stream.
1675 ///
1676 /// The stream returned from this method yields items of type
1677 /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
1678 /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
1679 /// at the end.
1680 ///
1681 /// # Examples
1682 ///
1683 /// ```
1684 /// use futures_lite::io::{AsyncBufReadExt, BufReader};
1685 /// use futures_lite::stream::StreamExt;
1686 ///
1687 /// # spin_on::spin_on(async {
1688 /// let input: &[u8] = b"hello\nworld\n";
1689 /// let mut reader = BufReader::new(input);
1690 /// let mut lines = reader.lines();
1691 ///
1692 /// while let Some(line) = lines.next().await {
1693 /// println!("{}", line?);
1694 /// }
1695 /// # std::io::Result::Ok(()) });
1696 /// ```
1697 fn lines(self) -> Lines<Self>
1698 where
1699 Self: Sized,
1700 {
1701 Lines {
1702 reader: self,
1703 buf: String::new(),
1704 bytes: Vec::new(),
1705 read: 0,
1706 }
1707 }
1708
1709 /// Returns a stream over the contents of this reader split on the specified `byte`.
1710 ///
1711 /// The stream returned from this method yields items of type
1712 /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
1713 /// Each vector returned will *not* have the delimiter byte at the end.
1714 ///
1715 /// # Examples
1716 ///
1717 /// ```
1718 /// use futures_lite::io::{AsyncBufReadExt, Cursor};
1719 /// use futures_lite::stream::StreamExt;
1720 ///
1721 /// # spin_on::spin_on(async {
1722 /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
1723 /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
1724 ///
1725 /// assert_eq!(items[0], b"lorem");
1726 /// assert_eq!(items[1], b"ipsum");
1727 /// assert_eq!(items[2], b"dolor");
1728 /// # std::io::Result::Ok(()) });
1729 /// ```
1730 fn split(self, byte: u8) -> Split<Self>
1731 where
1732 Self: Sized,
1733 {
1734 Split {
1735 reader: self,
1736 buf: Vec::new(),
1737 delim: byte,
1738 read: 0,
1739 }
1740 }
1741}
1742
1743impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
1744
1745/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
1746#[derive(Debug)]
1747#[must_use = "futures do nothing unless you `.await` or poll them"]
1748pub struct FillBuf<'a, R: ?Sized> {
1749 reader: Option<&'a mut R>,
1750}
1751
1752impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
1753
1754impl<'a, R> Future for FillBuf<'a, R>
1755where
1756 R: AsyncBufRead + Unpin + ?Sized,
1757{
1758 type Output = Result<&'a [u8]>;
1759
1760 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1761 let this = &mut *self;
1762 let reader = this
1763 .reader
1764 .take()
1765 .expect("polled `FillBuf` after completion");
1766
1767 match Pin::new(&mut *reader).poll_fill_buf(cx) {
1768 Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
1769 Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
1770 poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
1771 },
1772 Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
1773 Poll::Pending => {
1774 this.reader = Some(reader);
1775 Poll::Pending
1776 }
1777 }
1778 }
1779}
1780
1781/// Future for the [`AsyncBufReadExt::read_until()`] method.
1782#[derive(Debug)]
1783#[must_use = "futures do nothing unless you `.await` or poll them"]
1784pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
1785 reader: &'a mut R,
1786 byte: u8,
1787 buf: &'a mut Vec<u8>,
1788 read: usize,
1789}
1790
1791impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
1792
1793impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
1794 type Output = Result<usize>;
1795
1796 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1797 let Self {
1798 reader,
1799 byte,
1800 buf,
1801 read,
1802 } = &mut *self;
1803 read_until_internal(Pin::new(reader), cx, *byte, buf, read)
1804 }
1805}
1806
1807fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
1808 mut reader: Pin<&mut R>,
1809 cx: &mut Context<'_>,
1810 byte: u8,
1811 buf: &mut Vec<u8>,
1812 read: &mut usize,
1813) -> Poll<Result<usize>> {
1814 loop {
1815 let (done, used) = {
1816 let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
1817
1818 if let Some(i) = memchr(byte, available) {
1819 buf.extend_from_slice(&available[..=i]);
1820 (true, i + 1)
1821 } else {
1822 buf.extend_from_slice(available);
1823 (false, available.len())
1824 }
1825 };
1826
1827 reader.as_mut().consume(used);
1828 *read += used;
1829
1830 if done || used == 0 {
1831 return Poll::Ready(Ok(mem::replace(read, 0)));
1832 }
1833 }
1834}
1835
1836/// Future for the [`AsyncBufReadExt::read_line()`] method.
1837#[derive(Debug)]
1838#[must_use = "futures do nothing unless you `.await` or poll them"]
1839pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
1840 reader: &'a mut R,
1841 buf: &'a mut String,
1842 bytes: Vec<u8>,
1843 read: usize,
1844}
1845
1846impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
1847
1848impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
1849 type Output = Result<usize>;
1850
1851 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1852 let Self {
1853 reader,
1854 buf,
1855 bytes,
1856 read,
1857 } = &mut *self;
1858 read_line_internal(Pin::new(reader), cx, buf, bytes, read)
1859 }
1860}
1861
1862pin_project! {
1863 /// Stream for the [`AsyncBufReadExt::lines()`] method.
1864 #[derive(Debug)]
1865 #[must_use = "streams do nothing unless polled"]
1866 pub struct Lines<R> {
1867 #[pin]
1868 reader: R,
1869 buf: String,
1870 bytes: Vec<u8>,
1871 read: usize,
1872 }
1873}
1874
1875impl<R: AsyncBufRead> Stream for Lines<R> {
1876 type Item = Result<String>;
1877
1878 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1879 let this = self.project();
1880
1881 let n = ready!(read_line_internal(
1882 this.reader,
1883 cx,
1884 this.buf,
1885 this.bytes,
1886 this.read
1887 ))?;
1888 if n == 0 && this.buf.is_empty() {
1889 return Poll::Ready(None);
1890 }
1891
1892 if this.buf.ends_with('\n') {
1893 this.buf.pop();
1894 if this.buf.ends_with('\r') {
1895 this.buf.pop();
1896 }
1897 }
1898 Poll::Ready(Some(Ok(mem::take(this.buf))))
1899 }
1900}
1901
1902fn read_line_internal<R: AsyncBufRead + ?Sized>(
1903 reader: Pin<&mut R>,
1904 cx: &mut Context<'_>,
1905 buf: &mut String,
1906 bytes: &mut Vec<u8>,
1907 read: &mut usize,
1908) -> Poll<Result<usize>> {
1909 let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
1910
1911 match String::from_utf8(mem::take(bytes)) {
1912 Ok(s) => {
1913 debug_assert!(buf.is_empty());
1914 debug_assert_eq!(*read, 0);
1915 *buf = s;
1916 Poll::Ready(ret)
1917 }
1918 Err(_) => Poll::Ready(ret.and_then(|_| {
1919 Err(Error::new(
1920 ErrorKind::InvalidData,
1921 "stream did not contain valid UTF-8",
1922 ))
1923 })),
1924 }
1925}
1926
1927pin_project! {
1928 /// Stream for the [`AsyncBufReadExt::split()`] method.
1929 #[derive(Debug)]
1930 #[must_use = "streams do nothing unless polled"]
1931 pub struct Split<R> {
1932 #[pin]
1933 reader: R,
1934 buf: Vec<u8>,
1935 read: usize,
1936 delim: u8,
1937 }
1938}
1939
1940impl<R: AsyncBufRead> Stream for Split<R> {
1941 type Item = Result<Vec<u8>>;
1942
1943 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1944 let this = self.project();
1945
1946 let n = ready!(read_until_internal(
1947 this.reader,
1948 cx,
1949 *this.delim,
1950 this.buf,
1951 this.read
1952 ))?;
1953 if n == 0 && this.buf.is_empty() {
1954 return Poll::Ready(None);
1955 }
1956
1957 if this.buf[this.buf.len() - 1] == *this.delim {
1958 this.buf.pop();
1959 }
1960 Poll::Ready(Some(Ok(mem::take(this.buf))))
1961 }
1962}
1963
1964/// Extension trait for [`AsyncRead`].
1965pub trait AsyncReadExt: AsyncRead {
1966 /// Reads some bytes from the byte stream.
1967 ///
1968 /// On success, returns the total number of bytes read.
1969 ///
1970 /// If the return value is `Ok(n)`, then it must be guaranteed that
1971 /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
1972 /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
1973 /// scenarios:
1974 ///
1975 /// 1. This reader has reached its "end of file" and will likely no longer be able to
1976 /// produce bytes. Note that this does not mean that the reader will always no
1977 /// longer be able to produce bytes.
1978 /// 2. The buffer specified was 0 bytes in length.
1979 ///
1980 /// # Examples
1981 ///
1982 /// ```
1983 /// use futures_lite::io::{AsyncReadExt, BufReader};
1984 ///
1985 /// # spin_on::spin_on(async {
1986 /// let input: &[u8] = b"hello";
1987 /// let mut reader = BufReader::new(input);
1988 ///
1989 /// let mut buf = vec![0; 1024];
1990 /// let n = reader.read(&mut buf).await?;
1991 /// # std::io::Result::Ok(()) });
1992 /// ```
1993 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
1994 where
1995 Self: Unpin,
1996 {
1997 ReadFuture { reader: self, buf }
1998 }
1999
2000 /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
2001 ///
2002 /// Data is copied to fill each buffer in order, with the final buffer possibly being
2003 /// only partially filled. This method must behave same as a single call to
2004 /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
2005 fn read_vectored<'a>(
2006 &'a mut self,
2007 bufs: &'a mut [IoSliceMut<'a>],
2008 ) -> ReadVectoredFuture<'a, Self>
2009 where
2010 Self: Unpin,
2011 {
2012 ReadVectoredFuture { reader: self, bufs }
2013 }
2014
2015 /// Reads the entire contents and appends them to a [`Vec`].
2016 ///
2017 /// On success, returns the total number of bytes read.
2018 ///
2019 /// # Examples
2020 ///
2021 /// ```
2022 /// use futures_lite::io::{AsyncReadExt, Cursor};
2023 ///
2024 /// # spin_on::spin_on(async {
2025 /// let mut reader = Cursor::new(vec![1, 2, 3]);
2026 /// let mut contents = Vec::new();
2027 ///
2028 /// let n = reader.read_to_end(&mut contents).await?;
2029 /// assert_eq!(n, 3);
2030 /// assert_eq!(contents, [1, 2, 3]);
2031 /// # std::io::Result::Ok(()) });
2032 /// ```
2033 fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
2034 where
2035 Self: Unpin,
2036 {
2037 let start_len = buf.len();
2038 ReadToEndFuture {
2039 reader: self,
2040 buf,
2041 start_len,
2042 }
2043 }
2044
2045 /// Reads the entire contents and appends them to a [`String`].
2046 ///
2047 /// On success, returns the total number of bytes read.
2048 ///
2049 /// # Examples
2050 ///
2051 /// ```
2052 /// use futures_lite::io::{AsyncReadExt, Cursor};
2053 ///
2054 /// # spin_on::spin_on(async {
2055 /// let mut reader = Cursor::new(&b"hello");
2056 /// let mut contents = String::new();
2057 ///
2058 /// let n = reader.read_to_string(&mut contents).await?;
2059 /// assert_eq!(n, 5);
2060 /// assert_eq!(contents, "hello");
2061 /// # std::io::Result::Ok(()) });
2062 /// ```
2063 fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
2064 where
2065 Self: Unpin,
2066 {
2067 ReadToStringFuture {
2068 reader: self,
2069 buf,
2070 bytes: Vec::new(),
2071 start_len: 0,
2072 }
2073 }
2074
2075 /// Reads the exact number of bytes required to fill `buf`.
2076 ///
2077 /// # Examples
2078 ///
2079 /// ```
2080 /// use futures_lite::io::{AsyncReadExt, Cursor};
2081 ///
2082 /// # spin_on::spin_on(async {
2083 /// let mut reader = Cursor::new(&b"hello");
2084 /// let mut contents = vec![0; 3];
2085 ///
2086 /// reader.read_exact(&mut contents).await?;
2087 /// assert_eq!(contents, b"hel");
2088 /// # std::io::Result::Ok(()) });
2089 /// ```
2090 fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
2091 where
2092 Self: Unpin,
2093 {
2094 ReadExactFuture { reader: self, buf }
2095 }
2096
2097 /// Creates an adapter which will read at most `limit` bytes from it.
2098 ///
2099 /// This method returns a new instance of [`AsyncRead`] which will read at most
2100 /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
2101 ///
2102 /// # Examples
2103 ///
2104 /// ```
2105 /// use futures_lite::io::{AsyncReadExt, Cursor};
2106 ///
2107 /// # spin_on::spin_on(async {
2108 /// let mut reader = Cursor::new(&b"hello");
2109 /// let mut contents = String::new();
2110 ///
2111 /// let n = reader.take(3).read_to_string(&mut contents).await?;
2112 /// assert_eq!(n, 3);
2113 /// assert_eq!(contents, "hel");
2114 /// # std::io::Result::Ok(()) });
2115 /// ```
2116 fn take(self, limit: u64) -> Take<Self>
2117 where
2118 Self: Sized,
2119 {
2120 Take { inner: self, limit }
2121 }
2122
2123 /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
2124 ///
2125 /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
2126 ///
2127 /// ```
2128 /// use futures_lite::io::{AsyncReadExt, Cursor};
2129 /// use futures_lite::stream::StreamExt;
2130 ///
2131 /// # spin_on::spin_on(async {
2132 /// let reader = Cursor::new(&b"hello");
2133 /// let mut bytes = reader.bytes();
2134 ///
2135 /// while let Some(byte) = bytes.next().await {
2136 /// println!("byte: {}", byte?);
2137 /// }
2138 /// # std::io::Result::Ok(()) });
2139 /// ```
2140 fn bytes(self) -> Bytes<Self>
2141 where
2142 Self: Sized,
2143 {
2144 Bytes { inner: self }
2145 }
2146
2147 /// Creates an adapter which will chain this stream with another.
2148 ///
2149 /// The returned [`AsyncRead`] instance will first read all bytes from this reader
2150 /// until EOF is found, and then continue with `next`.
2151 ///
2152 /// # Examples
2153 ///
2154 /// ```
2155 /// use futures_lite::io::{AsyncReadExt, Cursor};
2156 ///
2157 /// # spin_on::spin_on(async {
2158 /// let r1 = Cursor::new(&b"hello");
2159 /// let r2 = Cursor::new(&b"world");
2160 /// let mut reader = r1.chain(r2);
2161 ///
2162 /// let mut contents = String::new();
2163 /// reader.read_to_string(&mut contents).await?;
2164 /// assert_eq!(contents, "helloworld");
2165 /// # std::io::Result::Ok(()) });
2166 /// ```
2167 fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
2168 where
2169 Self: Sized,
2170 {
2171 Chain {
2172 first: self,
2173 second: next,
2174 done_first: false,
2175 }
2176 }
2177
2178 /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
2179 ///
2180 /// # Examples
2181 ///
2182 /// ```
2183 /// use futures_lite::io::AsyncReadExt;
2184 ///
2185 /// let reader = [1, 2, 3].boxed_reader();
2186 /// ```
2187 #[cfg(feature = "alloc")]
2188 fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
2189 where
2190 Self: Sized + Send + 'a,
2191 {
2192 Box::pin(self)
2193 }
2194}
2195
2196impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
2197
2198/// Future for the [`AsyncReadExt::read()`] method.
2199#[derive(Debug)]
2200#[must_use = "futures do nothing unless you `.await` or poll them"]
2201pub struct ReadFuture<'a, R: Unpin + ?Sized> {
2202 reader: &'a mut R,
2203 buf: &'a mut [u8],
2204}
2205
2206impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
2207
2208impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
2209 type Output = Result<usize>;
2210
2211 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2212 let Self { reader, buf } = &mut *self;
2213 Pin::new(reader).poll_read(cx, buf)
2214 }
2215}
2216
2217/// Future for the [`AsyncReadExt::read_vectored()`] method.
2218#[derive(Debug)]
2219#[must_use = "futures do nothing unless you `.await` or poll them"]
2220pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
2221 reader: &'a mut R,
2222 bufs: &'a mut [IoSliceMut<'a>],
2223}
2224
2225impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
2226
2227impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
2228 type Output = Result<usize>;
2229
2230 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2231 let Self { reader, bufs } = &mut *self;
2232 Pin::new(reader).poll_read_vectored(cx, bufs)
2233 }
2234}
2235
2236/// Future for the [`AsyncReadExt::read_to_end()`] method.
2237#[derive(Debug)]
2238#[must_use = "futures do nothing unless you `.await` or poll them"]
2239pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
2240 reader: &'a mut R,
2241 buf: &'a mut Vec<u8>,
2242 start_len: usize,
2243}
2244
2245impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
2246
2247impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
2248 type Output = Result<usize>;
2249
2250 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2251 let Self {
2252 reader,
2253 buf,
2254 start_len,
2255 } = &mut *self;
2256 read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
2257 }
2258}
2259
2260/// Future for the [`AsyncReadExt::read_to_string()`] method.
2261#[derive(Debug)]
2262#[must_use = "futures do nothing unless you `.await` or poll them"]
2263pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
2264 reader: &'a mut R,
2265 buf: &'a mut String,
2266 bytes: Vec<u8>,
2267 start_len: usize,
2268}
2269
2270impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
2271
2272impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
2273 type Output = Result<usize>;
2274
2275 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2276 let Self {
2277 reader,
2278 buf,
2279 bytes,
2280 start_len,
2281 } = &mut *self;
2282 let reader = Pin::new(reader);
2283
2284 let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
2285
2286 match String::from_utf8(mem::take(bytes)) {
2287 Ok(s) => {
2288 debug_assert!(buf.is_empty());
2289 **buf = s;
2290 Poll::Ready(ret)
2291 }
2292 Err(_) => Poll::Ready(ret.and_then(|_| {
2293 Err(Error::new(
2294 ErrorKind::InvalidData,
2295 "stream did not contain valid UTF-8",
2296 ))
2297 })),
2298 }
2299 }
2300}
2301
2302// This uses an adaptive system to extend the vector when it fills. We want to
2303// avoid paying to allocate and zero a huge chunk of memory if the reader only
2304// has 4 bytes while still making large reads if the reader does have a ton
2305// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
2306// time is 4,500 times (!) slower than this if the reader has a very small
2307// amount of data to return.
2308//
2309// Because we're extending the buffer with uninitialized data for trusted
2310// readers, we need to make sure to truncate that if any of this panics.
2311fn read_to_end_internal<R: AsyncRead + ?Sized>(
2312 mut rd: Pin<&mut R>,
2313 cx: &mut Context<'_>,
2314 buf: &mut Vec<u8>,
2315 start_len: usize,
2316) -> Poll<Result<usize>> {
2317 struct Guard<'a> {
2318 buf: &'a mut Vec<u8>,
2319 len: usize,
2320 }
2321
2322 impl Drop for Guard<'_> {
2323 fn drop(&mut self) {
2324 self.buf.resize(self.len, 0);
2325 }
2326 }
2327
2328 let mut g = Guard {
2329 len: buf.len(),
2330 buf,
2331 };
2332 let ret;
2333 loop {
2334 if g.len == g.buf.len() {
2335 g.buf.reserve(32);
2336 let capacity = g.buf.capacity();
2337 g.buf.resize(capacity, 0);
2338 }
2339
2340 match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
2341 Ok(0) => {
2342 ret = Poll::Ready(Ok(g.len - start_len));
2343 break;
2344 }
2345 Ok(n) => g.len += n,
2346 Err(e) => {
2347 ret = Poll::Ready(Err(e));
2348 break;
2349 }
2350 }
2351 }
2352
2353 ret
2354}
2355
2356/// Future for the [`AsyncReadExt::read_exact()`] method.
2357#[derive(Debug)]
2358#[must_use = "futures do nothing unless you `.await` or poll them"]
2359pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
2360 reader: &'a mut R,
2361 buf: &'a mut [u8],
2362}
2363
2364impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
2365
2366impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
2367 type Output = Result<()>;
2368
2369 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2370 let Self { reader, buf } = &mut *self;
2371
2372 while !buf.is_empty() {
2373 let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
2374 let (_, rest) = mem::take(buf).split_at_mut(n);
2375 *buf = rest;
2376
2377 if n == 0 {
2378 return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
2379 }
2380 }
2381
2382 Poll::Ready(Ok(()))
2383 }
2384}
2385
2386pin_project! {
2387 /// Reader for the [`AsyncReadExt::take()`] method.
2388 #[derive(Debug)]
2389 pub struct Take<R> {
2390 #[pin]
2391 inner: R,
2392 limit: u64,
2393 }
2394}
2395
2396impl<R> Take<R> {
2397 /// Returns the number of bytes before this adapter will return EOF.
2398 ///
2399 /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
2400 ///
2401 /// # Examples
2402 ///
2403 /// ```
2404 /// use futures_lite::io::{AsyncReadExt, Cursor};
2405 ///
2406 /// let reader = Cursor::new("hello");
2407 ///
2408 /// let reader = reader.take(3);
2409 /// assert_eq!(reader.limit(), 3);
2410 /// ```
2411 pub fn limit(&self) -> u64 {
2412 self.limit
2413 }
2414
2415 /// Puts a limit on the number of bytes.
2416 ///
2417 /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
2418 ///
2419 /// # Examples
2420 ///
2421 /// ```
2422 /// use futures_lite::io::{AsyncReadExt, Cursor};
2423 ///
2424 /// let reader = Cursor::new("hello");
2425 ///
2426 /// let mut reader = reader.take(10);
2427 /// assert_eq!(reader.limit(), 10);
2428 ///
2429 /// reader.set_limit(3);
2430 /// assert_eq!(reader.limit(), 3);
2431 /// ```
2432 pub fn set_limit(&mut self, limit: u64) {
2433 self.limit = limit;
2434 }
2435
2436 /// Gets a reference to the underlying reader.
2437 ///
2438 /// # Examples
2439 ///
2440 /// ```
2441 /// use futures_lite::io::{AsyncReadExt, Cursor};
2442 ///
2443 /// let reader = Cursor::new("hello");
2444 ///
2445 /// let reader = reader.take(3);
2446 /// let r = reader.get_ref();
2447 /// ```
2448 pub fn get_ref(&self) -> &R {
2449 &self.inner
2450 }
2451
2452 /// Gets a mutable reference to the underlying reader.
2453 ///
2454 /// # Examples
2455 ///
2456 /// ```
2457 /// use futures_lite::io::{AsyncReadExt, Cursor};
2458 ///
2459 /// let reader = Cursor::new("hello");
2460 ///
2461 /// let mut reader = reader.take(3);
2462 /// let r = reader.get_mut();
2463 /// ```
2464 pub fn get_mut(&mut self) -> &mut R {
2465 &mut self.inner
2466 }
2467
2468 /// Unwraps the adapter, returning the underlying reader.
2469 ///
2470 /// # Examples
2471 ///
2472 /// ```
2473 /// use futures_lite::io::{AsyncReadExt, Cursor};
2474 ///
2475 /// let reader = Cursor::new("hello");
2476 ///
2477 /// let reader = reader.take(3);
2478 /// let reader = reader.into_inner();
2479 /// ```
2480 pub fn into_inner(self) -> R {
2481 self.inner
2482 }
2483}
2484
2485impl<R: AsyncRead> AsyncRead for Take<R> {
2486 fn poll_read(
2487 self: Pin<&mut Self>,
2488 cx: &mut Context<'_>,
2489 buf: &mut [u8],
2490 ) -> Poll<Result<usize>> {
2491 let this = self.project();
2492 take_read_internal(this.inner, cx, buf, this.limit)
2493 }
2494}
2495
2496fn take_read_internal<R: AsyncRead + ?Sized>(
2497 mut rd: Pin<&mut R>,
2498 cx: &mut Context<'_>,
2499 buf: &mut [u8],
2500 limit: &mut u64,
2501) -> Poll<Result<usize>> {
2502 // Don't call into inner reader at all at EOF because it may still block
2503 if *limit == 0 {
2504 return Poll::Ready(Ok(0));
2505 }
2506
2507 let max = cmp::min(buf.len() as u64, *limit) as usize;
2508
2509 match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
2510 Ok(n) => {
2511 *limit -= n as u64;
2512 Poll::Ready(Ok(n))
2513 }
2514 Err(e) => Poll::Ready(Err(e)),
2515 }
2516}
2517
2518impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
2519 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2520 let this = self.project();
2521
2522 if *this.limit == 0 {
2523 return Poll::Ready(Ok(&[]));
2524 }
2525
2526 match ready!(this.inner.poll_fill_buf(cx)) {
2527 Ok(buf) => {
2528 let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
2529 Poll::Ready(Ok(&buf[..cap]))
2530 }
2531 Err(e) => Poll::Ready(Err(e)),
2532 }
2533 }
2534
2535 fn consume(self: Pin<&mut Self>, amt: usize) {
2536 let this = self.project();
2537 // Don't let callers reset the limit by passing an overlarge value
2538 let amt = cmp::min(amt as u64, *this.limit) as usize;
2539 *this.limit -= amt as u64;
2540
2541 this.inner.consume(amt);
2542 }
2543}
2544
2545pin_project! {
2546 /// Reader for the [`AsyncReadExt::bytes()`] method.
2547 #[derive(Debug)]
2548 pub struct Bytes<R> {
2549 #[pin]
2550 inner: R,
2551 }
2552}
2553
2554impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
2555 type Item = Result<u8>;
2556
2557 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2558 let mut byte = 0;
2559
2560 let rd = Pin::new(&mut self.inner);
2561
2562 match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
2563 Ok(0) => Poll::Ready(None),
2564 Ok(..) => Poll::Ready(Some(Ok(byte))),
2565 Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
2566 Err(e) => Poll::Ready(Some(Err(e))),
2567 }
2568 }
2569}
2570
2571impl<R: AsyncRead> AsyncRead for Bytes<R> {
2572 fn poll_read(
2573 self: Pin<&mut Self>,
2574 cx: &mut Context<'_>,
2575 buf: &mut [u8],
2576 ) -> Poll<Result<usize>> {
2577 self.project().inner.poll_read(cx, buf)
2578 }
2579
2580 fn poll_read_vectored(
2581 self: Pin<&mut Self>,
2582 cx: &mut Context<'_>,
2583 bufs: &mut [IoSliceMut<'_>],
2584 ) -> Poll<Result<usize>> {
2585 self.project().inner.poll_read_vectored(cx, bufs)
2586 }
2587}
2588
2589pin_project! {
2590 /// Reader for the [`AsyncReadExt::chain()`] method.
2591 pub struct Chain<R1, R2> {
2592 #[pin]
2593 first: R1,
2594 #[pin]
2595 second: R2,
2596 done_first: bool,
2597 }
2598}
2599
2600impl<R1, R2> Chain<R1, R2> {
2601 /// Gets references to the underlying readers.
2602 ///
2603 /// # Examples
2604 ///
2605 /// ```
2606 /// use futures_lite::io::{AsyncReadExt, Cursor};
2607 ///
2608 /// let r1 = Cursor::new(b"hello");
2609 /// let r2 = Cursor::new(b"world");
2610 ///
2611 /// let reader = r1.chain(r2);
2612 /// let (r1, r2) = reader.get_ref();
2613 /// ```
2614 pub fn get_ref(&self) -> (&R1, &R2) {
2615 (&self.first, &self.second)
2616 }
2617
2618 /// Gets mutable references to the underlying readers.
2619 ///
2620 /// # Examples
2621 ///
2622 /// ```
2623 /// use futures_lite::io::{AsyncReadExt, Cursor};
2624 ///
2625 /// let r1 = Cursor::new(b"hello");
2626 /// let r2 = Cursor::new(b"world");
2627 ///
2628 /// let mut reader = r1.chain(r2);
2629 /// let (r1, r2) = reader.get_mut();
2630 /// ```
2631 pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
2632 (&mut self.first, &mut self.second)
2633 }
2634
2635 /// Unwraps the adapter, returning the underlying readers.
2636 ///
2637 /// # Examples
2638 ///
2639 /// ```
2640 /// use futures_lite::io::{AsyncReadExt, Cursor};
2641 ///
2642 /// let r1 = Cursor::new(b"hello");
2643 /// let r2 = Cursor::new(b"world");
2644 ///
2645 /// let reader = r1.chain(r2);
2646 /// let (r1, r2) = reader.into_inner();
2647 /// ```
2648 pub fn into_inner(self) -> (R1, R2) {
2649 (self.first, self.second)
2650 }
2651}
2652
2653impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
2654 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2655 f.debug_struct("Chain")
2656 .field("r1", &self.first)
2657 .field("r2", &self.second)
2658 .finish()
2659 }
2660}
2661
2662impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
2663 fn poll_read(
2664 self: Pin<&mut Self>,
2665 cx: &mut Context<'_>,
2666 buf: &mut [u8],
2667 ) -> Poll<Result<usize>> {
2668 let this = self.project();
2669 if !*this.done_first {
2670 match ready!(this.first.poll_read(cx, buf)) {
2671 Ok(0) if !buf.is_empty() => *this.done_first = true,
2672 Ok(n) => return Poll::Ready(Ok(n)),
2673 Err(err) => return Poll::Ready(Err(err)),
2674 }
2675 }
2676
2677 this.second.poll_read(cx, buf)
2678 }
2679
2680 fn poll_read_vectored(
2681 self: Pin<&mut Self>,
2682 cx: &mut Context<'_>,
2683 bufs: &mut [IoSliceMut<'_>],
2684 ) -> Poll<Result<usize>> {
2685 let this = self.project();
2686 if !*this.done_first {
2687 match ready!(this.first.poll_read_vectored(cx, bufs)) {
2688 Ok(0) if !bufs.is_empty() => *this.done_first = true,
2689 Ok(n) => return Poll::Ready(Ok(n)),
2690 Err(err) => return Poll::Ready(Err(err)),
2691 }
2692 }
2693
2694 this.second.poll_read_vectored(cx, bufs)
2695 }
2696}
2697
2698impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
2699 fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2700 let this = self.project();
2701 if !*this.done_first {
2702 match ready!(this.first.poll_fill_buf(cx)) {
2703 Ok([]) => *this.done_first = true,
2704 Ok(buf) => return Poll::Ready(Ok(buf)),
2705 Err(err) => return Poll::Ready(Err(err)),
2706 }
2707 }
2708
2709 this.second.poll_fill_buf(cx)
2710 }
2711
2712 fn consume(self: Pin<&mut Self>, amt: usize) {
2713 let this = self.project();
2714 if !*this.done_first {
2715 this.first.consume(amt)
2716 } else {
2717 this.second.consume(amt)
2718 }
2719 }
2720}
2721
2722/// Extension trait for [`AsyncSeek`].
2723pub trait AsyncSeekExt: AsyncSeek {
2724 /// Seeks to a new position in a byte stream.
2725 ///
2726 /// Returns the new position in the byte stream.
2727 ///
2728 /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
2729 ///
2730 /// # Examples
2731 ///
2732 /// ```
2733 /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
2734 ///
2735 /// # spin_on::spin_on(async {
2736 /// let mut cursor = Cursor::new("hello");
2737 ///
2738 /// // Move the cursor to the end.
2739 /// cursor.seek(SeekFrom::End(0)).await?;
2740 ///
2741 /// // Check the current position.
2742 /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
2743 /// # std::io::Result::Ok(()) });
2744 /// ```
2745 fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
2746 where
2747 Self: Unpin,
2748 {
2749 SeekFuture { seeker: self, pos }
2750 }
2751}
2752
2753impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
2754
2755/// Future for the [`AsyncSeekExt::seek()`] method.
2756#[derive(Debug)]
2757#[must_use = "futures do nothing unless you `.await` or poll them"]
2758pub struct SeekFuture<'a, S: Unpin + ?Sized> {
2759 seeker: &'a mut S,
2760 pos: SeekFrom,
2761}
2762
2763impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
2764
2765impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
2766 type Output = Result<u64>;
2767
2768 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2769 let pos = self.pos;
2770 Pin::new(&mut *self.seeker).poll_seek(cx, pos)
2771 }
2772}
2773
2774/// Extension trait for [`AsyncWrite`].
2775pub trait AsyncWriteExt: AsyncWrite {
2776 /// Writes some bytes into the byte stream.
2777 ///
2778 /// Returns the number of bytes written from the start of the buffer.
2779 ///
2780 /// If the return value is `Ok(n)` then it must be guaranteed that
2781 /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
2782 /// object is no longer able to accept bytes and will likely not be able to in the
2783 /// future as well, or that the provided buffer is empty.
2784 ///
2785 /// # Examples
2786 ///
2787 /// ```
2788 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2789 ///
2790 /// # spin_on::spin_on(async {
2791 /// let mut output = Vec::new();
2792 /// let mut writer = BufWriter::new(&mut output);
2793 ///
2794 /// let n = writer.write(b"hello").await?;
2795 /// # std::io::Result::Ok(()) });
2796 /// ```
2797 fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
2798 where
2799 Self: Unpin,
2800 {
2801 WriteFuture { writer: self, buf }
2802 }
2803
2804 /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
2805 ///
2806 /// Data is copied from each buffer in order, with the final buffer possibly being only
2807 /// partially consumed. This method must behave same as a call to
2808 /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
2809 fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
2810 where
2811 Self: Unpin,
2812 {
2813 WriteVectoredFuture { writer: self, bufs }
2814 }
2815
2816 /// Writes an entire buffer into the byte stream.
2817 ///
2818 /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
2819 /// data to be written or an error occurs. It will not return before the entire buffer is
2820 /// successfully written or an error occurs.
2821 ///
2822 /// # Examples
2823 ///
2824 /// ```
2825 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2826 ///
2827 /// # spin_on::spin_on(async {
2828 /// let mut output = Vec::new();
2829 /// let mut writer = BufWriter::new(&mut output);
2830 ///
2831 /// let n = writer.write_all(b"hello").await?;
2832 /// # std::io::Result::Ok(()) });
2833 /// ```
2834 fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
2835 where
2836 Self: Unpin,
2837 {
2838 WriteAllFuture { writer: self, buf }
2839 }
2840
2841 /// Flushes the stream to ensure that all buffered contents reach their destination.
2842 ///
2843 /// # Examples
2844 ///
2845 /// ```
2846 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2847 ///
2848 /// # spin_on::spin_on(async {
2849 /// let mut output = Vec::new();
2850 /// let mut writer = BufWriter::new(&mut output);
2851 ///
2852 /// writer.write_all(b"hello").await?;
2853 /// writer.flush().await?;
2854 /// # std::io::Result::Ok(()) });
2855 /// ```
2856 fn flush(&mut self) -> FlushFuture<'_, Self>
2857 where
2858 Self: Unpin,
2859 {
2860 FlushFuture { writer: self }
2861 }
2862
2863 /// Closes the writer.
2864 ///
2865 /// # Examples
2866 ///
2867 /// ```
2868 /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2869 ///
2870 /// # spin_on::spin_on(async {
2871 /// let mut output = Vec::new();
2872 /// let mut writer = BufWriter::new(&mut output);
2873 ///
2874 /// writer.close().await?;
2875 /// # std::io::Result::Ok(()) });
2876 /// ```
2877 fn close(&mut self) -> CloseFuture<'_, Self>
2878 where
2879 Self: Unpin,
2880 {
2881 CloseFuture { writer: self }
2882 }
2883
2884 /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
2885 ///
2886 /// # Examples
2887 ///
2888 /// ```
2889 /// use futures_lite::io::AsyncWriteExt;
2890 ///
2891 /// let writer = Vec::<u8>::new().boxed_writer();
2892 /// ```
2893 #[cfg(feature = "alloc")]
2894 fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
2895 where
2896 Self: Sized + Send + 'a,
2897 {
2898 Box::pin(self)
2899 }
2900}
2901
2902impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
2903
2904/// Future for the [`AsyncWriteExt::write()`] method.
2905#[derive(Debug)]
2906#[must_use = "futures do nothing unless you `.await` or poll them"]
2907pub struct WriteFuture<'a, W: Unpin + ?Sized> {
2908 writer: &'a mut W,
2909 buf: &'a [u8],
2910}
2911
2912impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
2913
2914impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
2915 type Output = Result<usize>;
2916
2917 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2918 let buf = self.buf;
2919 Pin::new(&mut *self.writer).poll_write(cx, buf)
2920 }
2921}
2922
2923/// Future for the [`AsyncWriteExt::write_vectored()`] method.
2924#[derive(Debug)]
2925#[must_use = "futures do nothing unless you `.await` or poll them"]
2926pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
2927 writer: &'a mut W,
2928 bufs: &'a [IoSlice<'a>],
2929}
2930
2931impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
2932
2933impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
2934 type Output = Result<usize>;
2935
2936 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2937 let bufs = self.bufs;
2938 Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
2939 }
2940}
2941
2942/// Future for the [`AsyncWriteExt::write_all()`] method.
2943#[derive(Debug)]
2944#[must_use = "futures do nothing unless you `.await` or poll them"]
2945pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
2946 writer: &'a mut W,
2947 buf: &'a [u8],
2948}
2949
2950impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
2951
2952impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
2953 type Output = Result<()>;
2954
2955 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2956 let Self { writer, buf } = &mut *self;
2957
2958 while !buf.is_empty() {
2959 let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
2960 let (_, rest) = mem::take(buf).split_at(n);
2961 *buf = rest;
2962
2963 if n == 0 {
2964 return Poll::Ready(Err(ErrorKind::WriteZero.into()));
2965 }
2966 }
2967
2968 Poll::Ready(Ok(()))
2969 }
2970}
2971
2972/// Future for the [`AsyncWriteExt::flush()`] method.
2973#[derive(Debug)]
2974#[must_use = "futures do nothing unless you `.await` or poll them"]
2975pub struct FlushFuture<'a, W: Unpin + ?Sized> {
2976 writer: &'a mut W,
2977}
2978
2979impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
2980
2981impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
2982 type Output = Result<()>;
2983
2984 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2985 Pin::new(&mut *self.writer).poll_flush(cx)
2986 }
2987}
2988
2989/// Future for the [`AsyncWriteExt::close()`] method.
2990#[derive(Debug)]
2991#[must_use = "futures do nothing unless you `.await` or poll them"]
2992pub struct CloseFuture<'a, W: Unpin + ?Sized> {
2993 writer: &'a mut W,
2994}
2995
2996impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
2997
2998impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
2999 type Output = Result<()>;
3000
3001 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3002 Pin::new(&mut *self.writer).poll_close(cx)
3003 }
3004}
3005
3006/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
3007///
3008/// # Examples
3009///
3010/// ```
3011/// use futures_lite::io::AsyncReadExt;
3012///
3013/// let reader = [1, 2, 3].boxed_reader();
3014/// ```
3015#[cfg(feature = "alloc")]
3016pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
3017
3018/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
3019///
3020/// # Examples
3021///
3022/// ```
3023/// use futures_lite::io::AsyncWriteExt;
3024///
3025/// let writer = Vec::<u8>::new().boxed_writer();
3026/// ```
3027#[cfg(feature = "alloc")]
3028pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
3029
3030/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
3031///
3032/// # Examples
3033///
3034/// ```
3035/// use futures_lite::io::{self, Cursor};
3036///
3037/// # spin_on::spin_on(async {
3038/// let stream = Cursor::new(vec![]);
3039/// let (mut reader, mut writer) = io::split(stream);
3040/// # std::io::Result::Ok(()) });
3041/// ```
3042pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
3043where
3044 T: AsyncRead + AsyncWrite + Unpin,
3045{
3046 let inner = Arc::new(Mutex::new(stream));
3047 (ReadHalf(inner.clone()), WriteHalf(inner))
3048}
3049
3050/// The read half returned by [`split()`].
3051#[derive(Debug)]
3052pub struct ReadHalf<T>(Arc<Mutex<T>>);
3053
3054/// The write half returned by [`split()`].
3055#[derive(Debug)]
3056pub struct WriteHalf<T>(Arc<Mutex<T>>);
3057
3058impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
3059 fn poll_read(
3060 self: Pin<&mut Self>,
3061 cx: &mut Context<'_>,
3062 buf: &mut [u8],
3063 ) -> Poll<Result<usize>> {
3064 let mut inner = self.0.lock().unwrap();
3065 Pin::new(&mut *inner).poll_read(cx, buf)
3066 }
3067
3068 fn poll_read_vectored(
3069 self: Pin<&mut Self>,
3070 cx: &mut Context<'_>,
3071 bufs: &mut [IoSliceMut<'_>],
3072 ) -> Poll<Result<usize>> {
3073 let mut inner = self.0.lock().unwrap();
3074 Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
3075 }
3076}
3077
3078impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
3079 fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
3080 let mut inner = self.0.lock().unwrap();
3081 Pin::new(&mut *inner).poll_write(cx, buf)
3082 }
3083
3084 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3085 let mut inner = self.0.lock().unwrap();
3086 Pin::new(&mut *inner).poll_flush(cx)
3087 }
3088
3089 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3090 let mut inner = self.0.lock().unwrap();
3091 Pin::new(&mut *inner).poll_close(cx)
3092 }
3093}
3094
3095#[cfg(feature = "memchr")]
3096use memchr::memchr;
3097
3098/// Unoptimized memchr fallback.
3099#[cfg(not(feature = "memchr"))]
3100fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
3101 haystack.iter().position(|&b| b == needle)
3102}