futures_lite/
io.rs

1//! Tools and combinators for I/O.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::io::{self, AsyncReadExt};
7//!
8//! # spin_on::spin_on(async {
9//! let input: &[u8] = b"hello";
10//! let mut reader = io::BufReader::new(input);
11//!
12//! let mut contents = String::new();
13//! reader.read_to_string(&mut contents).await?;
14//! # std::io::Result::Ok(()) });
15//! ```
16
17#[doc(no_inline)]
18pub use std::io::{Error, ErrorKind, Result, SeekFrom};
19
20#[doc(no_inline)]
21pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
22
23use std::borrow::{Borrow, BorrowMut};
24use std::boxed::Box;
25use std::cmp;
26use std::fmt;
27use std::future::Future;
28use std::io::{IoSlice, IoSliceMut};
29use std::mem;
30use std::pin::Pin;
31use std::string::String;
32use std::sync::{Arc, Mutex};
33use std::task::{Context, Poll};
34use std::vec;
35use std::vec::Vec;
36
37use futures_core::stream::Stream;
38use pin_project_lite::pin_project;
39
40use crate::future;
41use crate::ready;
42
// Buffer capacity in bytes (8 KiB) that `BufReader::new` and `BufWriter::new` pass on to their
// respective `with_capacity` constructors.
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
44
45/// Copies the entire contents of a reader into a writer.
46///
47/// This function will read data from `reader` and write it into `writer` in a streaming fashion
48/// until `reader` returns EOF.
49///
50/// On success, returns the total number of bytes copied.
51///
52/// # Examples
53///
54/// ```
55/// use futures_lite::io::{self, BufReader, BufWriter};
56///
57/// # spin_on::spin_on(async {
58/// let input: &[u8] = b"hello";
59/// let reader = BufReader::new(input);
60///
61/// let mut output = Vec::new();
62/// let writer = BufWriter::new(&mut output);
63///
64/// io::copy(reader, writer).await?;
65/// # std::io::Result::Ok(()) });
66/// ```
67pub async fn copy<R, W>(reader: R, writer: W) -> Result<u64>
68where
69    R: AsyncRead,
70    W: AsyncWrite,
71{
72    pin_project! {
73        struct CopyFuture<R, W> {
74            #[pin]
75            reader: R,
76            #[pin]
77            writer: W,
78            amt: u64,
79        }
80    }
81
82    impl<R, W> Future for CopyFuture<R, W>
83    where
84        R: AsyncBufRead,
85        W: AsyncWrite,
86    {
87        type Output = Result<u64>;
88
89        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
90            let mut this = self.project();
91            loop {
92                let buffer = ready!(this.reader.as_mut().poll_fill_buf(cx))?;
93                if buffer.is_empty() {
94                    ready!(this.writer.as_mut().poll_flush(cx))?;
95                    return Poll::Ready(Ok(*this.amt));
96                }
97
98                let i = ready!(this.writer.as_mut().poll_write(cx, buffer))?;
99                if i == 0 {
100                    return Poll::Ready(Err(ErrorKind::WriteZero.into()));
101                }
102                *this.amt += i as u64;
103                this.reader.as_mut().consume(i);
104            }
105        }
106    }
107
108    let future = CopyFuture {
109        reader: BufReader::new(reader),
110        writer,
111        amt: 0,
112    };
113    future.await
114}
115
116/// Asserts that a type implementing [`std::io`] traits can be used as an async type.
117///
118/// The underlying I/O handle should never block nor return the [`ErrorKind::WouldBlock`] error.
119/// This is usually the case for in-memory buffered I/O.
120///
121/// # Examples
122///
123/// ```
124/// use futures_lite::io::{AssertAsync, AsyncReadExt};
125///
126/// let reader: &[u8] = b"hello";
127///
128/// # spin_on::spin_on(async {
129/// let mut async_reader = AssertAsync::new(reader);
130/// let mut contents = String::new();
131///
132/// // This line works in async manner - note that there is await:
133/// async_reader.read_to_string(&mut contents).await?;
134/// # std::io::Result::Ok(()) });
135/// ```
#[derive(Debug, Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct AssertAsync<T>(T);

// Declared `Unpin` for every `T`: the async trait impls for this wrapper reach the inner handle
// through ordinary mutable access on a pinned `self`, which requires `Self: Unpin`.
impl<T> Unpin for AssertAsync<T> {}

impl<T> AssertAsync<T> {
    /// Creates a wrapper around an I/O handle that implements [`std::io`] traits.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// ```
    #[inline(always)]
    pub fn new(io: T) -> Self {
        Self(io)
    }

    /// Borrows the wrapped I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// let r = async_reader.get_ref();
    /// ```
    #[inline(always)]
    pub fn get_ref(&self) -> &T {
        let Self(io) = self;
        io
    }

    /// Mutably borrows the wrapped I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let mut async_reader = AssertAsync::new(reader);
    /// let r = async_reader.get_mut();
    /// ```
    #[inline(always)]
    pub fn get_mut(&mut self) -> &mut T {
        let Self(io) = self;
        io
    }

    /// Consumes the wrapper and returns the wrapped I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::AssertAsync;
    ///
    /// let reader: &[u8] = b"hello";
    ///
    /// let async_reader = AssertAsync::new(reader);
    /// let inner = async_reader.into_inner();
    /// ```
    #[inline(always)]
    pub fn into_inner(self) -> T {
        let Self(io) = self;
        io
    }
}
209
// Runs the blocking operation `f` and reports its result as an immediately-ready poll.
//
// A result whose error kind is `ErrorKind::Interrupted` is discarded and `f` is called again;
// every other result (success or error) is returned as `Poll::Ready`. This never yields
// `Poll::Pending`.
fn assert_async_wrapio<F, T>(mut f: F) -> Poll<std::io::Result<T>>
where
    F: FnMut() -> std::io::Result<T>,
{
    loop {
        let outcome = f();
        if let Err(err) = &outcome {
            if err.kind() == ErrorKind::Interrupted {
                // Interrupted calls are retried.
                continue;
            }
        }
        return Poll::Ready(outcome);
    }
}
221
222impl<T: std::io::Read> AsyncRead for AssertAsync<T> {
223    #[inline]
224    fn poll_read(
225        mut self: Pin<&mut Self>,
226        _: &mut Context<'_>,
227        buf: &mut [u8],
228    ) -> Poll<Result<usize>> {
229        assert_async_wrapio(move || self.0.read(buf))
230    }
231
232    #[inline]
233    fn poll_read_vectored(
234        mut self: Pin<&mut Self>,
235        _: &mut Context<'_>,
236        bufs: &mut [IoSliceMut<'_>],
237    ) -> Poll<Result<usize>> {
238        assert_async_wrapio(move || self.0.read_vectored(bufs))
239    }
240}
241
242impl<T: std::io::Write> AsyncWrite for AssertAsync<T> {
243    #[inline]
244    fn poll_write(
245        mut self: Pin<&mut Self>,
246        _: &mut Context<'_>,
247        buf: &[u8],
248    ) -> Poll<Result<usize>> {
249        assert_async_wrapio(move || self.0.write(buf))
250    }
251
252    #[inline]
253    fn poll_write_vectored(
254        mut self: Pin<&mut Self>,
255        _: &mut Context<'_>,
256        bufs: &[IoSlice<'_>],
257    ) -> Poll<Result<usize>> {
258        assert_async_wrapio(move || self.0.write_vectored(bufs))
259    }
260
261    #[inline]
262    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
263        assert_async_wrapio(move || self.0.flush())
264    }
265
266    #[inline]
267    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
268        self.poll_flush(cx)
269    }
270}
271
272impl<T: std::io::Seek> AsyncSeek for AssertAsync<T> {
273    #[inline]
274    fn poll_seek(
275        mut self: Pin<&mut Self>,
276        _: &mut Context<'_>,
277        pos: SeekFrom,
278    ) -> Poll<Result<u64>> {
279        assert_async_wrapio(move || self.0.seek(pos))
280    }
281}
282
283/// A wrapper around a type that implements `AsyncRead` or `AsyncWrite` that converts `Pending`
284/// polls to `WouldBlock` errors.
285///
286/// This wrapper can be used as a compatibility layer between `AsyncRead` and `Read`, for types
287/// that take `Read` as a parameter.
288///
289/// # Examples
290///
291/// ```
292/// use std::io::Read;
293/// use std::task::{Poll, Context};
294///
295/// fn poll_for_io(cx: &mut Context<'_>) -> Poll<usize> {
296///     // Assume we have a library that's built around `Read` and `Write` traits.
297///     use cooltls::Session;
298///
299///     // We want to use it with our writer that implements `AsyncWrite`.
300///     let writer = Stream::new();
301///
302///     // First, we wrap our `Writer` with `AsyncAsSync` to convert `Pending` polls to `WouldBlock`.
303///     use futures_lite::io::AsyncAsSync;
304///     let writer = AsyncAsSync::new(cx, writer);
305///
306///     // Now, we can use it with `cooltls`.
307///     let mut session = Session::new(writer);
308///
309///     // Match on the result of `read()` and translate it to poll.
310///     match session.read(&mut [0; 1024]) {
311///         Ok(n) => Poll::Ready(n),
312///         Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => Poll::Pending,
313///         Err(err) => panic!("unexpected error: {}", err),
314///     }
315/// }
316///
317/// // Usually, poll-based functions are best wrapped using `poll_fn`.
318/// use futures_lite::future::poll_fn;
319/// # futures_lite::future::block_on(async {
320/// poll_fn(|cx| poll_for_io(cx)).await;
321/// # });
322/// # struct Stream;
323/// # impl Stream {
324/// #     fn new() -> Stream {
325/// #         Stream
326/// #     }
327/// # }
328/// # impl futures_lite::io::AsyncRead for Stream {
329/// #     fn poll_read(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<std::io::Result<usize>> {
330/// #         Poll::Ready(Ok(0))
331/// #     }
332/// # }
333/// # mod cooltls {
334/// #     pub struct Session<W> {
335/// #         reader: W,
336/// #     }
337/// #     impl<W> Session<W> {
338/// #         pub fn new(reader: W) -> Session<W> {
339/// #             Session { reader }
340/// #         }
341/// #     }
342/// #     impl<W: std::io::Read> std::io::Read for Session<W> {
343/// #         fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
344/// #             self.reader.read(buf)
345/// #         }
346/// #     }
347/// # }
348/// ```
349#[derive(Debug)]
350pub struct AsyncAsSync<'r, 'ctx, T> {
351    /// The context we are using to poll the future.
352    pub context: &'r mut Context<'ctx>,
353
354    /// The actual reader/writer we are wrapping.
355    pub inner: T,
356}
357
358impl<'r, 'ctx, T> AsyncAsSync<'r, 'ctx, T> {
359    /// Wraps an I/O handle implementing [`AsyncRead`] or [`AsyncWrite`] traits.
360    ///
361    /// # Examples
362    ///
363    /// ```
364    /// use futures_lite::io::AsyncAsSync;
365    /// use std::task::Context;
366    /// use waker_fn::waker_fn;
367    ///
368    /// let reader: &[u8] = b"hello";
369    /// let waker = waker_fn(|| {});
370    /// let mut context = Context::from_waker(&waker);
371    ///
372    /// let async_reader = AsyncAsSync::new(&mut context, reader);
373    /// ```
374    #[inline]
375    pub fn new(context: &'r mut Context<'ctx>, inner: T) -> Self {
376        AsyncAsSync { context, inner }
377    }
378
379    /// Attempt to shutdown the I/O handle.
380    ///
381    /// # Examples
382    ///
383    /// ```
384    /// use futures_lite::io::AsyncAsSync;
385    /// use std::task::Context;
386    /// use waker_fn::waker_fn;
387    ///
388    /// let reader: Vec<u8> = b"hello".to_vec();
389    /// let waker = waker_fn(|| {});
390    /// let mut context = Context::from_waker(&waker);
391    ///
392    /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
393    /// async_reader.close().unwrap();
394    /// ```
395    #[inline]
396    pub fn close(&mut self) -> Result<()>
397    where
398        T: AsyncWrite + Unpin,
399    {
400        self.poll_with(|io, cx| io.poll_close(cx))
401    }
402
403    /// Poll this `AsyncAsSync` for some function.
404    ///
405    /// # Examples
406    ///
407    /// ```
408    /// use futures_lite::io::{AsyncAsSync, AsyncRead};
409    /// use std::task::Context;
410    /// use waker_fn::waker_fn;
411    ///
412    /// let reader: &[u8] = b"hello";
413    /// let waker = waker_fn(|| {});
414    /// let mut context = Context::from_waker(&waker);
415    ///
416    /// let mut async_reader = AsyncAsSync::new(&mut context, reader);
417    /// let r = async_reader.poll_with(|io, cx| io.poll_read(cx, &mut [0; 1024]));
418    /// assert_eq!(r.unwrap(), 5);
419    /// ```
420    #[inline]
421    pub fn poll_with<R>(
422        &mut self,
423        f: impl FnOnce(Pin<&mut T>, &mut Context<'_>) -> Poll<Result<R>>,
424    ) -> Result<R>
425    where
426        T: Unpin,
427    {
428        match f(Pin::new(&mut self.inner), self.context) {
429            Poll::Ready(res) => res,
430            Poll::Pending => Err(ErrorKind::WouldBlock.into()),
431        }
432    }
433}
434
435impl<T: AsyncRead + Unpin> std::io::Read for AsyncAsSync<'_, '_, T> {
436    #[inline]
437    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
438        self.poll_with(|io, cx| io.poll_read(cx, buf))
439    }
440
441    #[inline]
442    fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize> {
443        self.poll_with(|io, cx| io.poll_read_vectored(cx, bufs))
444    }
445}
446
447impl<T: AsyncWrite + Unpin> std::io::Write for AsyncAsSync<'_, '_, T> {
448    #[inline]
449    fn write(&mut self, buf: &[u8]) -> Result<usize> {
450        self.poll_with(|io, cx| io.poll_write(cx, buf))
451    }
452
453    #[inline]
454    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize> {
455        self.poll_with(|io, cx| io.poll_write_vectored(cx, bufs))
456    }
457
458    #[inline]
459    fn flush(&mut self) -> Result<()> {
460        self.poll_with(|io, cx| io.poll_flush(cx))
461    }
462}
463
464impl<T: AsyncSeek + Unpin> std::io::Seek for AsyncAsSync<'_, '_, T> {
465    #[inline]
466    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
467        self.poll_with(|io, cx| io.poll_seek(cx, pos))
468    }
469}
470
471impl<T> AsRef<T> for AsyncAsSync<'_, '_, T> {
472    #[inline]
473    fn as_ref(&self) -> &T {
474        &self.inner
475    }
476}
477
478impl<T> AsMut<T> for AsyncAsSync<'_, '_, T> {
479    #[inline]
480    fn as_mut(&mut self) -> &mut T {
481        &mut self.inner
482    }
483}
484
485impl<T> Borrow<T> for AsyncAsSync<'_, '_, T> {
486    #[inline]
487    fn borrow(&self) -> &T {
488        &self.inner
489    }
490}
491
492impl<T> BorrowMut<T> for AsyncAsSync<'_, '_, T> {
493    #[inline]
494    fn borrow_mut(&mut self) -> &mut T {
495        &mut self.inner
496    }
497}
498
499/// Blocks on all async I/O operations and implements [`std::io`] traits.
500///
501/// Sometimes async I/O needs to be used in a blocking manner. If calling [`future::block_on()`]
502/// manually all the time becomes too tedious, use this type for more convenient blocking on async
503/// I/O operations.
504///
505/// This type implements traits [`Read`][`std::io::Read`], [`Write`][`std::io::Write`], or
506/// [`Seek`][`std::io::Seek`] if the inner type implements [`AsyncRead`], [`AsyncWrite`], or
507/// [`AsyncSeek`], respectively.
508///
509/// If writing data through the [`Write`][`std::io::Write`] trait, make sure to flush before
510/// dropping the [`BlockOn`] handle or some buffered data might get lost.
511///
512/// # Examples
513///
514/// ```
515/// use futures_lite::io::BlockOn;
516/// use futures_lite::pin;
517/// use std::io::Read;
518///
519/// let reader: &[u8] = b"hello";
520/// pin!(reader);
521///
522/// let mut blocking_reader = BlockOn::new(reader);
523/// let mut contents = String::new();
524///
525/// // This line blocks - note that there is no await:
526/// blocking_reader.read_to_string(&mut contents)?;
527/// # std::io::Result::Ok(())
528/// ```
#[derive(Debug)]
pub struct BlockOn<T>(T);

impl<T> BlockOn<T> {
    /// Creates a blocking interface around an async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// ```
    pub fn new(io: T) -> BlockOn<T> {
        Self(io)
    }

    /// Borrows the wrapped async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// let r = blocking_reader.get_ref();
    /// ```
    pub fn get_ref(&self) -> &T {
        let Self(io) = self;
        io
    }

    /// Mutably borrows the wrapped async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let mut blocking_reader = BlockOn::new(reader);
    /// let r = blocking_reader.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        let Self(io) = self;
        io
    }

    /// Consumes the wrapper and returns the wrapped async I/O handle.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::BlockOn;
    /// use futures_lite::pin;
    ///
    /// let reader: &[u8] = b"hello";
    /// pin!(reader);
    ///
    /// let blocking_reader = BlockOn::new(reader);
    /// let inner = blocking_reader.into_inner();
    /// ```
    pub fn into_inner(self) -> T {
        let Self(io) = self;
        io
    }
}
604
605impl<T: AsyncRead + Unpin> std::io::Read for BlockOn<T> {
606    fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
607        future::block_on(self.0.read(buf))
608    }
609}
610
611impl<T: AsyncBufRead + Unpin> std::io::BufRead for BlockOn<T> {
612    fn fill_buf(&mut self) -> Result<&[u8]> {
613        future::block_on(self.0.fill_buf())
614    }
615
616    fn consume(&mut self, amt: usize) {
617        Pin::new(&mut self.0).consume(amt)
618    }
619}
620
621impl<T: AsyncWrite + Unpin> std::io::Write for BlockOn<T> {
622    fn write(&mut self, buf: &[u8]) -> Result<usize> {
623        future::block_on(self.0.write(buf))
624    }
625
626    fn flush(&mut self) -> Result<()> {
627        future::block_on(self.0.flush())
628    }
629}
630
631impl<T: AsyncSeek + Unpin> std::io::Seek for BlockOn<T> {
632    fn seek(&mut self, pos: SeekFrom) -> Result<u64> {
633        future::block_on(self.0.seek(pos))
634    }
635}
636
pin_project! {
    /// Adds buffering to a reader.
    ///
    /// It can be excessively inefficient to work directly with an [`AsyncRead`] instance. A
    /// [`BufReader`] performs large, infrequent reads on the underlying [`AsyncRead`] and
    /// maintains an in-memory buffer of the incoming byte stream.
    ///
    /// [`BufReader`] can improve the speed of programs that make *small* and *repeated* reads to
    /// the same file or networking socket. It does not help when reading very large amounts at
    /// once, or reading just once or a few times. It also provides no advantage when reading from
    /// a source that is already in memory, like a `Vec<u8>`.
    ///
    /// When a [`BufReader`] is dropped, the contents of its buffer are discarded. Creating
    /// multiple instances of [`BufReader`] on the same reader can cause data loss.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut line = String::new();
    /// reader.read_line(&mut line).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub struct BufReader<R> {
        // The underlying reader; structurally pinned.
        #[pin]
        inner: R,
        // Fixed-size backing storage, allocated once by `with_capacity`.
        buf: Box<[u8]>,
        // Offset of the first unread byte in `buf`; advanced by `consume`.
        pos: usize,
        // Number of valid bytes in `buf`, as returned by the last refill. The unread data is
        // `buf[pos..cap]`.
        cap: usize,
    }
}
673
674impl<R: AsyncRead> BufReader<R> {
675    /// Creates a buffered reader with the default buffer capacity.
676    ///
677    /// The default capacity is currently 8 KB, but that may change in the future.
678    ///
679    /// # Examples
680    ///
681    /// ```
682    /// use futures_lite::io::BufReader;
683    ///
684    /// let input: &[u8] = b"hello";
685    /// let reader = BufReader::new(input);
686    /// ```
687    pub fn new(inner: R) -> BufReader<R> {
688        BufReader::with_capacity(DEFAULT_BUF_SIZE, inner)
689    }
690
691    /// Creates a buffered reader with the specified capacity.
692    ///
693    /// # Examples
694    ///
695    /// ```
696    /// use futures_lite::io::BufReader;
697    ///
698    /// let input: &[u8] = b"hello";
699    /// let reader = BufReader::with_capacity(1024, input);
700    /// ```
701    pub fn with_capacity(capacity: usize, inner: R) -> BufReader<R> {
702        BufReader {
703            inner,
704            buf: vec![0; capacity].into_boxed_slice(),
705            pos: 0,
706            cap: 0,
707        }
708    }
709}
710
711impl<R> BufReader<R> {
712    /// Gets a reference to the underlying reader.
713    ///
714    /// It is not advisable to directly read from the underlying reader.
715    ///
716    /// # Examples
717    ///
718    /// ```
719    /// use futures_lite::io::BufReader;
720    ///
721    /// let input: &[u8] = b"hello";
722    /// let reader = BufReader::new(input);
723    ///
724    /// let r = reader.get_ref();
725    /// ```
726    pub fn get_ref(&self) -> &R {
727        &self.inner
728    }
729
730    /// Gets a mutable reference to the underlying reader.
731    ///
732    /// It is not advisable to directly read from the underlying reader.
733    ///
734    /// # Examples
735    ///
736    /// ```
737    /// use futures_lite::io::BufReader;
738    ///
739    /// let input: &[u8] = b"hello";
740    /// let mut reader = BufReader::new(input);
741    ///
742    /// let r = reader.get_mut();
743    /// ```
744    pub fn get_mut(&mut self) -> &mut R {
745        &mut self.inner
746    }
747
748    /// Gets a pinned mutable reference to the underlying reader.
749    ///
750    /// It is not advisable to directly read from the underlying reader.
751    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut R> {
752        self.project().inner
753    }
754
755    /// Returns a reference to the internal buffer.
756    ///
757    /// This method will not attempt to fill the buffer if it is empty.
758    ///
759    /// # Examples
760    ///
761    /// ```
762    /// use futures_lite::io::BufReader;
763    ///
764    /// let input: &[u8] = b"hello";
765    /// let reader = BufReader::new(input);
766    ///
767    /// // The internal buffer is empty until the first read request.
768    /// assert_eq!(reader.buffer(), &[]);
769    /// ```
770    pub fn buffer(&self) -> &[u8] {
771        &self.buf[self.pos..self.cap]
772    }
773
774    /// Unwraps the buffered reader, returning the underlying reader.
775    ///
776    /// Note that any leftover data in the internal buffer will be lost.
777    ///
778    /// # Examples
779    ///
780    /// ```
781    /// use futures_lite::io::BufReader;
782    ///
783    /// let input: &[u8] = b"hello";
784    /// let reader = BufReader::new(input);
785    ///
786    /// assert_eq!(reader.into_inner(), input);
787    /// ```
788    pub fn into_inner(self) -> R {
789        self.inner
790    }
791
792    /// Invalidates all data in the internal buffer.
793    #[inline]
794    fn discard_buffer(self: Pin<&mut Self>) {
795        let this = self.project();
796        *this.pos = 0;
797        *this.cap = 0;
798    }
799}
800
801impl<R: AsyncRead> AsyncRead for BufReader<R> {
802    fn poll_read(
803        mut self: Pin<&mut Self>,
804        cx: &mut Context<'_>,
805        buf: &mut [u8],
806    ) -> Poll<Result<usize>> {
807        // If we don't have any buffered data and we're doing a massive read
808        // (larger than our internal buffer), bypass our internal buffer
809        // entirely.
810        if self.pos == self.cap && buf.len() >= self.buf.len() {
811            let res = ready!(self.as_mut().get_pin_mut().poll_read(cx, buf));
812            self.discard_buffer();
813            return Poll::Ready(res);
814        }
815        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
816        let nread = std::io::Read::read(&mut rem, buf)?;
817        self.consume(nread);
818        Poll::Ready(Ok(nread))
819    }
820
821    fn poll_read_vectored(
822        mut self: Pin<&mut Self>,
823        cx: &mut Context<'_>,
824        bufs: &mut [IoSliceMut<'_>],
825    ) -> Poll<Result<usize>> {
826        let total_len = bufs.iter().map(|b| b.len()).sum::<usize>();
827        if self.pos == self.cap && total_len >= self.buf.len() {
828            let res = ready!(self.as_mut().get_pin_mut().poll_read_vectored(cx, bufs));
829            self.discard_buffer();
830            return Poll::Ready(res);
831        }
832        let mut rem = ready!(self.as_mut().poll_fill_buf(cx))?;
833        let nread = std::io::Read::read_vectored(&mut rem, bufs)?;
834        self.consume(nread);
835        Poll::Ready(Ok(nread))
836    }
837}
838
839impl<R: AsyncRead> AsyncBufRead for BufReader<R> {
840    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, cx: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
841        let mut this = self.project();
842
843        // If we've reached the end of our internal buffer then we need to fetch
844        // some more data from the underlying reader.
845        // Branch using `>=` instead of the more correct `==`
846        // to tell the compiler that the pos..cap slice is always valid.
847        if *this.pos >= *this.cap {
848            debug_assert!(*this.pos == *this.cap);
849            *this.cap = ready!(this.inner.as_mut().poll_read(cx, this.buf))?;
850            *this.pos = 0;
851        }
852        Poll::Ready(Ok(&this.buf[*this.pos..*this.cap]))
853    }
854
855    fn consume(self: Pin<&mut Self>, amt: usize) {
856        let this = self.project();
857        *this.pos = cmp::min(*this.pos + amt, *this.cap);
858    }
859}
860
861impl<R: fmt::Debug> fmt::Debug for BufReader<R> {
862    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
863        f.debug_struct("BufReader")
864            .field("reader", &self.inner)
865            .field(
866                "buffer",
867                &format_args!("{}/{}", self.cap - self.pos, self.buf.len()),
868            )
869            .finish()
870    }
871}
872
873impl<R: AsyncSeek> AsyncSeek for BufReader<R> {
874    /// Seeks to an offset, in bytes, in the underlying reader.
875    ///
876    /// The position used for seeking with [`SeekFrom::Current`] is the position the underlying
877    /// reader would be at if the [`BufReader`] had no internal buffer.
878    ///
879    /// Seeking always discards the internal buffer, even if the seek position would otherwise fall
880    /// within it. This guarantees that calling [`into_inner()`][`BufReader::into_inner()`]
881    /// immediately after a seek yields the underlying reader at the same position.
882    ///
883    /// See [`AsyncSeek`] for more details.
884    ///
885    /// Note: In the edge case where you're seeking with `SeekFrom::Current(n)` where `n` minus the
886    /// internal buffer length overflows an `i64`, two seeks will be performed instead of one. If
887    /// the second seek returns `Err`, the underlying reader will be left at the same position it
888    /// would have if you called [`seek()`][`AsyncSeekExt::seek()`] with `SeekFrom::Current(0)`.
889    fn poll_seek(
890        mut self: Pin<&mut Self>,
891        cx: &mut Context<'_>,
892        pos: SeekFrom,
893    ) -> Poll<Result<u64>> {
894        let result: u64;
895        if let SeekFrom::Current(n) = pos {
896            let remainder = (self.cap - self.pos) as i64;
897            // it should be safe to assume that remainder fits within an i64 as the alternative
898            // means we managed to allocate 8 exbibytes and that's absurd.
899            // But it's not out of the realm of possibility for some weird underlying reader to
900            // support seeking by i64::min_value() so we need to handle underflow when subtracting
901            // remainder.
902            if let Some(offset) = n.checked_sub(remainder) {
903                result = ready!(self
904                    .as_mut()
905                    .get_pin_mut()
906                    .poll_seek(cx, SeekFrom::Current(offset)))?;
907            } else {
908                // seek backwards by our remainder, and then by the offset
909                ready!(self
910                    .as_mut()
911                    .get_pin_mut()
912                    .poll_seek(cx, SeekFrom::Current(-remainder)))?;
913                self.as_mut().discard_buffer();
914                result = ready!(self
915                    .as_mut()
916                    .get_pin_mut()
917                    .poll_seek(cx, SeekFrom::Current(n)))?;
918            }
919        } else {
920            // Seeking with Start/End doesn't care about our buffer length.
921            result = ready!(self.as_mut().get_pin_mut().poll_seek(cx, pos))?;
922        }
923        self.discard_buffer();
924        Poll::Ready(Ok(result))
925    }
926}
927
928impl<R: AsyncWrite> AsyncWrite for BufReader<R> {
929    fn poll_write(
930        mut self: Pin<&mut Self>,
931        cx: &mut Context<'_>,
932        buf: &[u8],
933    ) -> Poll<Result<usize>> {
934        self.as_mut().get_pin_mut().poll_write(cx, buf)
935    }
936
937    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
938        self.as_mut().get_pin_mut().poll_flush(cx)
939    }
940
941    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
942        self.as_mut().get_pin_mut().poll_close(cx)
943    }
944}
945
pin_project! {
    /// Adds buffering to a writer.
    ///
    /// It can be excessively inefficient to work directly with something that implements
    /// [`AsyncWrite`]. For example, every call to [`write()`][`AsyncWriteExt::write()`] on a TCP
    /// stream results in a system call. A [`BufWriter`] keeps an in-memory buffer of data and
    /// writes it to the underlying writer in large, infrequent batches.
    ///
    /// [`BufWriter`] can improve the speed of programs that make *small* and *repeated* writes to
    /// the same file or networking socket. It does not help when writing very large amounts at
    /// once, or writing just once or a few times. It also provides no advantage when writing to a
    /// destination that is in memory, like a `Vec<u8>`.
    ///
    /// Unlike [`std::io::BufWriter`], this type does not write out the contents of its buffer when
    /// it is dropped. Therefore, it is important that users explicitly flush the buffer before
    /// dropping the [`BufWriter`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
    ///
    /// # spin_on::spin_on(async {
    /// let mut output = Vec::new();
    /// let mut writer = BufWriter::new(&mut output);
    ///
    /// writer.write_all(b"hello").await?;
    /// writer.flush().await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    pub struct BufWriter<W> {
        // The underlying writer; structurally pinned.
        #[pin]
        inner: W,
        // Pending output; created by `with_capacity` as an empty `Vec` with the requested
        // capacity.
        buf: Vec<u8>,
        // Starts at 0. NOTE(review): presumably the number of bytes of `buf` already handed to
        // `inner` during a flush — confirm against the write/flush implementation.
        written: usize,
    }
}
983
984impl<W: AsyncWrite> BufWriter<W> {
985    /// Creates a buffered writer with the default buffer capacity.
986    ///
987    /// The default capacity is currently 8 KB, but that may change in the future.
988    ///
989    /// # Examples
990    ///
991    /// ```
992    /// use futures_lite::io::BufWriter;
993    ///
994    /// let mut output = Vec::new();
995    /// let writer = BufWriter::new(&mut output);
996    /// ```
997    pub fn new(inner: W) -> BufWriter<W> {
998        BufWriter::with_capacity(DEFAULT_BUF_SIZE, inner)
999    }
1000
1001    /// Creates a buffered writer with the specified buffer capacity.
1002    ///
1003    /// # Examples
1004    ///
1005    /// ```
1006    /// use futures_lite::io::BufWriter;
1007    ///
1008    /// let mut output = Vec::new();
1009    /// let writer = BufWriter::with_capacity(100, &mut output);
1010    /// ```
1011    pub fn with_capacity(capacity: usize, inner: W) -> BufWriter<W> {
1012        BufWriter {
1013            inner,
1014            buf: Vec::with_capacity(capacity),
1015            written: 0,
1016        }
1017    }
1018
1019    /// Gets a reference to the underlying writer.
1020    ///
1021    /// # Examples
1022    ///
1023    /// ```
1024    /// use futures_lite::io::BufWriter;
1025    ///
1026    /// let mut output = Vec::new();
1027    /// let writer = BufWriter::new(&mut output);
1028    ///
1029    /// let r = writer.get_ref();
1030    /// ```
1031    pub fn get_ref(&self) -> &W {
1032        &self.inner
1033    }
1034
1035    /// Gets a mutable reference to the underlying writer.
1036    ///
1037    /// It is not advisable to directly write to the underlying writer.
1038    ///
1039    /// # Examples
1040    ///
1041    /// ```
1042    /// use futures_lite::io::BufWriter;
1043    ///
1044    /// let mut output = Vec::new();
1045    /// let mut writer = BufWriter::new(&mut output);
1046    ///
1047    /// let r = writer.get_mut();
1048    /// ```
1049    pub fn get_mut(&mut self) -> &mut W {
1050        &mut self.inner
1051    }
1052
1053    /// Gets a pinned mutable reference to the underlying writer.
1054    ///
1055    /// It is not not advisable to directly write to the underlying writer.
1056    fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut W> {
1057        self.project().inner
1058    }
1059
1060    /// Unwraps the buffered writer, returning the underlying writer.
1061    ///
1062    /// Note that any leftover data in the internal buffer will be lost. If you don't want to lose
1063    /// that data, flush the buffered writer before unwrapping it.
1064    ///
1065    /// # Examples
1066    ///
1067    /// ```
1068    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
1069    ///
1070    /// # spin_on::spin_on(async {
1071    /// let mut output = vec![1, 2, 3];
1072    /// let mut writer = BufWriter::new(&mut output);
1073    ///
1074    /// writer.write_all(&[4]).await?;
1075    /// writer.flush().await?;
1076    /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4]);
1077    /// # std::io::Result::Ok(()) });
1078    /// ```
1079    pub fn into_inner(self) -> W {
1080        self.inner
1081    }
1082
1083    /// Returns a reference to the internal buffer.
1084    ///
1085    /// # Examples
1086    ///
1087    /// ```
1088    /// use futures_lite::io::BufWriter;
1089    ///
1090    /// let mut output = Vec::new();
1091    /// let writer = BufWriter::new(&mut output);
1092    ///
1093    /// // The internal buffer is empty until the first write request.
1094    /// assert_eq!(writer.buffer(), &[]);
1095    /// ```
1096    pub fn buffer(&self) -> &[u8] {
1097        &self.buf
1098    }
1099
1100    /// Flush the buffer.
1101    fn poll_flush_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1102        let mut this = self.project();
1103        let len = this.buf.len();
1104        let mut ret = Ok(());
1105
1106        while *this.written < len {
1107            match this
1108                .inner
1109                .as_mut()
1110                .poll_write(cx, &this.buf[*this.written..])
1111            {
1112                Poll::Ready(Ok(0)) => {
1113                    ret = Err(Error::new(
1114                        ErrorKind::WriteZero,
1115                        "Failed to write buffered data",
1116                    ));
1117                    break;
1118                }
1119                Poll::Ready(Ok(n)) => *this.written += n,
1120                Poll::Ready(Err(ref e)) if e.kind() == ErrorKind::Interrupted => {}
1121                Poll::Ready(Err(e)) => {
1122                    ret = Err(e);
1123                    break;
1124                }
1125                Poll::Pending => return Poll::Pending,
1126            }
1127        }
1128
1129        if *this.written > 0 {
1130            this.buf.drain(..*this.written);
1131        }
1132        *this.written = 0;
1133
1134        Poll::Ready(ret)
1135    }
1136}
1137
1138impl<W: fmt::Debug> fmt::Debug for BufWriter<W> {
1139    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1140        f.debug_struct("BufWriter")
1141            .field("writer", &self.inner)
1142            .field("buf", &self.buf)
1143            .finish()
1144    }
1145}
1146
1147impl<W: AsyncWrite> AsyncWrite for BufWriter<W> {
1148    fn poll_write(
1149        mut self: Pin<&mut Self>,
1150        cx: &mut Context<'_>,
1151        buf: &[u8],
1152    ) -> Poll<Result<usize>> {
1153        if self.buf.len() + buf.len() > self.buf.capacity() {
1154            ready!(self.as_mut().poll_flush_buf(cx))?;
1155        }
1156        if buf.len() >= self.buf.capacity() {
1157            self.get_pin_mut().poll_write(cx, buf)
1158        } else {
1159            Pin::new(&mut *self.project().buf).poll_write(cx, buf)
1160        }
1161    }
1162
1163    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1164        ready!(self.as_mut().poll_flush_buf(cx))?;
1165        self.get_pin_mut().poll_flush(cx)
1166    }
1167
1168    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1169        ready!(self.as_mut().poll_flush_buf(cx))?;
1170        self.get_pin_mut().poll_close(cx)
1171    }
1172}
1173
1174impl<W: AsyncWrite + AsyncSeek> AsyncSeek for BufWriter<W> {
1175    /// Seek to the offset, in bytes, in the underlying writer.
1176    ///
1177    /// Seeking always writes out the internal buffer before seeking.
1178    fn poll_seek(
1179        mut self: Pin<&mut Self>,
1180        cx: &mut Context<'_>,
1181        pos: SeekFrom,
1182    ) -> Poll<Result<u64>> {
1183        ready!(self.as_mut().poll_flush_buf(cx))?;
1184        self.get_pin_mut().poll_seek(cx, pos)
1185    }
1186}
1187
/// Gives an in-memory buffer a cursor for reading and writing.
///
/// This is an async-facing wrapper around [`std::io::Cursor`]: all operations are forwarded to
/// the wrapped standard-library cursor.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, Cursor, SeekFrom};
///
/// # spin_on::spin_on(async {
/// let mut bytes = b"hello".to_vec();
/// let mut cursor = Cursor::new(&mut bytes);
///
/// // Overwrite 'h' with 'H'.
/// cursor.write_all(b"H").await?;
///
/// // Move the cursor one byte forward.
/// cursor.seek(SeekFrom::Current(1)).await?;
///
/// // Read a byte.
/// let mut byte = [0];
/// cursor.read_exact(&mut byte).await?;
/// assert_eq!(&byte, b"l");
///
/// // Check the final buffer.
/// assert_eq!(bytes, b"Hello");
/// # std::io::Result::Ok(()) });
/// ```
#[derive(Clone, Debug, Default)]
pub struct Cursor<T> {
    // The synchronous cursor that holds the buffer and the current position.
    inner: std::io::Cursor<T>,
}
1218
impl<T> Cursor<T> {
    /// Creates a cursor for an in-memory buffer.
    ///
    /// Cursor's initial position is 0 even if the underlying buffer is not empty. Writing using
    /// [`Cursor`] will overwrite the existing contents unless the cursor is moved to the end of
    /// the buffer using [`set_position()`][`Cursor::set_position()`] or
    /// [`seek()`][`AsyncSeekExt::seek()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(Vec::<u8>::new());
    /// ```
    pub fn new(inner: T) -> Cursor<T> {
        Cursor {
            inner: std::io::Cursor::new(inner),
        }
    }

    /// Gets a reference to the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(Vec::<u8>::new());
    /// let r = cursor.get_ref();
    /// ```
    pub fn get_ref(&self) -> &T {
        self.inner.get_ref()
    }

    /// Gets a mutable reference to the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let mut cursor = Cursor::new(Vec::<u8>::new());
    /// let r = cursor.get_mut();
    /// ```
    pub fn get_mut(&mut self) -> &mut T {
        self.inner.get_mut()
    }

    /// Unwraps the cursor, returning the underlying buffer.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let cursor = Cursor::new(vec![1, 2, 3]);
    /// assert_eq!(cursor.into_inner(), [1, 2, 3]);
    /// ```
    pub fn into_inner(self) -> T {
        self.inner.into_inner()
    }

    /// Returns the current position of this cursor.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
    ///
    /// # spin_on::spin_on(async {
    /// let mut cursor = Cursor::new(b"hello");
    /// assert_eq!(cursor.position(), 0);
    ///
    /// cursor.seek(SeekFrom::Start(2)).await?;
    /// assert_eq!(cursor.position(), 2);
    /// # std::io::Result::Ok(()) });
    /// ```
    pub fn position(&self) -> u64 {
        self.inner.position()
    }

    /// Sets the position of this cursor.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::Cursor;
    ///
    /// let mut cursor = Cursor::new(b"hello");
    /// assert_eq!(cursor.position(), 0);
    ///
    /// cursor.set_position(2);
    /// assert_eq!(cursor.position(), 2);
    /// ```
    pub fn set_position(&mut self, pos: u64) {
        self.inner.set_position(pos)
    }
}
1318
1319impl<T> AsyncSeek for Cursor<T>
1320where
1321    T: AsRef<[u8]> + Unpin,
1322{
1323    fn poll_seek(
1324        mut self: Pin<&mut Self>,
1325        _: &mut Context<'_>,
1326        pos: SeekFrom,
1327    ) -> Poll<Result<u64>> {
1328        Poll::Ready(std::io::Seek::seek(&mut self.inner, pos))
1329    }
1330}
1331
1332impl<T> AsyncRead for Cursor<T>
1333where
1334    T: AsRef<[u8]> + Unpin,
1335{
1336    fn poll_read(
1337        mut self: Pin<&mut Self>,
1338        _cx: &mut Context<'_>,
1339        buf: &mut [u8],
1340    ) -> Poll<Result<usize>> {
1341        Poll::Ready(std::io::Read::read(&mut self.inner, buf))
1342    }
1343
1344    fn poll_read_vectored(
1345        mut self: Pin<&mut Self>,
1346        _: &mut Context<'_>,
1347        bufs: &mut [IoSliceMut<'_>],
1348    ) -> Poll<Result<usize>> {
1349        Poll::Ready(std::io::Read::read_vectored(&mut self.inner, bufs))
1350    }
1351}
1352
1353impl<T> AsyncBufRead for Cursor<T>
1354where
1355    T: AsRef<[u8]> + Unpin,
1356{
1357    fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<&[u8]>> {
1358        Poll::Ready(std::io::BufRead::fill_buf(&mut self.get_mut().inner))
1359    }
1360
1361    fn consume(mut self: Pin<&mut Self>, amt: usize) {
1362        std::io::BufRead::consume(&mut self.inner, amt)
1363    }
1364}
1365
1366impl AsyncWrite for Cursor<&mut [u8]> {
1367    fn poll_write(
1368        mut self: Pin<&mut Self>,
1369        _: &mut Context<'_>,
1370        buf: &[u8],
1371    ) -> Poll<Result<usize>> {
1372        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1373    }
1374
1375    fn poll_write_vectored(
1376        mut self: Pin<&mut Self>,
1377        _: &mut Context<'_>,
1378        bufs: &[IoSlice<'_>],
1379    ) -> Poll<Result<usize>> {
1380        Poll::Ready(std::io::Write::write_vectored(&mut self.inner, bufs))
1381    }
1382
1383    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1384        Poll::Ready(std::io::Write::flush(&mut self.inner))
1385    }
1386
1387    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1388        self.poll_flush(cx)
1389    }
1390}
1391
1392impl AsyncWrite for Cursor<&mut Vec<u8>> {
1393    fn poll_write(
1394        mut self: Pin<&mut Self>,
1395        _: &mut Context<'_>,
1396        buf: &[u8],
1397    ) -> Poll<Result<usize>> {
1398        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1399    }
1400
1401    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1402        self.poll_flush(cx)
1403    }
1404
1405    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1406        Poll::Ready(std::io::Write::flush(&mut self.inner))
1407    }
1408}
1409
1410impl AsyncWrite for Cursor<Vec<u8>> {
1411    fn poll_write(
1412        mut self: Pin<&mut Self>,
1413        _: &mut Context<'_>,
1414        buf: &[u8],
1415    ) -> Poll<Result<usize>> {
1416        Poll::Ready(std::io::Write::write(&mut self.inner, buf))
1417    }
1418
1419    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
1420        self.poll_flush(cx)
1421    }
1422
1423    fn poll_flush(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
1424        Poll::Ready(std::io::Write::flush(&mut self.inner))
1425    }
1426}
1427
/// Creates an empty reader.
///
/// Every read on the returned reader reports zero bytes, i.e. it is always at EOF.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::empty();
///
/// let mut contents = Vec::new();
/// reader.read_to_end(&mut contents).await?;
/// assert!(contents.is_empty());
/// # std::io::Result::Ok(()) });
/// ```
pub fn empty() -> Empty {
    Empty { _private: () }
}

/// Reader for the [`empty()`] function.
pub struct Empty {
    // Private zero-sized field: keeps the struct from being built with a literal outside this
    // module, so `empty()` is the way to obtain one.
    _private: (),
}
1451
impl fmt::Debug for Empty {
    /// Prints the fixed text `Empty { .. }`, hiding the private marker field.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` is used rather than `write_str` so formatter flags such as width are applied.
        f.pad("Empty { .. }")
    }
}
1457
impl AsyncRead for Empty {
    /// Always completes immediately with `Ok(0)` and never touches the destination buffer.
    #[inline]
    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) -> Poll<Result<usize>> {
        Poll::Ready(Ok(0))
    }
}
1464
impl AsyncBufRead for Empty {
    /// Always completes immediately with an empty slice.
    #[inline]
    fn poll_fill_buf<'a>(self: Pin<&'a mut Self>, _: &mut Context<'_>) -> Poll<Result<&'a [u8]>> {
        Poll::Ready(Ok(&[]))
    }

    /// No-op: there is never any buffered data to consume.
    #[inline]
    fn consume(self: Pin<&mut Self>, _: usize) {}
}
1474
/// Creates an infinite reader that reads the same byte repeatedly.
///
/// Every read fills the whole destination buffer with `byte`.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncReadExt};
///
/// # spin_on::spin_on(async {
/// let mut reader = io::repeat(b'a');
///
/// let mut contents = vec![0; 5];
/// reader.read_exact(&mut contents).await?;
/// assert_eq!(contents, b"aaaaa");
/// # std::io::Result::Ok(()) });
/// ```
pub fn repeat(byte: u8) -> Repeat {
    Repeat { byte }
}

/// Reader for the [`repeat()`] function.
#[derive(Debug)]
pub struct Repeat {
    // The byte value produced for every position of every read.
    byte: u8,
}
1499
1500impl AsyncRead for Repeat {
1501    #[inline]
1502    fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) -> Poll<Result<usize>> {
1503        for b in &mut *buf {
1504            *b = self.byte;
1505        }
1506        Poll::Ready(Ok(buf.len()))
1507    }
1508}
1509
/// Creates a writer that consumes and drops all data.
///
/// Writes always succeed and report the full input length as written.
///
/// # Examples
///
/// ```
/// use futures_lite::io::{self, AsyncWriteExt};
///
/// # spin_on::spin_on(async {
/// let mut writer = io::sink();
/// writer.write_all(b"hello").await?;
/// # std::io::Result::Ok(()) });
/// ```
pub fn sink() -> Sink {
    Sink { _private: () }
}

/// Writer for the [`sink()`] function.
#[derive(Debug)]
pub struct Sink {
    // Private zero-sized field: keeps the struct from being built with a literal outside this
    // module, so `sink()` is the way to obtain one.
    _private: (),
}
1531
impl AsyncWrite for Sink {
    /// Reports all of `buf` as written without storing it anywhere.
    #[inline]
    fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
        Poll::Ready(Ok(buf.len()))
    }

    /// Nothing is buffered, so flushing succeeds immediately.
    #[inline]
    fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// There is nothing to release, so closing succeeds immediately.
    #[inline]
    fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<()>> {
        Poll::Ready(Ok(()))
    }
}
1548
/// Extension trait for [`AsyncBufRead`].
///
/// Each method only builds a future or stream; the work happens when the returned value is
/// polled.
pub trait AsyncBufReadExt: AsyncBufRead {
    /// Returns the contents of the internal buffer, filling it with more data if empty.
    ///
    /// If the stream has reached EOF, an empty buffer will be returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello world";
    /// let mut reader = BufReader::with_capacity(5, input);
    ///
    /// assert_eq!(reader.fill_buf().await?, b"hello");
    /// reader.consume(2);
    /// assert_eq!(reader.fill_buf().await?, b"llo");
    /// reader.consume(3);
    /// assert_eq!(reader.fill_buf().await?, b" worl");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn fill_buf(&mut self) -> FillBuf<'_, Self>
    where
        Self: Unpin,
    {
        // `Some` marks the future as not yet completed; polling takes the reader out.
        FillBuf { reader: Some(self) }
    }

    /// Consumes `amt` buffered bytes.
    ///
    /// This method does not perform any I/O, it simply consumes some amount of bytes from the
    /// internal buffer.
    ///
    /// The `amt` must be <= the number of bytes in the buffer returned by
    /// [`fill_buf()`][`AsyncBufReadExt::fill_buf()`].
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::with_capacity(4, input);
    ///
    /// assert_eq!(reader.fill_buf().await?, b"hell");
    /// reader.consume(2);
    /// assert_eq!(reader.fill_buf().await?, b"ll");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn consume(&mut self, amt: usize)
    where
        Self: Unpin,
    {
        // Synchronous convenience wrapper: pins `self` and forwards to the trait method.
        AsyncBufRead::consume(Pin::new(self), amt);
    }

    /// Reads all bytes and appends them into `buf` until the delimiter `byte` or EOF is found.
    ///
    /// This method will read bytes from the underlying stream until the delimiter or EOF is
    /// found. All bytes up to and including the delimiter (if found) will be appended to `buf`.
    ///
    /// If successful, returns the total number of bytes read.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut buf = Vec::new();
    /// let n = reader.read_until(b'\n', &mut buf).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntilFuture<'a, Self>
    where
        Self: Unpin,
    {
        ReadUntilFuture {
            reader: self,
            byte,
            buf,
            // Running count of bytes consumed so far; kept in the future so it survives
            // `Poll::Pending`.
            read: 0,
        }
    }

    /// Reads all bytes and appends them into `buf` until a newline (the 0xA byte) or EOF is found.
    ///
    /// This method will read bytes from the underlying stream until the newline delimiter (the
    /// 0xA byte) or EOF is found. All bytes up to, and including, the newline delimiter (if found)
    /// will be appended to `buf`.
    ///
    /// If successful, returns the total number of bytes read.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello";
    /// let mut reader = BufReader::new(input);
    ///
    /// let mut line = String::new();
    /// let n = reader.read_line(&mut line).await?;
    /// # std::io::Result::Ok(()) });
    /// ```
    fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLineFuture<'a, Self>
    where
        Self: Unpin,
    {
        ReadLineFuture {
            reader: self,
            buf,
            // Raw bytes are collected here first and converted to UTF-8 once the line is
            // complete.
            bytes: Vec::new(),
            read: 0,
        }
    }

    /// Returns a stream over the lines of this byte stream.
    ///
    /// The stream returned from this method yields items of type
    /// [`io::Result`][`super::io::Result`]`<`[`String`]`>`.
    /// Each string returned will *not* have a newline byte (the 0xA byte) or CRLF (0xD, 0xA bytes)
    /// at the end.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, BufReader};
    /// use futures_lite::stream::StreamExt;
    ///
    /// # spin_on::spin_on(async {
    /// let input: &[u8] = b"hello\nworld\n";
    /// let mut reader = BufReader::new(input);
    /// let mut lines = reader.lines();
    ///
    /// while let Some(line) = lines.next().await {
    ///     println!("{}", line?);
    /// }
    /// # std::io::Result::Ok(()) });
    /// ```
    fn lines(self) -> Lines<Self>
    where
        Self: Sized,
    {
        Lines {
            reader: self,
            buf: String::new(),
            bytes: Vec::new(),
            read: 0,
        }
    }

    /// Returns a stream over the contents of this reader split on the specified `byte`.
    ///
    /// The stream returned from this method yields items of type
    /// [`io::Result`][`super::io::Result`]`<`[`Vec<u8>`][`Vec`]`>`.
    /// Each vector returned will *not* have the delimiter byte at the end.
    ///
    /// # Examples
    ///
    /// ```
    /// use futures_lite::io::{AsyncBufReadExt, Cursor};
    /// use futures_lite::stream::StreamExt;
    ///
    /// # spin_on::spin_on(async {
    /// let cursor = Cursor::new(b"lorem-ipsum-dolor");
    /// let items: Vec<Vec<u8>> = cursor.split(b'-').try_collect().await?;
    ///
    /// assert_eq!(items[0], b"lorem");
    /// assert_eq!(items[1], b"ipsum");
    /// assert_eq!(items[2], b"dolor");
    /// # std::io::Result::Ok(()) });
    /// ```
    fn split(self, byte: u8) -> Split<Self>
    where
        Self: Sized,
    {
        Split {
            reader: self,
            buf: Vec::new(),
            delim: byte,
            read: 0,
        }
    }
}
1742
// Blanket impl: every `AsyncBufRead` type, sized or unsized, gets the extension methods.
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
1744
/// Future for the [`AsyncBufReadExt::fill_buf()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FillBuf<'a, R: ?Sized> {
    // `Some` until the future resolves; polling takes the reference out and only puts it back
    // when the reader is still pending.
    reader: Option<&'a mut R>,
}

// The future only holds a reference to the reader, so it can be `Unpin` regardless of `R`.
impl<R: ?Sized> Unpin for FillBuf<'_, R> {}
1753
impl<'a, R> Future for FillBuf<'a, R>
where
    R: AsyncBufRead + Unpin + ?Sized,
{
    type Output = Result<&'a [u8]>;

    /// Polls the reader's `poll_fill_buf()` and resolves to the buffered slice.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has resolved, or if the reader reports a buffer as ready
    /// and then is not ready on the immediately following call.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;
        // Take the reader out; it is only stored back on `Pending`, so a poll after completion
        // hits the `expect` below.
        let reader = this
            .reader
            .take()
            .expect("polled `FillBuf` after completion");

        // The first call goes through a short reborrow so that `reader` is still usable in the
        // other arms. Only when it succeeds is `reader` consumed by a second call, whose slice
        // can carry the full `'a` lifetime.
        // NOTE(review): the double call looks like a borrow-checker workaround — confirm.
        match Pin::new(&mut *reader).poll_fill_buf(cx) {
            Poll::Ready(Ok(_)) => match Pin::new(reader).poll_fill_buf(cx) {
                Poll::Ready(Ok(slice)) => Poll::Ready(Ok(slice)),
                poll => panic!("`poll_fill_buf()` was ready but now it isn't: {:?}", poll),
            },
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => {
                // Not done yet: keep the reader for the next poll.
                this.reader = Some(reader);
                Poll::Pending
            }
        }
    }
}
1780
/// Future for the [`AsyncBufReadExt::read_until()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadUntilFuture<'a, R: Unpin + ?Sized> {
    // Source of the bytes.
    reader: &'a mut R,
    // Delimiter that ends the read.
    byte: u8,
    // Caller's buffer that the bytes are appended to.
    buf: &'a mut Vec<u8>,
    // Bytes consumed so far by this future; carried across polls that return `Pending`.
    read: usize,
}

// The future only holds references and plain values, so it can always be `Unpin`.
impl<R: Unpin + ?Sized> Unpin for ReadUntilFuture<'_, R> {}
1792
1793impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadUntilFuture<'_, R> {
1794    type Output = Result<usize>;
1795
1796    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1797        let Self {
1798            reader,
1799            byte,
1800            buf,
1801            read,
1802        } = &mut *self;
1803        read_until_internal(Pin::new(reader), cx, *byte, buf, read)
1804    }
1805}
1806
1807fn read_until_internal<R: AsyncBufReadExt + ?Sized>(
1808    mut reader: Pin<&mut R>,
1809    cx: &mut Context<'_>,
1810    byte: u8,
1811    buf: &mut Vec<u8>,
1812    read: &mut usize,
1813) -> Poll<Result<usize>> {
1814    loop {
1815        let (done, used) = {
1816            let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
1817
1818            if let Some(i) = memchr(byte, available) {
1819                buf.extend_from_slice(&available[..=i]);
1820                (true, i + 1)
1821            } else {
1822                buf.extend_from_slice(available);
1823                (false, available.len())
1824            }
1825        };
1826
1827        reader.as_mut().consume(used);
1828        *read += used;
1829
1830        if done || used == 0 {
1831            return Poll::Ready(Ok(mem::replace(read, 0)));
1832        }
1833    }
1834}
1835
/// Future for the [`AsyncBufReadExt::read_line()`] method.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ReadLineFuture<'a, R: Unpin + ?Sized> {
    // Source of the bytes.
    reader: &'a mut R,
    // Caller's string that receives the decoded line.
    buf: &'a mut String,
    // Raw bytes of the line collected so far, before UTF-8 validation.
    bytes: Vec<u8>,
    // Bytes consumed so far by this future; carried across polls that return `Pending`.
    read: usize,
}

// The future only holds a reference, owned buffers and plain values, so it can always be `Unpin`.
impl<R: Unpin + ?Sized> Unpin for ReadLineFuture<'_, R> {}
1847
1848impl<R: AsyncBufRead + Unpin + ?Sized> Future for ReadLineFuture<'_, R> {
1849    type Output = Result<usize>;
1850
1851    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1852        let Self {
1853            reader,
1854            buf,
1855            bytes,
1856            read,
1857        } = &mut *self;
1858        read_line_internal(Pin::new(reader), cx, buf, bytes, read)
1859    }
1860}
1861
pin_project! {
    /// Stream for the [`AsyncBufReadExt::lines()`] method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Lines<R> {
        // Owned source of the bytes; pin-projected so it can be polled in place.
        #[pin]
        reader: R,
        // The decoded line; moved out and left empty each time an item is yielded.
        buf: String,
        // Raw bytes of the current line collected so far, before UTF-8 validation.
        bytes: Vec<u8>,
        // Bytes consumed so far for the current line; carried across `Pending` polls.
        read: usize,
    }
}
1874
1875impl<R: AsyncBufRead> Stream for Lines<R> {
1876    type Item = Result<String>;
1877
1878    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1879        let this = self.project();
1880
1881        let n = ready!(read_line_internal(
1882            this.reader,
1883            cx,
1884            this.buf,
1885            this.bytes,
1886            this.read
1887        ))?;
1888        if n == 0 && this.buf.is_empty() {
1889            return Poll::Ready(None);
1890        }
1891
1892        if this.buf.ends_with('\n') {
1893            this.buf.pop();
1894            if this.buf.ends_with('\r') {
1895                this.buf.pop();
1896            }
1897        }
1898        Poll::Ready(Some(Ok(mem::take(this.buf))))
1899    }
1900}
1901
1902fn read_line_internal<R: AsyncBufRead + ?Sized>(
1903    reader: Pin<&mut R>,
1904    cx: &mut Context<'_>,
1905    buf: &mut String,
1906    bytes: &mut Vec<u8>,
1907    read: &mut usize,
1908) -> Poll<Result<usize>> {
1909    let ret = ready!(read_until_internal(reader, cx, b'\n', bytes, read));
1910
1911    match String::from_utf8(mem::take(bytes)) {
1912        Ok(s) => {
1913            debug_assert!(buf.is_empty());
1914            debug_assert_eq!(*read, 0);
1915            *buf = s;
1916            Poll::Ready(ret)
1917        }
1918        Err(_) => Poll::Ready(ret.and_then(|_| {
1919            Err(Error::new(
1920                ErrorKind::InvalidData,
1921                "stream did not contain valid UTF-8",
1922            ))
1923        })),
1924    }
1925}
1926
pin_project! {
    /// Stream for the [`AsyncBufReadExt::split()`] method.
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Split<R> {
        // Owned source of the bytes; pin-projected so it can be polled in place.
        #[pin]
        reader: R,
        // Bytes of the current segment; moved out and left empty each time an item is yielded.
        buf: Vec<u8>,
        // Bytes consumed so far for the current segment; carried across `Pending` polls.
        read: usize,
        // The byte that separates segments.
        delim: u8,
    }
}
1939
1940impl<R: AsyncBufRead> Stream for Split<R> {
1941    type Item = Result<Vec<u8>>;
1942
1943    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1944        let this = self.project();
1945
1946        let n = ready!(read_until_internal(
1947            this.reader,
1948            cx,
1949            *this.delim,
1950            this.buf,
1951            this.read
1952        ))?;
1953        if n == 0 && this.buf.is_empty() {
1954            return Poll::Ready(None);
1955        }
1956
1957        if this.buf[this.buf.len() - 1] == *this.delim {
1958            this.buf.pop();
1959        }
1960        Poll::Ready(Some(Ok(mem::take(this.buf))))
1961    }
1962}
1963
1964/// Extension trait for [`AsyncRead`].
1965pub trait AsyncReadExt: AsyncRead {
1966    /// Reads some bytes from the byte stream.
1967    ///
1968    /// On success, returns the total number of bytes read.
1969    ///
1970    /// If the return value is `Ok(n)`, then it must be guaranteed that
1971    /// `0 <= n <= buf.len()`. A nonzero `n` value indicates that the buffer has been
1972    /// filled with `n` bytes of data. If `n` is `0`, then it can indicate one of two
1973    /// scenarios:
1974    ///
1975    /// 1. This reader has reached its "end of file" and will likely no longer be able to
1976    ///    produce bytes. Note that this does not mean that the reader will always no
1977    ///    longer be able to produce bytes.
1978    /// 2. The buffer specified was 0 bytes in length.
1979    ///
1980    /// # Examples
1981    ///
1982    /// ```
1983    /// use futures_lite::io::{AsyncReadExt, BufReader};
1984    ///
1985    /// # spin_on::spin_on(async {
1986    /// let input: &[u8] = b"hello";
1987    /// let mut reader = BufReader::new(input);
1988    ///
1989    /// let mut buf = vec![0; 1024];
1990    /// let n = reader.read(&mut buf).await?;
1991    /// # std::io::Result::Ok(()) });
1992    /// ```
1993    fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadFuture<'a, Self>
1994    where
1995        Self: Unpin,
1996    {
1997        ReadFuture { reader: self, buf }
1998    }
1999
2000    /// Like [`read()`][`AsyncReadExt::read()`], except it reads into a slice of buffers.
2001    ///
2002    /// Data is copied to fill each buffer in order, with the final buffer possibly being
2003    /// only partially filled. This method must behave same as a single call to
2004    /// [`read()`][`AsyncReadExt::read()`] with the buffers concatenated would.
2005    fn read_vectored<'a>(
2006        &'a mut self,
2007        bufs: &'a mut [IoSliceMut<'a>],
2008    ) -> ReadVectoredFuture<'a, Self>
2009    where
2010        Self: Unpin,
2011    {
2012        ReadVectoredFuture { reader: self, bufs }
2013    }
2014
2015    /// Reads the entire contents and appends them to a [`Vec`].
2016    ///
2017    /// On success, returns the total number of bytes read.
2018    ///
2019    /// # Examples
2020    ///
2021    /// ```
2022    /// use futures_lite::io::{AsyncReadExt, Cursor};
2023    ///
2024    /// # spin_on::spin_on(async {
2025    /// let mut reader = Cursor::new(vec![1, 2, 3]);
2026    /// let mut contents = Vec::new();
2027    ///
2028    /// let n = reader.read_to_end(&mut contents).await?;
2029    /// assert_eq!(n, 3);
2030    /// assert_eq!(contents, [1, 2, 3]);
2031    /// # std::io::Result::Ok(()) });
2032    /// ```
2033    fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEndFuture<'a, Self>
2034    where
2035        Self: Unpin,
2036    {
2037        let start_len = buf.len();
2038        ReadToEndFuture {
2039            reader: self,
2040            buf,
2041            start_len,
2042        }
2043    }
2044
2045    /// Reads the entire contents and appends them to a [`String`].
2046    ///
2047    /// On success, returns the total number of bytes read.
2048    ///
2049    /// # Examples
2050    ///
2051    /// ```
2052    /// use futures_lite::io::{AsyncReadExt, Cursor};
2053    ///
2054    /// # spin_on::spin_on(async {
2055    /// let mut reader = Cursor::new(&b"hello");
2056    /// let mut contents = String::new();
2057    ///
2058    /// let n = reader.read_to_string(&mut contents).await?;
2059    /// assert_eq!(n, 5);
2060    /// assert_eq!(contents, "hello");
2061    /// # std::io::Result::Ok(()) });
2062    /// ```
2063    fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToStringFuture<'a, Self>
2064    where
2065        Self: Unpin,
2066    {
2067        ReadToStringFuture {
2068            reader: self,
2069            buf,
2070            bytes: Vec::new(),
2071            start_len: 0,
2072        }
2073    }
2074
2075    /// Reads the exact number of bytes required to fill `buf`.
2076    ///
2077    /// # Examples
2078    ///
2079    /// ```
2080    /// use futures_lite::io::{AsyncReadExt, Cursor};
2081    ///
2082    /// # spin_on::spin_on(async {
2083    /// let mut reader = Cursor::new(&b"hello");
2084    /// let mut contents = vec![0; 3];
2085    ///
2086    /// reader.read_exact(&mut contents).await?;
2087    /// assert_eq!(contents, b"hel");
2088    /// # std::io::Result::Ok(()) });
2089    /// ```
2090    fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExactFuture<'a, Self>
2091    where
2092        Self: Unpin,
2093    {
2094        ReadExactFuture { reader: self, buf }
2095    }
2096
2097    /// Creates an adapter which will read at most `limit` bytes from it.
2098    ///
2099    /// This method returns a new instance of [`AsyncRead`] which will read at most
2100    /// `limit` bytes, after which it will always return `Ok(0)` indicating EOF.
2101    ///
2102    /// # Examples
2103    ///
2104    /// ```
2105    /// use futures_lite::io::{AsyncReadExt, Cursor};
2106    ///
2107    /// # spin_on::spin_on(async {
2108    /// let mut reader = Cursor::new(&b"hello");
2109    /// let mut contents = String::new();
2110    ///
2111    /// let n = reader.take(3).read_to_string(&mut contents).await?;
2112    /// assert_eq!(n, 3);
2113    /// assert_eq!(contents, "hel");
2114    /// # std::io::Result::Ok(()) });
2115    /// ```
2116    fn take(self, limit: u64) -> Take<Self>
2117    where
2118        Self: Sized,
2119    {
2120        Take { inner: self, limit }
2121    }
2122
2123    /// Converts this [`AsyncRead`] into a [`Stream`] of bytes.
2124    ///
2125    /// The returned type implements [`Stream`] where `Item` is `io::Result<u8>`.
2126    ///
2127    /// ```
2128    /// use futures_lite::io::{AsyncReadExt, Cursor};
2129    /// use futures_lite::stream::StreamExt;
2130    ///
2131    /// # spin_on::spin_on(async {
2132    /// let reader = Cursor::new(&b"hello");
2133    /// let mut bytes = reader.bytes();
2134    ///
2135    /// while let Some(byte) = bytes.next().await {
2136    ///     println!("byte: {}", byte?);
2137    /// }
2138    /// # std::io::Result::Ok(()) });
2139    /// ```
2140    fn bytes(self) -> Bytes<Self>
2141    where
2142        Self: Sized,
2143    {
2144        Bytes { inner: self }
2145    }
2146
2147    /// Creates an adapter which will chain this stream with another.
2148    ///
2149    /// The returned [`AsyncRead`] instance will first read all bytes from this reader
2150    /// until EOF is found, and then continue with `next`.
2151    ///
2152    /// # Examples
2153    ///
2154    /// ```
2155    /// use futures_lite::io::{AsyncReadExt, Cursor};
2156    ///
2157    /// # spin_on::spin_on(async {
2158    /// let r1 = Cursor::new(&b"hello");
2159    /// let r2 = Cursor::new(&b"world");
2160    /// let mut reader = r1.chain(r2);
2161    ///
2162    /// let mut contents = String::new();
2163    /// reader.read_to_string(&mut contents).await?;
2164    /// assert_eq!(contents, "helloworld");
2165    /// # std::io::Result::Ok(()) });
2166    /// ```
2167    fn chain<R: AsyncRead>(self, next: R) -> Chain<Self, R>
2168    where
2169        Self: Sized,
2170    {
2171        Chain {
2172            first: self,
2173            second: next,
2174            done_first: false,
2175        }
2176    }
2177
2178    /// Boxes the reader and changes its type to `dyn AsyncRead + Send + 'a`.
2179    ///
2180    /// # Examples
2181    ///
2182    /// ```
2183    /// use futures_lite::io::AsyncReadExt;
2184    ///
2185    /// let reader = [1, 2, 3].boxed_reader();
2186    /// ```
2187    #[cfg(feature = "alloc")]
2188    fn boxed_reader<'a>(self) -> Pin<Box<dyn AsyncRead + Send + 'a>>
2189    where
2190        Self: Sized + Send + 'a,
2191    {
2192        Box::pin(self)
2193    }
2194}
2195
2196impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
2197
2198/// Future for the [`AsyncReadExt::read()`] method.
2199#[derive(Debug)]
2200#[must_use = "futures do nothing unless you `.await` or poll them"]
2201pub struct ReadFuture<'a, R: Unpin + ?Sized> {
2202    reader: &'a mut R,
2203    buf: &'a mut [u8],
2204}
2205
2206impl<R: Unpin + ?Sized> Unpin for ReadFuture<'_, R> {}
2207
2208impl<R: AsyncRead + Unpin + ?Sized> Future for ReadFuture<'_, R> {
2209    type Output = Result<usize>;
2210
2211    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2212        let Self { reader, buf } = &mut *self;
2213        Pin::new(reader).poll_read(cx, buf)
2214    }
2215}
2216
2217/// Future for the [`AsyncReadExt::read_vectored()`] method.
2218#[derive(Debug)]
2219#[must_use = "futures do nothing unless you `.await` or poll them"]
2220pub struct ReadVectoredFuture<'a, R: Unpin + ?Sized> {
2221    reader: &'a mut R,
2222    bufs: &'a mut [IoSliceMut<'a>],
2223}
2224
2225impl<R: Unpin + ?Sized> Unpin for ReadVectoredFuture<'_, R> {}
2226
2227impl<R: AsyncRead + Unpin + ?Sized> Future for ReadVectoredFuture<'_, R> {
2228    type Output = Result<usize>;
2229
2230    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2231        let Self { reader, bufs } = &mut *self;
2232        Pin::new(reader).poll_read_vectored(cx, bufs)
2233    }
2234}
2235
2236/// Future for the [`AsyncReadExt::read_to_end()`] method.
2237#[derive(Debug)]
2238#[must_use = "futures do nothing unless you `.await` or poll them"]
2239pub struct ReadToEndFuture<'a, R: Unpin + ?Sized> {
2240    reader: &'a mut R,
2241    buf: &'a mut Vec<u8>,
2242    start_len: usize,
2243}
2244
2245impl<R: Unpin + ?Sized> Unpin for ReadToEndFuture<'_, R> {}
2246
2247impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToEndFuture<'_, R> {
2248    type Output = Result<usize>;
2249
2250    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2251        let Self {
2252            reader,
2253            buf,
2254            start_len,
2255        } = &mut *self;
2256        read_to_end_internal(Pin::new(reader), cx, buf, *start_len)
2257    }
2258}
2259
2260/// Future for the [`AsyncReadExt::read_to_string()`] method.
2261#[derive(Debug)]
2262#[must_use = "futures do nothing unless you `.await` or poll them"]
2263pub struct ReadToStringFuture<'a, R: Unpin + ?Sized> {
2264    reader: &'a mut R,
2265    buf: &'a mut String,
2266    bytes: Vec<u8>,
2267    start_len: usize,
2268}
2269
2270impl<R: Unpin + ?Sized> Unpin for ReadToStringFuture<'_, R> {}
2271
2272impl<R: AsyncRead + Unpin + ?Sized> Future for ReadToStringFuture<'_, R> {
2273    type Output = Result<usize>;
2274
2275    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2276        let Self {
2277            reader,
2278            buf,
2279            bytes,
2280            start_len,
2281        } = &mut *self;
2282        let reader = Pin::new(reader);
2283
2284        let ret = ready!(read_to_end_internal(reader, cx, bytes, *start_len));
2285
2286        match String::from_utf8(mem::take(bytes)) {
2287            Ok(s) => {
2288                debug_assert!(buf.is_empty());
2289                **buf = s;
2290                Poll::Ready(ret)
2291            }
2292            Err(_) => Poll::Ready(ret.and_then(|_| {
2293                Err(Error::new(
2294                    ErrorKind::InvalidData,
2295                    "stream did not contain valid UTF-8",
2296                ))
2297            })),
2298        }
2299    }
2300}
2301
2302// This uses an adaptive system to extend the vector when it fills. We want to
2303// avoid paying to allocate and zero a huge chunk of memory if the reader only
2304// has 4 bytes while still making large reads if the reader does have a ton
2305// of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
2306// time is 4,500 times (!) slower than this if the reader has a very small
2307// amount of data to return.
2308//
2309// Because we're extending the buffer with uninitialized data for trusted
2310// readers, we need to make sure to truncate that if any of this panics.
2311fn read_to_end_internal<R: AsyncRead + ?Sized>(
2312    mut rd: Pin<&mut R>,
2313    cx: &mut Context<'_>,
2314    buf: &mut Vec<u8>,
2315    start_len: usize,
2316) -> Poll<Result<usize>> {
2317    struct Guard<'a> {
2318        buf: &'a mut Vec<u8>,
2319        len: usize,
2320    }
2321
2322    impl Drop for Guard<'_> {
2323        fn drop(&mut self) {
2324            self.buf.resize(self.len, 0);
2325        }
2326    }
2327
2328    let mut g = Guard {
2329        len: buf.len(),
2330        buf,
2331    };
2332    let ret;
2333    loop {
2334        if g.len == g.buf.len() {
2335            g.buf.reserve(32);
2336            let capacity = g.buf.capacity();
2337            g.buf.resize(capacity, 0);
2338        }
2339
2340        match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
2341            Ok(0) => {
2342                ret = Poll::Ready(Ok(g.len - start_len));
2343                break;
2344            }
2345            Ok(n) => g.len += n,
2346            Err(e) => {
2347                ret = Poll::Ready(Err(e));
2348                break;
2349            }
2350        }
2351    }
2352
2353    ret
2354}
2355
2356/// Future for the [`AsyncReadExt::read_exact()`] method.
2357#[derive(Debug)]
2358#[must_use = "futures do nothing unless you `.await` or poll them"]
2359pub struct ReadExactFuture<'a, R: Unpin + ?Sized> {
2360    reader: &'a mut R,
2361    buf: &'a mut [u8],
2362}
2363
2364impl<R: Unpin + ?Sized> Unpin for ReadExactFuture<'_, R> {}
2365
2366impl<R: AsyncRead + Unpin + ?Sized> Future for ReadExactFuture<'_, R> {
2367    type Output = Result<()>;
2368
2369    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2370        let Self { reader, buf } = &mut *self;
2371
2372        while !buf.is_empty() {
2373            let n = ready!(Pin::new(&mut *reader).poll_read(cx, buf))?;
2374            let (_, rest) = mem::take(buf).split_at_mut(n);
2375            *buf = rest;
2376
2377            if n == 0 {
2378                return Poll::Ready(Err(ErrorKind::UnexpectedEof.into()));
2379            }
2380        }
2381
2382        Poll::Ready(Ok(()))
2383    }
2384}
2385
pin_project! {
    /// Reader for the [`AsyncReadExt::take()`] method.
    #[derive(Debug)]
    pub struct Take<R> {
        // The wrapped reader; marked `#[pin]` so it is projected as a pinned reference.
        #[pin]
        inner: R,
        // Number of bytes that may still be read before this adapter reports EOF.
        limit: u64,
    }
}
2395
2396impl<R> Take<R> {
2397    /// Returns the number of bytes before this adapter will return EOF.
2398    ///
2399    /// Note that EOF may be reached sooner if the underlying reader is shorter than the limit.
2400    ///
2401    /// # Examples
2402    ///
2403    /// ```
2404    /// use futures_lite::io::{AsyncReadExt, Cursor};
2405    ///
2406    /// let reader = Cursor::new("hello");
2407    ///
2408    /// let reader = reader.take(3);
2409    /// assert_eq!(reader.limit(), 3);
2410    /// ```
2411    pub fn limit(&self) -> u64 {
2412        self.limit
2413    }
2414
2415    /// Puts a limit on the number of bytes.
2416    ///
2417    /// Changing the limit is equivalent to creating a new adapter with [`AsyncReadExt::take()`].
2418    ///
2419    /// # Examples
2420    ///
2421    /// ```
2422    /// use futures_lite::io::{AsyncReadExt, Cursor};
2423    ///
2424    /// let reader = Cursor::new("hello");
2425    ///
2426    /// let mut reader = reader.take(10);
2427    /// assert_eq!(reader.limit(), 10);
2428    ///
2429    /// reader.set_limit(3);
2430    /// assert_eq!(reader.limit(), 3);
2431    /// ```
2432    pub fn set_limit(&mut self, limit: u64) {
2433        self.limit = limit;
2434    }
2435
2436    /// Gets a reference to the underlying reader.
2437    ///
2438    /// # Examples
2439    ///
2440    /// ```
2441    /// use futures_lite::io::{AsyncReadExt, Cursor};
2442    ///
2443    /// let reader = Cursor::new("hello");
2444    ///
2445    /// let reader = reader.take(3);
2446    /// let r = reader.get_ref();
2447    /// ```
2448    pub fn get_ref(&self) -> &R {
2449        &self.inner
2450    }
2451
2452    /// Gets a mutable reference to the underlying reader.
2453    ///
2454    /// # Examples
2455    ///
2456    /// ```
2457    /// use futures_lite::io::{AsyncReadExt, Cursor};
2458    ///
2459    /// let reader = Cursor::new("hello");
2460    ///
2461    /// let mut reader = reader.take(3);
2462    /// let r = reader.get_mut();
2463    /// ```
2464    pub fn get_mut(&mut self) -> &mut R {
2465        &mut self.inner
2466    }
2467
2468    /// Unwraps the adapter, returning the underlying reader.
2469    ///
2470    /// # Examples
2471    ///
2472    /// ```
2473    /// use futures_lite::io::{AsyncReadExt, Cursor};
2474    ///
2475    /// let reader = Cursor::new("hello");
2476    ///
2477    /// let reader = reader.take(3);
2478    /// let reader = reader.into_inner();
2479    /// ```
2480    pub fn into_inner(self) -> R {
2481        self.inner
2482    }
2483}
2484
2485impl<R: AsyncRead> AsyncRead for Take<R> {
2486    fn poll_read(
2487        self: Pin<&mut Self>,
2488        cx: &mut Context<'_>,
2489        buf: &mut [u8],
2490    ) -> Poll<Result<usize>> {
2491        let this = self.project();
2492        take_read_internal(this.inner, cx, buf, this.limit)
2493    }
2494}
2495
2496fn take_read_internal<R: AsyncRead + ?Sized>(
2497    mut rd: Pin<&mut R>,
2498    cx: &mut Context<'_>,
2499    buf: &mut [u8],
2500    limit: &mut u64,
2501) -> Poll<Result<usize>> {
2502    // Don't call into inner reader at all at EOF because it may still block
2503    if *limit == 0 {
2504        return Poll::Ready(Ok(0));
2505    }
2506
2507    let max = cmp::min(buf.len() as u64, *limit) as usize;
2508
2509    match ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) {
2510        Ok(n) => {
2511            *limit -= n as u64;
2512            Poll::Ready(Ok(n))
2513        }
2514        Err(e) => Poll::Ready(Err(e)),
2515    }
2516}
2517
2518impl<R: AsyncBufRead> AsyncBufRead for Take<R> {
2519    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2520        let this = self.project();
2521
2522        if *this.limit == 0 {
2523            return Poll::Ready(Ok(&[]));
2524        }
2525
2526        match ready!(this.inner.poll_fill_buf(cx)) {
2527            Ok(buf) => {
2528                let cap = cmp::min(buf.len() as u64, *this.limit) as usize;
2529                Poll::Ready(Ok(&buf[..cap]))
2530            }
2531            Err(e) => Poll::Ready(Err(e)),
2532        }
2533    }
2534
2535    fn consume(self: Pin<&mut Self>, amt: usize) {
2536        let this = self.project();
2537        // Don't let callers reset the limit by passing an overlarge value
2538        let amt = cmp::min(amt as u64, *this.limit) as usize;
2539        *this.limit -= amt as u64;
2540
2541        this.inner.consume(amt);
2542    }
2543}
2544
pin_project! {
    /// Reader for the [`AsyncReadExt::bytes()`] method.
    #[derive(Debug)]
    pub struct Bytes<R> {
        // The wrapped reader; marked `#[pin]` so it is projected as a pinned reference.
        #[pin]
        inner: R,
    }
}
2553
2554impl<R: AsyncRead + Unpin> Stream for Bytes<R> {
2555    type Item = Result<u8>;
2556
2557    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2558        let mut byte = 0;
2559
2560        let rd = Pin::new(&mut self.inner);
2561
2562        match ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) {
2563            Ok(0) => Poll::Ready(None),
2564            Ok(..) => Poll::Ready(Some(Ok(byte))),
2565            Err(ref e) if e.kind() == ErrorKind::Interrupted => Poll::Pending,
2566            Err(e) => Poll::Ready(Some(Err(e))),
2567        }
2568    }
2569}
2570
2571impl<R: AsyncRead> AsyncRead for Bytes<R> {
2572    fn poll_read(
2573        self: Pin<&mut Self>,
2574        cx: &mut Context<'_>,
2575        buf: &mut [u8],
2576    ) -> Poll<Result<usize>> {
2577        self.project().inner.poll_read(cx, buf)
2578    }
2579
2580    fn poll_read_vectored(
2581        self: Pin<&mut Self>,
2582        cx: &mut Context<'_>,
2583        bufs: &mut [IoSliceMut<'_>],
2584    ) -> Poll<Result<usize>> {
2585        self.project().inner.poll_read_vectored(cx, bufs)
2586    }
2587}
2588
pin_project! {
    /// Reader for the [`AsyncReadExt::chain()`] method.
    pub struct Chain<R1, R2> {
        // Reader that is drained first.
        #[pin]
        first: R1,
        // Reader used once `first` has reported EOF.
        #[pin]
        second: R2,
        // Set to `true` once `first` has reported EOF; starts out `false`.
        done_first: bool,
    }
}
2599
2600impl<R1, R2> Chain<R1, R2> {
2601    /// Gets references to the underlying readers.
2602    ///
2603    /// # Examples
2604    ///
2605    /// ```
2606    /// use futures_lite::io::{AsyncReadExt, Cursor};
2607    ///
2608    /// let r1 = Cursor::new(b"hello");
2609    /// let r2 = Cursor::new(b"world");
2610    ///
2611    /// let reader = r1.chain(r2);
2612    /// let (r1, r2) = reader.get_ref();
2613    /// ```
2614    pub fn get_ref(&self) -> (&R1, &R2) {
2615        (&self.first, &self.second)
2616    }
2617
2618    /// Gets mutable references to the underlying readers.
2619    ///
2620    /// # Examples
2621    ///
2622    /// ```
2623    /// use futures_lite::io::{AsyncReadExt, Cursor};
2624    ///
2625    /// let r1 = Cursor::new(b"hello");
2626    /// let r2 = Cursor::new(b"world");
2627    ///
2628    /// let mut reader = r1.chain(r2);
2629    /// let (r1, r2) = reader.get_mut();
2630    /// ```
2631    pub fn get_mut(&mut self) -> (&mut R1, &mut R2) {
2632        (&mut self.first, &mut self.second)
2633    }
2634
2635    /// Unwraps the adapter, returning the underlying readers.
2636    ///
2637    /// # Examples
2638    ///
2639    /// ```
2640    /// use futures_lite::io::{AsyncReadExt, Cursor};
2641    ///
2642    /// let r1 = Cursor::new(b"hello");
2643    /// let r2 = Cursor::new(b"world");
2644    ///
2645    /// let reader = r1.chain(r2);
2646    /// let (r1, r2) = reader.into_inner();
2647    /// ```
2648    pub fn into_inner(self) -> (R1, R2) {
2649        (self.first, self.second)
2650    }
2651}
2652
2653impl<R1: fmt::Debug, R2: fmt::Debug> fmt::Debug for Chain<R1, R2> {
2654    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2655        f.debug_struct("Chain")
2656            .field("r1", &self.first)
2657            .field("r2", &self.second)
2658            .finish()
2659    }
2660}
2661
2662impl<R1: AsyncRead, R2: AsyncRead> AsyncRead for Chain<R1, R2> {
2663    fn poll_read(
2664        self: Pin<&mut Self>,
2665        cx: &mut Context<'_>,
2666        buf: &mut [u8],
2667    ) -> Poll<Result<usize>> {
2668        let this = self.project();
2669        if !*this.done_first {
2670            match ready!(this.first.poll_read(cx, buf)) {
2671                Ok(0) if !buf.is_empty() => *this.done_first = true,
2672                Ok(n) => return Poll::Ready(Ok(n)),
2673                Err(err) => return Poll::Ready(Err(err)),
2674            }
2675        }
2676
2677        this.second.poll_read(cx, buf)
2678    }
2679
2680    fn poll_read_vectored(
2681        self: Pin<&mut Self>,
2682        cx: &mut Context<'_>,
2683        bufs: &mut [IoSliceMut<'_>],
2684    ) -> Poll<Result<usize>> {
2685        let this = self.project();
2686        if !*this.done_first {
2687            match ready!(this.first.poll_read_vectored(cx, bufs)) {
2688                Ok(0) if !bufs.is_empty() => *this.done_first = true,
2689                Ok(n) => return Poll::Ready(Ok(n)),
2690                Err(err) => return Poll::Ready(Err(err)),
2691            }
2692        }
2693
2694        this.second.poll_read_vectored(cx, bufs)
2695    }
2696}
2697
2698impl<R1: AsyncBufRead, R2: AsyncBufRead> AsyncBufRead for Chain<R1, R2> {
2699    fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<&[u8]>> {
2700        let this = self.project();
2701        if !*this.done_first {
2702            match ready!(this.first.poll_fill_buf(cx)) {
2703                Ok([]) => *this.done_first = true,
2704                Ok(buf) => return Poll::Ready(Ok(buf)),
2705                Err(err) => return Poll::Ready(Err(err)),
2706            }
2707        }
2708
2709        this.second.poll_fill_buf(cx)
2710    }
2711
2712    fn consume(self: Pin<&mut Self>, amt: usize) {
2713        let this = self.project();
2714        if !*this.done_first {
2715            this.first.consume(amt)
2716        } else {
2717            this.second.consume(amt)
2718        }
2719    }
2720}
2721
2722/// Extension trait for [`AsyncSeek`].
2723pub trait AsyncSeekExt: AsyncSeek {
2724    /// Seeks to a new position in a byte stream.
2725    ///
2726    /// Returns the new position in the byte stream.
2727    ///
2728    /// A seek beyond the end of stream is allowed, but behavior is defined by the implementation.
2729    ///
2730    /// # Examples
2731    ///
2732    /// ```
2733    /// use futures_lite::io::{AsyncSeekExt, Cursor, SeekFrom};
2734    ///
2735    /// # spin_on::spin_on(async {
2736    /// let mut cursor = Cursor::new("hello");
2737    ///
2738    /// // Move the cursor to the end.
2739    /// cursor.seek(SeekFrom::End(0)).await?;
2740    ///
2741    /// // Check the current position.
2742    /// assert_eq!(cursor.seek(SeekFrom::Current(0)).await?, 5);
2743    /// # std::io::Result::Ok(()) });
2744    /// ```
2745    fn seek(&mut self, pos: SeekFrom) -> SeekFuture<'_, Self>
2746    where
2747        Self: Unpin,
2748    {
2749        SeekFuture { seeker: self, pos }
2750    }
2751}
2752
2753impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
2754
2755/// Future for the [`AsyncSeekExt::seek()`] method.
2756#[derive(Debug)]
2757#[must_use = "futures do nothing unless you `.await` or poll them"]
2758pub struct SeekFuture<'a, S: Unpin + ?Sized> {
2759    seeker: &'a mut S,
2760    pos: SeekFrom,
2761}
2762
2763impl<S: Unpin + ?Sized> Unpin for SeekFuture<'_, S> {}
2764
2765impl<S: AsyncSeek + Unpin + ?Sized> Future for SeekFuture<'_, S> {
2766    type Output = Result<u64>;
2767
2768    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2769        let pos = self.pos;
2770        Pin::new(&mut *self.seeker).poll_seek(cx, pos)
2771    }
2772}
2773
2774/// Extension trait for [`AsyncWrite`].
2775pub trait AsyncWriteExt: AsyncWrite {
2776    /// Writes some bytes into the byte stream.
2777    ///
2778    /// Returns the number of bytes written from the start of the buffer.
2779    ///
2780    /// If the return value is `Ok(n)` then it must be guaranteed that
2781    /// `0 <= n <= buf.len()`. A return value of `0` typically means that the underlying
2782    /// object is no longer able to accept bytes and will likely not be able to in the
2783    /// future as well, or that the provided buffer is empty.
2784    ///
2785    /// # Examples
2786    ///
2787    /// ```
2788    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2789    ///
2790    /// # spin_on::spin_on(async {
2791    /// let mut output = Vec::new();
2792    /// let mut writer = BufWriter::new(&mut output);
2793    ///
2794    /// let n = writer.write(b"hello").await?;
2795    /// # std::io::Result::Ok(()) });
2796    /// ```
2797    fn write<'a>(&'a mut self, buf: &'a [u8]) -> WriteFuture<'a, Self>
2798    where
2799        Self: Unpin,
2800    {
2801        WriteFuture { writer: self, buf }
2802    }
2803
2804    /// Like [`write()`][`AsyncWriteExt::write()`], except that it writes a slice of buffers.
2805    ///
2806    /// Data is copied from each buffer in order, with the final buffer possibly being only
2807    /// partially consumed. This method must behave same as a call to
2808    /// [`write()`][`AsyncWriteExt::write()`] with the buffers concatenated would.
2809    fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectoredFuture<'a, Self>
2810    where
2811        Self: Unpin,
2812    {
2813        WriteVectoredFuture { writer: self, bufs }
2814    }
2815
2816    /// Writes an entire buffer into the byte stream.
2817    ///
2818    /// This method will keep calling [`write()`][`AsyncWriteExt::write()`] until there is no more
2819    /// data to be written or an error occurs. It will not return before the entire buffer is
2820    /// successfully written or an error occurs.
2821    ///
2822    /// # Examples
2823    ///
2824    /// ```
2825    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2826    ///
2827    /// # spin_on::spin_on(async {
2828    /// let mut output = Vec::new();
2829    /// let mut writer = BufWriter::new(&mut output);
2830    ///
2831    /// let n = writer.write_all(b"hello").await?;
2832    /// # std::io::Result::Ok(()) });
2833    /// ```
2834    fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAllFuture<'a, Self>
2835    where
2836        Self: Unpin,
2837    {
2838        WriteAllFuture { writer: self, buf }
2839    }
2840
2841    /// Flushes the stream to ensure that all buffered contents reach their destination.
2842    ///
2843    /// # Examples
2844    ///
2845    /// ```
2846    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2847    ///
2848    /// # spin_on::spin_on(async {
2849    /// let mut output = Vec::new();
2850    /// let mut writer = BufWriter::new(&mut output);
2851    ///
2852    /// writer.write_all(b"hello").await?;
2853    /// writer.flush().await?;
2854    /// # std::io::Result::Ok(()) });
2855    /// ```
2856    fn flush(&mut self) -> FlushFuture<'_, Self>
2857    where
2858        Self: Unpin,
2859    {
2860        FlushFuture { writer: self }
2861    }
2862
2863    /// Closes the writer.
2864    ///
2865    /// # Examples
2866    ///
2867    /// ```
2868    /// use futures_lite::io::{AsyncWriteExt, BufWriter};
2869    ///
2870    /// # spin_on::spin_on(async {
2871    /// let mut output = Vec::new();
2872    /// let mut writer = BufWriter::new(&mut output);
2873    ///
2874    /// writer.close().await?;
2875    /// # std::io::Result::Ok(()) });
2876    /// ```
2877    fn close(&mut self) -> CloseFuture<'_, Self>
2878    where
2879        Self: Unpin,
2880    {
2881        CloseFuture { writer: self }
2882    }
2883
2884    /// Boxes the writer and changes its type to `dyn AsyncWrite + Send + 'a`.
2885    ///
2886    /// # Examples
2887    ///
2888    /// ```
2889    /// use futures_lite::io::AsyncWriteExt;
2890    ///
2891    /// let writer = Vec::<u8>::new().boxed_writer();
2892    /// ```
2893    #[cfg(feature = "alloc")]
2894    fn boxed_writer<'a>(self) -> Pin<Box<dyn AsyncWrite + Send + 'a>>
2895    where
2896        Self: Sized + Send + 'a,
2897    {
2898        Box::pin(self)
2899    }
2900}
2901
2902impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
2903
2904/// Future for the [`AsyncWriteExt::write()`] method.
2905#[derive(Debug)]
2906#[must_use = "futures do nothing unless you `.await` or poll them"]
2907pub struct WriteFuture<'a, W: Unpin + ?Sized> {
2908    writer: &'a mut W,
2909    buf: &'a [u8],
2910}
2911
2912impl<W: Unpin + ?Sized> Unpin for WriteFuture<'_, W> {}
2913
2914impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteFuture<'_, W> {
2915    type Output = Result<usize>;
2916
2917    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2918        let buf = self.buf;
2919        Pin::new(&mut *self.writer).poll_write(cx, buf)
2920    }
2921}
2922
2923/// Future for the [`AsyncWriteExt::write_vectored()`] method.
2924#[derive(Debug)]
2925#[must_use = "futures do nothing unless you `.await` or poll them"]
2926pub struct WriteVectoredFuture<'a, W: Unpin + ?Sized> {
2927    writer: &'a mut W,
2928    bufs: &'a [IoSlice<'a>],
2929}
2930
2931impl<W: Unpin + ?Sized> Unpin for WriteVectoredFuture<'_, W> {}
2932
2933impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteVectoredFuture<'_, W> {
2934    type Output = Result<usize>;
2935
2936    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2937        let bufs = self.bufs;
2938        Pin::new(&mut *self.writer).poll_write_vectored(cx, bufs)
2939    }
2940}
2941
2942/// Future for the [`AsyncWriteExt::write_all()`] method.
2943#[derive(Debug)]
2944#[must_use = "futures do nothing unless you `.await` or poll them"]
2945pub struct WriteAllFuture<'a, W: Unpin + ?Sized> {
2946    writer: &'a mut W,
2947    buf: &'a [u8],
2948}
2949
2950impl<W: Unpin + ?Sized> Unpin for WriteAllFuture<'_, W> {}
2951
2952impl<W: AsyncWrite + Unpin + ?Sized> Future for WriteAllFuture<'_, W> {
2953    type Output = Result<()>;
2954
2955    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2956        let Self { writer, buf } = &mut *self;
2957
2958        while !buf.is_empty() {
2959            let n = ready!(Pin::new(&mut **writer).poll_write(cx, buf))?;
2960            let (_, rest) = mem::take(buf).split_at(n);
2961            *buf = rest;
2962
2963            if n == 0 {
2964                return Poll::Ready(Err(ErrorKind::WriteZero.into()));
2965            }
2966        }
2967
2968        Poll::Ready(Ok(()))
2969    }
2970}
2971
2972/// Future for the [`AsyncWriteExt::flush()`] method.
2973#[derive(Debug)]
2974#[must_use = "futures do nothing unless you `.await` or poll them"]
2975pub struct FlushFuture<'a, W: Unpin + ?Sized> {
2976    writer: &'a mut W,
2977}
2978
2979impl<W: Unpin + ?Sized> Unpin for FlushFuture<'_, W> {}
2980
2981impl<W: AsyncWrite + Unpin + ?Sized> Future for FlushFuture<'_, W> {
2982    type Output = Result<()>;
2983
2984    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2985        Pin::new(&mut *self.writer).poll_flush(cx)
2986    }
2987}
2988
/// Future for the [`AsyncWriteExt::close()`] method.
///
/// Borrows the writer that is going to be closed.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CloseFuture<'a, W>
where
    W: Unpin + ?Sized,
{
    // Writer whose `poll_close()` is driven by this future.
    writer: &'a mut W,
}
2995
2996impl<W: Unpin + ?Sized> Unpin for CloseFuture<'_, W> {}
2997
2998impl<W: AsyncWrite + Unpin + ?Sized> Future for CloseFuture<'_, W> {
2999    type Output = Result<()>;
3000
3001    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3002        Pin::new(&mut *self.writer).poll_close(cx)
3003    }
3004}
3005
/// Type alias for `Pin<Box<dyn AsyncRead + Send + 'static>>`.
///
/// A pinned, heap-allocated, type-erased reader. This alias is only compiled when the
/// `alloc` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncReadExt;
///
/// let reader = [1, 2, 3].boxed_reader();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedReader = Pin<Box<dyn AsyncRead + Send + 'static>>;
3017
/// Type alias for `Pin<Box<dyn AsyncWrite + Send + 'static>>`.
///
/// A pinned, heap-allocated, type-erased writer. This alias is only compiled when the
/// `alloc` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::io::AsyncWriteExt;
///
/// let writer = Vec::<u8>::new().boxed_writer();
/// ```
#[cfg(feature = "alloc")]
pub type BoxedWriter = Pin<Box<dyn AsyncWrite + Send + 'static>>;
3029
3030/// Splits a stream into [`AsyncRead`] and [`AsyncWrite`] halves.
3031///
3032/// # Examples
3033///
3034/// ```
3035/// use futures_lite::io::{self, Cursor};
3036///
3037/// # spin_on::spin_on(async {
3038/// let stream = Cursor::new(vec![]);
3039/// let (mut reader, mut writer) = io::split(stream);
3040/// # std::io::Result::Ok(()) });
3041/// ```
3042pub fn split<T>(stream: T) -> (ReadHalf<T>, WriteHalf<T>)
3043where
3044    T: AsyncRead + AsyncWrite + Unpin,
3045{
3046    let inner = Arc::new(Mutex::new(stream));
3047    (ReadHalf(inner.clone()), WriteHalf(inner))
3048}
3049
/// The read half returned by [`split()`].
///
/// Holds a reference-counted handle to the mutex-protected stream that [`split()`] also hands
/// to the matching [`WriteHalf`].
#[derive(Debug)]
pub struct ReadHalf<T>(Arc<Mutex<T>>);
3053
/// The write half returned by [`split()`].
///
/// Holds a reference-counted handle to the mutex-protected stream that [`split()`] also hands
/// to the matching [`ReadHalf`].
#[derive(Debug)]
pub struct WriteHalf<T>(Arc<Mutex<T>>);
3057
3058impl<T: AsyncRead + Unpin> AsyncRead for ReadHalf<T> {
3059    fn poll_read(
3060        self: Pin<&mut Self>,
3061        cx: &mut Context<'_>,
3062        buf: &mut [u8],
3063    ) -> Poll<Result<usize>> {
3064        let mut inner = self.0.lock().unwrap();
3065        Pin::new(&mut *inner).poll_read(cx, buf)
3066    }
3067
3068    fn poll_read_vectored(
3069        self: Pin<&mut Self>,
3070        cx: &mut Context<'_>,
3071        bufs: &mut [IoSliceMut<'_>],
3072    ) -> Poll<Result<usize>> {
3073        let mut inner = self.0.lock().unwrap();
3074        Pin::new(&mut *inner).poll_read_vectored(cx, bufs)
3075    }
3076}
3077
3078impl<T: AsyncWrite + Unpin> AsyncWrite for WriteHalf<T> {
3079    fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> {
3080        let mut inner = self.0.lock().unwrap();
3081        Pin::new(&mut *inner).poll_write(cx, buf)
3082    }
3083
3084    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3085        let mut inner = self.0.lock().unwrap();
3086        Pin::new(&mut *inner).poll_flush(cx)
3087    }
3088
3089    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
3090        let mut inner = self.0.lock().unwrap();
3091        Pin::new(&mut *inner).poll_close(cx)
3092    }
3093}
3094
3095#[cfg(feature = "memchr")]
3096use memchr::memchr;
3097
/// Unoptimized memchr fallback.
///
/// Scans `haystack` from the front and returns the index of the first byte equal to `needle`,
/// or `None` when no byte matches.
#[cfg(not(feature = "memchr"))]
fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
    for (index, byte) in haystack.iter().enumerate() {
        if *byte == needle {
            return Some(index);
        }
    }
    None
}