futures_lite/
stream.rs

1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[doc(no_inline)]
19pub use futures_core::stream::Stream;
20
21#[cfg(feature = "alloc")]
22use alloc::boxed::Box;
23
24use core::fmt;
25use core::future::Future;
26use core::marker::PhantomData;
27use core::mem;
28use core::pin::Pin;
29use core::task::{Context, Poll};
30
31#[cfg(feature = "race")]
32use fastrand::Rng;
33
34use pin_project_lite::pin_project;
35
36use crate::ready;
37
/// Wraps a stream so it can be consumed as a blocking [`Iterator`].
///
/// The stream has to be [`Unpin`]; pin it beforehand (as the example does) if it is not.
///
/// # Examples
///
/// ```
/// use futures_lite::{pin, stream};
///
/// let stream = stream::once(7);
/// pin!(stream);
///
/// let mut iter = stream::block_on(stream);
/// assert_eq!(iter.next(), Some(7));
/// assert_eq!(iter.next(), None);
/// ```
#[cfg(feature = "std")]
pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
    BlockOn::<S>(stream)
}
56
/// Iterator for the [`block_on()`] function.
///
/// A thin wrapper around the stream `S`; its [`Iterator`] implementation (available with
/// the `std` feature) drives the stream through `crate::future::block_on`.
#[derive(Debug)]
pub struct BlockOn<S>(S);
60
// Blocking `Iterator` facade over the wrapped stream: every method builds the matching
// stream future and runs it to completion with `crate::future::block_on`.
#[cfg(feature = "std")]
impl<S: Stream + Unpin> Iterator for BlockOn<S> {
    type Item = S::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let pending = self.0.next();
        crate::future::block_on(pending)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // Forwarded untouched from the wrapped stream.
        Stream::size_hint(&self.0)
    }

    fn count(self) -> usize {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.count())
    }

    fn last(self) -> Option<Self::Item> {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.last())
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let pending = self.0.nth(n);
        crate::future::block_on(pending)
    }

    fn fold<B, F>(self, init: B, f: F) -> B
    where
        F: FnMut(B, Self::Item) -> B,
    {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.fold(init, f))
    }

    fn for_each<F>(self, f: F) -> F::Output
    where
        F: FnMut(Self::Item),
    {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.for_each(f))
    }

    fn all<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.all(f);
        crate::future::block_on(pending)
    }

    fn any<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.any(f);
        crate::future::block_on(pending)
    }

    fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
    where
        P: FnMut(&Self::Item) -> bool,
    {
        let pending = self.0.find(predicate);
        crate::future::block_on(pending)
    }

    fn find_map<B, F>(&mut self, f: F) -> Option<B>
    where
        F: FnMut(Self::Item) -> Option<B>,
    {
        let pending = self.0.find_map(f);
        crate::future::block_on(pending)
    }

    fn position<P>(&mut self, predicate: P) -> Option<usize>
    where
        P: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.position(predicate);
        crate::future::block_on(pending)
    }
}
134
135/// Creates an empty stream.
136///
137/// # Examples
138///
139/// ```
140/// use futures_lite::stream::{self, StreamExt};
141///
142/// # spin_on::spin_on(async {
143/// let mut s = stream::empty::<i32>();
144/// assert_eq!(s.next().await, None);
145/// # })
146/// ```
147pub fn empty<T>() -> Empty<T> {
148    Empty {
149        _marker: PhantomData,
150    }
151}
152
/// Stream for the [`empty()`] function.
///
/// Stores no data; the type parameter only fixes the item type.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Empty<T> {
    // Zero-sized marker tying the stream to its item type `T`.
    _marker: PhantomData<T>,
}

// Declared without bounds: `Empty<T>` is `Unpin` whether or not `T` is.
impl<T> Unpin for Empty<T> {}
161
162impl<T> Stream for Empty<T> {
163    type Item = T;
164
165    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166        Poll::Ready(None)
167    }
168
169    fn size_hint(&self) -> (usize, Option<usize>) {
170        (0, Some(0))
171    }
172}
173
174/// Creates a stream from an iterator.
175///
176/// # Examples
177///
178/// ```
179/// use futures_lite::stream::{self, StreamExt};
180///
181/// # spin_on::spin_on(async {
182/// let mut s = stream::iter(vec![1, 2]);
183///
184/// assert_eq!(s.next().await, Some(1));
185/// assert_eq!(s.next().await, Some(2));
186/// assert_eq!(s.next().await, None);
187/// # })
188/// ```
189pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
190    Iter {
191        iter: iter.into_iter(),
192    }
193}
194
/// Stream for the [`iter()`] function.
///
/// Holds the underlying iterator and hands out its items one poll at a time.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
    // The iterator being adapted.
    iter: I,
}

// Declared without bounds: `Iter<I>` is `Unpin` whether or not `I` is.
impl<I> Unpin for Iter<I> {}
203
204impl<I: Iterator> Stream for Iter<I> {
205    type Item = I::Item;
206
207    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
208        Poll::Ready(self.iter.next())
209    }
210
211    fn size_hint(&self) -> (usize, Option<usize>) {
212        self.iter.size_hint()
213    }
214}
215
216/// Creates a stream that yields a single item.
217///
218/// # Examples
219///
220/// ```
221/// use futures_lite::stream::{self, StreamExt};
222///
223/// # spin_on::spin_on(async {
224/// let mut s = stream::once(7);
225///
226/// assert_eq!(s.next().await, Some(7));
227/// assert_eq!(s.next().await, None);
228/// # })
229/// ```
230pub fn once<T>(t: T) -> Once<T> {
231    Once { value: Some(t) }
232}
233
pin_project! {
    /// Stream for the [`once()`] function.
    ///
    /// Yields its stored value on the first poll and `None` afterwards.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Once<T> {
        // The item still to be yielded; `None` once it has been taken out.
        value: Option<T>,
    }
}
242
243impl<T> Stream for Once<T> {
244    type Item = T;
245
246    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
247        Poll::Ready(self.project().value.take())
248    }
249
250    fn size_hint(&self) -> (usize, Option<usize>) {
251        if self.value.is_some() {
252            (1, Some(1))
253        } else {
254            (0, Some(0))
255        }
256    }
257}
258
259/// Creates a stream that is always pending.
260///
261/// # Examples
262///
263/// ```no_run
264/// use futures_lite::stream::{self, StreamExt};
265///
266/// # spin_on::spin_on(async {
267/// let mut s = stream::pending::<i32>();
268/// s.next().await;
269/// unreachable!();
270/// # })
271/// ```
272pub fn pending<T>() -> Pending<T> {
273    Pending {
274        _marker: PhantomData,
275    }
276}
277
/// Stream for the [`pending()`] function.
///
/// Stores no data; the type parameter only fixes the item type.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pending<T> {
    // Zero-sized marker tying the stream to its item type `T`.
    _marker: PhantomData<T>,
}

// Declared without bounds: `Pending<T>` is `Unpin` whether or not `T` is.
impl<T> Unpin for Pending<T> {}
286
287impl<T> Stream for Pending<T> {
288    type Item = T;
289
290    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
291        Poll::Pending
292    }
293
294    fn size_hint(&self) -> (usize, Option<usize>) {
295        (0, Some(0))
296    }
297}
298
299/// Creates a stream from a function returning [`Poll`].
300///
301/// # Examples
302///
303/// ```
304/// use futures_lite::stream::{self, StreamExt};
305/// use std::task::{Context, Poll};
306///
307/// # spin_on::spin_on(async {
308/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
309///     Poll::Ready(Some(7))
310/// }
311///
312/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
313/// # })
314/// ```
315pub fn poll_fn<T, F>(f: F) -> PollFn<F>
316where
317    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
318{
319    PollFn { f }
320}
321
/// Stream for the [`poll_fn()`] function.
///
/// Wraps the closure that is invoked on every poll.
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct PollFn<F> {
    // The polling closure supplied to `poll_fn()`.
    f: F,
}

// Declared without bounds: `PollFn<F>` is `Unpin` whether or not `F` is.
impl<F> Unpin for PollFn<F> {}
330
331impl<F> fmt::Debug for PollFn<F> {
332    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
333        f.debug_struct("PollFn").finish()
334    }
335}
336
337impl<T, F> Stream for PollFn<F>
338where
339    F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
340{
341    type Item = T;
342
343    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
344        (&mut self.f)(cx)
345    }
346}
347
348/// Creates an infinite stream that yields the same item repeatedly.
349///
350/// # Examples
351///
352/// ```
353/// use futures_lite::stream::{self, StreamExt};
354///
355/// # spin_on::spin_on(async {
356/// let mut s = stream::repeat(7);
357///
358/// assert_eq!(s.next().await, Some(7));
359/// assert_eq!(s.next().await, Some(7));
360/// # })
361/// ```
362pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
363    Repeat { item }
364}
365
/// Stream for the [`repeat()`] function.
///
/// Keeps one value and yields a clone of it on every poll.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Repeat<T> {
    // The value that is cloned for every yielded item.
    item: T,
}

// Declared without bounds: `Repeat<T>` is `Unpin` whether or not `T` is.
impl<T> Unpin for Repeat<T> {}
374
375impl<T: Clone> Stream for Repeat<T> {
376    type Item = T;
377
378    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379        Poll::Ready(Some(self.item.clone()))
380    }
381
382    fn size_hint(&self) -> (usize, Option<usize>) {
383        (usize::MAX, None)
384    }
385}
386
387/// Creates an infinite stream from a closure that generates items.
388///
389/// # Examples
390///
391/// ```
392/// use futures_lite::stream::{self, StreamExt};
393///
394/// # spin_on::spin_on(async {
395/// let mut s = stream::repeat_with(|| 7);
396///
397/// assert_eq!(s.next().await, Some(7));
398/// assert_eq!(s.next().await, Some(7));
399/// # })
400/// ```
401pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
402where
403    F: FnMut() -> T,
404{
405    RepeatWith { f: repeater }
406}
407
/// Stream for the [`repeat_with()`] function.
///
/// Holds the closure that generates each item.
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct RepeatWith<F> {
    // The generator closure, called once per yielded item.
    f: F,
}

// Declared without bounds: `RepeatWith<F>` is `Unpin` whether or not `F` is.
impl<F> Unpin for RepeatWith<F> {}
416
417impl<T, F> Stream for RepeatWith<F>
418where
419    F: FnMut() -> T,
420{
421    type Item = T;
422
423    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
424        let item = (&mut self.f)();
425        Poll::Ready(Some(item))
426    }
427
428    fn size_hint(&self) -> (usize, Option<usize>) {
429        (usize::MAX, None)
430    }
431}
432
433/// Creates a stream from a seed value and an async closure operating on it.
434///
435/// # Examples
436///
437/// ```
438/// use futures_lite::stream::{self, StreamExt};
439///
440/// # spin_on::spin_on(async {
441/// let s = stream::unfold(0, |mut n| async move {
442///     if n < 2 {
443///         let m = n + 1;
444///         Some((n, m))
445///     } else {
446///         None
447///     }
448/// });
449///
450/// let v: Vec<i32> = s.collect().await;
451/// assert_eq!(v, [0, 1]);
452/// # })
453/// ```
454pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
455where
456    F: FnMut(T) -> Fut,
457    Fut: Future<Output = Option<(Item, T)>>,
458{
459    Unfold {
460        f,
461        state: Some(seed),
462        fut: None,
463    }
464}
465
pin_project! {
    /// Stream for the [`unfold()`] function.
    ///
    /// Between steps the state is parked in `state`; while a step is running its future
    /// lives in `fut`.
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Unfold<T, F, Fut> {
        // Closure that turns a state into the future for the next step.
        f: F,
        // State for the next step; taken out when that step's future is created.
        state: Option<T>,
        // Future of the step in progress, if any.
        #[pin]
        fut: Option<Fut>,
    }
}
477
478impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
479where
480    T: fmt::Debug,
481    Fut: fmt::Debug,
482{
483    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484        f.debug_struct("Unfold")
485            .field("state", &self.state)
486            .field("fut", &self.fut)
487            .finish()
488    }
489}
490
491impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
492where
493    F: FnMut(T) -> Fut,
494    Fut: Future<Output = Option<(Item, T)>>,
495{
496    type Item = Item;
497
498    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
499        let mut this = self.project();
500
501        if let Some(state) = this.state.take() {
502            this.fut.set(Some((this.f)(state)));
503        }
504
505        let step = ready!(this
506            .fut
507            .as_mut()
508            .as_pin_mut()
509            .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
510            .poll(cx));
511        this.fut.set(None);
512
513        if let Some((item, next_state)) = step {
514            *this.state = Some(next_state);
515            Poll::Ready(Some(item))
516        } else {
517            Poll::Ready(None)
518        }
519    }
520}
521
522/// Creates a stream from a seed value and a fallible async closure operating on it.
523///
524/// # Examples
525///
526/// ```
527/// use futures_lite::stream::{self, StreamExt};
528///
529/// # spin_on::spin_on(async {
530/// let s = stream::try_unfold(0, |mut n| async move {
531///     if n < 2 {
532///         let m = n + 1;
533///         Ok(Some((n, m)))
534///     } else {
535///         std::io::Result::Ok(None)
536///     }
537/// });
538///
539/// let v: Vec<i32> = s.try_collect().await?;
540/// assert_eq!(v, [0, 1]);
541/// # std::io::Result::Ok(()) });
542/// ```
543pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
544where
545    F: FnMut(T) -> Fut,
546    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
547{
548    TryUnfold {
549        f,
550        state: Some(init),
551        fut: None,
552    }
553}
554
pin_project! {
    /// Stream for the [`try_unfold()`] function.
    ///
    /// Between steps the state is parked in `state`; while a step is running its future
    /// lives in `fut`.
    #[derive(Clone)]
    #[must_use = "streams do nothing unless polled"]
    pub struct TryUnfold<T, F, Fut> {
        // Closure that turns a state into the future for the next step.
        f: F,
        // State for the next step; taken out when that step's future is created.
        state: Option<T>,
        // Future of the step in progress, if any.
        #[pin]
        fut: Option<Fut>,
    }
}
566
567impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
568where
569    T: fmt::Debug,
570    Fut: fmt::Debug,
571{
572    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
573        f.debug_struct("TryUnfold")
574            .field("state", &self.state)
575            .field("fut", &self.fut)
576            .finish()
577    }
578}
579
580impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
581where
582    F: FnMut(T) -> Fut,
583    Fut: Future<Output = Result<Option<(Item, T)>, E>>,
584{
585    type Item = Result<Item, E>;
586
587    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
588        let mut this = self.project();
589
590        if let Some(state) = this.state.take() {
591            this.fut.set(Some((this.f)(state)));
592        }
593
594        match this.fut.as_mut().as_pin_mut() {
595            None => {
596                // The future previously errored
597                Poll::Ready(None)
598            }
599            Some(future) => {
600                let step = ready!(future.poll(cx));
601                this.fut.set(None);
602
603                match step {
604                    Ok(Some((item, next_state))) => {
605                        *this.state = Some(next_state);
606                        Poll::Ready(Some(Ok(item)))
607                    }
608                    Ok(None) => Poll::Ready(None),
609                    Err(e) => Poll::Ready(Some(Err(e))),
610                }
611            }
612        }
613    }
614}
615
616/// Creates a stream that invokes the given future as its first item, and then
617/// produces no more items.
618///
619/// # Example
620///
621/// ```
622/// use futures_lite::{stream, prelude::*};
623///
624/// # spin_on::spin_on(async {
625/// let mut stream = Box::pin(stream::once_future(async { 1 }));
626/// assert_eq!(stream.next().await, Some(1));
627/// assert_eq!(stream.next().await, None);
628/// # });
629/// ```
630pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
631    OnceFuture {
632        future: Some(future),
633    }
634}
635
636pin_project! {
637    /// Stream for the [`once_future()`] function.
638    #[derive(Debug)]
639    #[must_use = "futures do nothing unless you `.await` or poll them"]
640    pub struct OnceFuture<F> {
641        #[pin]
642        future: Option<F>,
643    }
644}
645
646impl<F: Future> Stream for OnceFuture<F> {
647    type Item = F::Output;
648
649    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
650        let mut this = self.project();
651
652        match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
653            Some(Poll::Ready(t)) => {
654                this.future.set(None);
655                Poll::Ready(Some(t))
656            }
657            Some(Poll::Pending) => Poll::Pending,
658            None => Poll::Ready(None),
659        }
660    }
661}
662
663/// Take elements from this stream until the provided future resolves.
664///
665/// This function will take elements from the stream until the provided
666/// stopping future `fut` resolves. Once the `fut` future becomes ready,
667/// this stream combinator will always return that the stream is done.
668///
669/// The stopping future may return any type. Once the stream is stopped
670/// the result of the stopping future may be accessed with `StopAfterFuture::take_result()`.
671/// The stream may also be resumed with `StopAfterFuture::take_future()`.
672/// See the documentation of [`StopAfterFuture`] for more information.
673///
674/// ```
675/// use futures_lite::stream::{self, StreamExt, stop_after_future};
676/// use futures_lite::future;
677/// use std::task::Poll;
678///
679/// let stream = stream::iter(1..=10);
680///
681/// # spin_on::spin_on(async {
682/// let mut i = 0;
683/// let stop_fut = future::poll_fn(|_cx| {
684///     i += 1;
685///     if i <= 5 {
686///         Poll::Pending
687///     } else {
688///         Poll::Ready(())
689///     }
690/// });
691///
692/// let stream = stop_after_future(stream, stop_fut);
693///
694/// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
695/// # });
696pub fn stop_after_future<S, F>(stream: S, future: F) -> StopAfterFuture<S, F>
697where
698    S: Sized + Stream,
699    F: Future,
700{
701    StopAfterFuture {
702        stream,
703        fut: Some(future),
704        fut_result: None,
705        free: false,
706    }
707}
708
pin_project! {
    /// Stream for the [`stop_after_future()`] function.
    ///
    /// Forwards items from the wrapped stream until the stopping future resolves, unless
    /// that future has been removed with `take_future()`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct StopAfterFuture<S: Stream, Fut: Future> {
        // The wrapped stream whose items are forwarded.
        #[pin]
        stream: S,
        // Contains the inner Future on start and None once the inner Future is resolved
        // or taken out by the user. Also cleared when the wrapped stream ends.
        #[pin]
        fut: Option<Fut>,
        // Contains fut's return value once fut is resolved, until `take_result()` removes it.
        fut_result: Option<Fut::Output>,
        // Whether the future was taken out by the user.
        free: bool,
    }
}
726
727impl<St, Fut> StopAfterFuture<St, Fut>
728where
729    St: Stream,
730    Fut: Future,
731{
732    /// Extract the stopping future out of the combinator.
733    ///
734    /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
735    /// Taking out the future means the combinator will be yielding
736    /// elements from the wrapped stream without ever stopping it.
737    pub fn take_future(&mut self) -> Option<Fut> {
738        if self.fut.is_some() {
739            self.free = true;
740        }
741
742        self.fut.take()
743    }
744
745    /// Once the stopping future is resolved, this method can be used
746    /// to extract the value returned by the stopping future.
747    ///
748    /// This may be used to retrieve arbitrary data from the stopping
749    /// future, for example a reason why the stream was stopped.
750    ///
751    /// This method will return `None` if the future isn't resolved yet,
752    /// or if the result was already taken out.
753    ///
754    /// # Examples
755    ///
756    /// ```
757    /// # spin_on::spin_on(async {
758    /// use futures_lite::stream::{self, StreamExt, stop_after_future};
759    /// use futures_lite::future;
760    /// use std::task::Poll;
761    ///
762    /// let stream = stream::iter(1..=10);
763    ///
764    /// let mut i = 0;
765    /// let stop_fut = future::poll_fn(|_cx| {
766    ///     i += 1;
767    ///     if i <= 5 {
768    ///         Poll::Pending
769    ///     } else {
770    ///         Poll::Ready("reason")
771    ///     }
772    /// });
773    ///
774    /// let mut stream = stop_after_future(stream, stop_fut);
775    /// let _ = (&mut stream).collect::<Vec<_>>().await;
776    ///
777    /// let result = stream.take_result().unwrap();
778    /// assert_eq!(result, "reason");
779    /// # });
780    /// ```
781    pub fn take_result(&mut self) -> Option<Fut::Output> {
782        self.fut_result.take()
783    }
784
785    /// Whether the stream was stopped yet by the stopping future
786    /// being resolved.
787    pub fn is_stopped(&self) -> bool {
788        !self.free && self.fut.is_none()
789    }
790}
791
792impl<St, Fut> Stream for StopAfterFuture<St, Fut>
793where
794    St: Stream,
795    Fut: Future,
796{
797    type Item = St::Item;
798
799    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
800        let mut this = self.project();
801
802        if let Some(f) = this.fut.as_mut().as_pin_mut() {
803            if let Poll::Ready(result) = f.poll(cx) {
804                this.fut.set(None);
805                *this.fut_result = Some(result);
806            }
807        }
808
809        if !*this.free && this.fut.is_none() {
810            // Future resolved, inner stream stopped
811            Poll::Ready(None)
812        } else {
813            // Future either not resolved yet or taken out by the user
814            let item = ready!(this.stream.poll_next(cx));
815            if item.is_none() {
816                this.fut.set(None);
817            }
818            Poll::Ready(item)
819        }
820    }
821
822    fn size_hint(&self) -> (usize, Option<usize>) {
823        if self.is_stopped() {
824            return (0, Some(0));
825        }
826
827        // Original stream can be truncated at any moment, so the lower bound isn't reliable.
828        let (_, upper_bound) = self.stream.size_hint();
829        (0, upper_bound)
830    }
831}
832
833/// Extension trait for [`Stream`].
834pub trait StreamExt: Stream {
835    /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
836    fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
837    where
838        Self: Unpin,
839    {
840        Stream::poll_next(Pin::new(self), cx)
841    }
842
843    /// Retrieves the next item in the stream.
844    ///
845    /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
846    /// resume iteration after that.
847    ///
848    /// # Examples
849    ///
850    /// ```
851    /// use futures_lite::stream::{self, StreamExt};
852    ///
853    /// # spin_on::spin_on(async {
854    /// let mut s = stream::iter(1..=3);
855    ///
856    /// assert_eq!(s.next().await, Some(1));
857    /// assert_eq!(s.next().await, Some(2));
858    /// assert_eq!(s.next().await, Some(3));
859    /// assert_eq!(s.next().await, None);
860    /// # });
861    /// ```
862    fn next(&mut self) -> NextFuture<'_, Self>
863    where
864        Self: Unpin,
865    {
866        NextFuture { stream: self }
867    }
868
869    /// Retrieves the next item in the stream.
870    ///
871    /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
872    /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
873    ///
874    /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
875    ///
876    /// # Examples
877    ///
878    /// ```
879    /// use futures_lite::stream::{self, StreamExt};
880    ///
881    /// # spin_on::spin_on(async {
882    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
883    ///
884    /// assert_eq!(s.try_next().await, Ok(Some(1)));
885    /// assert_eq!(s.try_next().await, Ok(Some(2)));
886    /// assert_eq!(s.try_next().await, Err("error"));
887    /// assert_eq!(s.try_next().await, Ok(None));
888    /// # });
889    /// ```
890    fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
891    where
892        Self: Stream<Item = Result<T, E>> + Unpin,
893    {
894        TryNextFuture { stream: self }
895    }
896
897    /// Counts the number of items in the stream.
898    ///
899    /// # Examples
900    ///
901    /// ```
902    /// use futures_lite::stream::{self, StreamExt};
903    ///
904    /// # spin_on::spin_on(async {
905    /// let s1 = stream::iter(vec![0]);
906    /// let s2 = stream::iter(vec![1, 2, 3]);
907    ///
908    /// assert_eq!(s1.count().await, 1);
909    /// assert_eq!(s2.count().await, 3);
910    /// # });
911    /// ```
912    fn count(self) -> CountFuture<Self>
913    where
914        Self: Sized,
915    {
916        CountFuture {
917            stream: self,
918            count: 0,
919        }
920    }
921
922    /// Maps items of the stream to new values using a closure.
923    ///
924    /// # Examples
925    ///
926    /// ```
927    /// use futures_lite::stream::{self, StreamExt};
928    ///
929    /// # spin_on::spin_on(async {
930    /// let s = stream::iter(vec![1, 2, 3]);
931    /// let mut s = s.map(|x| 2 * x);
932    ///
933    /// assert_eq!(s.next().await, Some(2));
934    /// assert_eq!(s.next().await, Some(4));
935    /// assert_eq!(s.next().await, Some(6));
936    /// assert_eq!(s.next().await, None);
937    /// # });
938    /// ```
939    fn map<T, F>(self, f: F) -> Map<Self, F>
940    where
941        Self: Sized,
942        F: FnMut(Self::Item) -> T,
943    {
944        Map { stream: self, f }
945    }
946
947    /// Maps items to streams and then concatenates them.
948    ///
949    /// # Examples
950    ///
951    /// ```
952    /// use futures_lite::stream::{self, StreamExt};
953    ///
954    /// # spin_on::spin_on(async {
955    /// let words = stream::iter(vec!["one", "two"]);
956    ///
957    /// let s: String = words
958    ///     .flat_map(|s| stream::iter(s.chars()))
959    ///     .collect()
960    ///     .await;
961    ///
962    /// assert_eq!(s, "onetwo");
963    /// # });
964    /// ```
965    fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
966    where
967        Self: Sized,
968        U: Stream,
969        F: FnMut(Self::Item) -> U,
970    {
971        FlatMap {
972            stream: self.map(f),
973            inner_stream: None,
974        }
975    }
976
977    /// Concatenates inner streams.
978    ///
979    /// # Examples
980    ///
981    /// ```
982    /// use futures_lite::stream::{self, StreamExt};
983    ///
984    /// # spin_on::spin_on(async {
985    /// let s1 = stream::iter(vec![1, 2, 3]);
986    /// let s2 = stream::iter(vec![4, 5]);
987    ///
988    /// let s = stream::iter(vec![s1, s2]);
989    /// let v: Vec<_> = s.flatten().collect().await;
990    /// assert_eq!(v, [1, 2, 3, 4, 5]);
991    /// # });
992    /// ```
993    fn flatten(self) -> Flatten<Self>
994    where
995        Self: Sized,
996        Self::Item: Stream,
997    {
998        Flatten {
999            stream: self,
1000            inner_stream: None,
1001        }
1002    }
1003
1004    /// Maps items of the stream to new values using an async closure.
1005    ///
1006    /// # Examples
1007    ///
1008    /// ```
1009    /// use futures_lite::pin;
1010    /// use futures_lite::stream::{self, StreamExt};
1011    ///
1012    /// # spin_on::spin_on(async {
1013    /// let s = stream::iter(vec![1, 2, 3]);
1014    /// let mut s = s.then(|x| async move { 2 * x });
1015    ///
1016    /// pin!(s);
1017    /// assert_eq!(s.next().await, Some(2));
1018    /// assert_eq!(s.next().await, Some(4));
1019    /// assert_eq!(s.next().await, Some(6));
1020    /// assert_eq!(s.next().await, None);
1021    /// # });
1022    /// ```
1023    fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
1024    where
1025        Self: Sized,
1026        F: FnMut(Self::Item) -> Fut,
1027        Fut: Future,
1028    {
1029        Then {
1030            stream: self,
1031            future: None,
1032            f,
1033        }
1034    }
1035
1036    /// Keeps items of the stream for which `predicate` returns `true`.
1037    ///
1038    /// # Examples
1039    ///
1040    /// ```
1041    /// use futures_lite::stream::{self, StreamExt};
1042    ///
1043    /// # spin_on::spin_on(async {
1044    /// let s = stream::iter(vec![1, 2, 3, 4]);
1045    /// let mut s = s.filter(|i| i % 2 == 0);
1046    ///
1047    /// assert_eq!(s.next().await, Some(2));
1048    /// assert_eq!(s.next().await, Some(4));
1049    /// assert_eq!(s.next().await, None);
1050    /// # });
1051    /// ```
1052    fn filter<P>(self, predicate: P) -> Filter<Self, P>
1053    where
1054        Self: Sized,
1055        P: FnMut(&Self::Item) -> bool,
1056    {
1057        Filter {
1058            stream: self,
1059            predicate,
1060        }
1061    }
1062
1063    /// Filters and maps items of the stream using a closure.
1064    ///
1065    /// # Examples
1066    ///
1067    /// ```
1068    /// use futures_lite::stream::{self, StreamExt};
1069    ///
1070    /// # spin_on::spin_on(async {
1071    /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
1072    /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
1073    ///
1074    /// assert_eq!(s.next().await, Some(1));
1075    /// assert_eq!(s.next().await, Some(3));
1076    /// assert_eq!(s.next().await, Some(5));
1077    /// assert_eq!(s.next().await, None);
1078    /// # });
1079    /// ```
1080    fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
1081    where
1082        Self: Sized,
1083        F: FnMut(Self::Item) -> Option<T>,
1084    {
1085        FilterMap { stream: self, f }
1086    }
1087
1088    /// Takes only the first `n` items of the stream.
1089    ///
1090    /// # Examples
1091    ///
1092    /// ```
1093    /// use futures_lite::stream::{self, StreamExt};
1094    ///
1095    /// # spin_on::spin_on(async {
1096    /// let mut s = stream::repeat(7).take(2);
1097    ///
1098    /// assert_eq!(s.next().await, Some(7));
1099    /// assert_eq!(s.next().await, Some(7));
1100    /// assert_eq!(s.next().await, None);
1101    /// # });
1102    /// ```
1103    fn take(self, n: usize) -> Take<Self>
1104    where
1105        Self: Sized,
1106    {
1107        Take { stream: self, n }
1108    }
1109
1110    /// Takes items while `predicate` returns `true`.
1111    ///
1112    /// # Examples
1113    ///
1114    /// ```
1115    /// use futures_lite::stream::{self, StreamExt};
1116    ///
1117    /// # spin_on::spin_on(async {
1118    /// let s = stream::iter(vec![1, 2, 3, 4]);
1119    /// let mut s = s.take_while(|x| *x < 3);
1120    ///
1121    /// assert_eq!(s.next().await, Some(1));
1122    /// assert_eq!(s.next().await, Some(2));
1123    /// assert_eq!(s.next().await, None);
1124    /// # });
1125    /// ```
1126    fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1127    where
1128        Self: Sized,
1129        P: FnMut(&Self::Item) -> bool,
1130    {
1131        TakeWhile {
1132            stream: self,
1133            predicate,
1134        }
1135    }
1136
1137    /// Maps items while `predicate` returns [`Some`].
1138    ///
1139    /// This stream is not fused. After the predicate returns [`None`] the stream still
1140    /// contains remaining items that can be obtained by subsequent `next` calls.
1141    /// You can [`fuse`](StreamExt::fuse) the stream if this behavior is undesirable.
1142    ///
1143    /// # Examples
1144    ///
1145    /// ```
1146    /// use futures_lite::stream::{self, StreamExt};
1147    ///
1148    /// # spin_on::spin_on(async {
1149    /// let s = stream::iter(vec![1, 2, 0, 3]);
1150    /// let mut s = s.map_while(|x: u32| x.checked_sub(1));
1151    ///
1152    /// assert_eq!(s.next().await, Some(0));
1153    /// assert_eq!(s.next().await, Some(1));
1154    /// assert_eq!(s.next().await, None);
1155    ///
1156    /// // Continue to iterate the stream.
1157    /// assert_eq!(s.next().await, Some(2));
1158    /// assert_eq!(s.next().await, None);
1159    /// # });
1160    /// ```
1161    fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
1162    where
1163        Self: Sized,
1164        P: FnMut(Self::Item) -> Option<B>,
1165    {
1166        MapWhile {
1167            stream: self,
1168            predicate,
1169        }
1170    }
1171
1172    /// Skips the first `n` items of the stream.
1173    ///
1174    /// # Examples
1175    ///
1176    /// ```
1177    /// use futures_lite::stream::{self, StreamExt};
1178    ///
1179    /// # spin_on::spin_on(async {
1180    /// let s = stream::iter(vec![1, 2, 3]);
1181    /// let mut s = s.skip(2);
1182    ///
1183    /// assert_eq!(s.next().await, Some(3));
1184    /// assert_eq!(s.next().await, None);
1185    /// # });
1186    /// ```
1187    fn skip(self, n: usize) -> Skip<Self>
1188    where
1189        Self: Sized,
1190    {
1191        Skip { stream: self, n }
1192    }
1193
1194    /// Skips items while `predicate` returns `true`.
1195    ///
1196    /// # Examples
1197    ///
1198    /// ```
1199    /// use futures_lite::stream::{self, StreamExt};
1200    ///
1201    /// # spin_on::spin_on(async {
1202    /// let s = stream::iter(vec![-1i32, 0, 1]);
1203    /// let mut s = s.skip_while(|x| x.is_negative());
1204    ///
1205    /// assert_eq!(s.next().await, Some(0));
1206    /// assert_eq!(s.next().await, Some(1));
1207    /// assert_eq!(s.next().await, None);
1208    /// # });
1209    /// ```
1210    fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1211    where
1212        Self: Sized,
1213        P: FnMut(&Self::Item) -> bool,
1214    {
1215        SkipWhile {
1216            stream: self,
1217            predicate: Some(predicate),
1218        }
1219    }
1220
1221    /// Yields every `step`th item.
1222    ///
1223    /// # Panics
1224    ///
1225    /// This method will panic if the `step` is 0.
1226    ///
1227    /// # Examples
1228    ///
1229    /// ```
1230    /// use futures_lite::stream::{self, StreamExt};
1231    ///
1232    /// # spin_on::spin_on(async {
1233    /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1234    /// let mut s = s.step_by(2);
1235    ///
1236    /// assert_eq!(s.next().await, Some(0));
1237    /// assert_eq!(s.next().await, Some(2));
1238    /// assert_eq!(s.next().await, Some(4));
1239    /// assert_eq!(s.next().await, None);
1240    /// # });
1241    /// ```
1242    fn step_by(self, step: usize) -> StepBy<Self>
1243    where
1244        Self: Sized,
1245    {
1246        assert!(step > 0, "`step` must be greater than zero");
1247        StepBy {
1248            stream: self,
1249            step,
1250            i: 0,
1251        }
1252    }
1253
1254    /// Appends another stream to the end of this one.
1255    ///
1256    /// # Examples
1257    ///
1258    /// ```
1259    /// use futures_lite::stream::{self, StreamExt};
1260    ///
1261    /// # spin_on::spin_on(async {
1262    /// let s1 = stream::iter(vec![1, 2]);
1263    /// let s2 = stream::iter(vec![7, 8]);
1264    /// let mut s = s1.chain(s2);
1265    ///
1266    /// assert_eq!(s.next().await, Some(1));
1267    /// assert_eq!(s.next().await, Some(2));
1268    /// assert_eq!(s.next().await, Some(7));
1269    /// assert_eq!(s.next().await, Some(8));
1270    /// assert_eq!(s.next().await, None);
1271    /// # });
1272    /// ```
1273    fn chain<U>(self, other: U) -> Chain<Self, U>
1274    where
1275        Self: Sized,
1276        U: Stream<Item = Self::Item> + Sized,
1277    {
1278        Chain {
1279            first: self.fuse(),
1280            second: other.fuse(),
1281        }
1282    }
1283
1284    /// Clones all items.
1285    ///
1286    /// # Examples
1287    ///
1288    /// ```
1289    /// use futures_lite::stream::{self, StreamExt};
1290    ///
1291    /// # spin_on::spin_on(async {
1292    /// let s = stream::iter(vec![&1, &2]);
1293    /// let mut s = s.cloned();
1294    ///
1295    /// assert_eq!(s.next().await, Some(1));
1296    /// assert_eq!(s.next().await, Some(2));
1297    /// assert_eq!(s.next().await, None);
1298    /// # });
1299    /// ```
1300    fn cloned<'a, T>(self) -> Cloned<Self>
1301    where
1302        Self: Stream<Item = &'a T> + Sized,
1303        T: Clone + 'a,
1304    {
1305        Cloned { stream: self }
1306    }
1307
1308    /// Copies all items.
1309    ///
1310    /// # Examples
1311    ///
1312    /// ```
1313    /// use futures_lite::stream::{self, StreamExt};
1314    ///
1315    /// # spin_on::spin_on(async {
1316    /// let s = stream::iter(vec![&1, &2]);
1317    /// let mut s = s.copied();
1318    ///
1319    /// assert_eq!(s.next().await, Some(1));
1320    /// assert_eq!(s.next().await, Some(2));
1321    /// assert_eq!(s.next().await, None);
1322    /// # });
1323    /// ```
1324    fn copied<'a, T>(self) -> Copied<Self>
1325    where
1326        Self: Stream<Item = &'a T> + Sized,
1327        T: Copy + 'a,
1328    {
1329        Copied { stream: self }
1330    }
1331
1332    /// Collects all items in the stream into a collection.
1333    ///
1334    /// # Examples
1335    ///
1336    /// ```
1337    /// use futures_lite::stream::{self, StreamExt};
1338    ///
1339    /// # spin_on::spin_on(async {
1340    /// let mut s = stream::iter(1..=3);
1341    ///
1342    /// let items: Vec<_> = s.collect().await;
1343    /// assert_eq!(items, [1, 2, 3]);
1344    /// # });
1345    /// ```
1346    fn collect<C>(self) -> CollectFuture<Self, C>
1347    where
1348        Self: Sized,
1349        C: Default + Extend<Self::Item>,
1350    {
1351        CollectFuture {
1352            stream: self,
1353            collection: Default::default(),
1354        }
1355    }
1356
1357    /// Collects all items in the fallible stream into a collection.
    ///
    /// # Examples
    ///
1359    /// ```
1360    /// use futures_lite::stream::{self, StreamExt};
1361    ///
1362    /// # spin_on::spin_on(async {
1363    /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1364    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1365    /// assert_eq!(res, Err(2));
1366    ///
1367    /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1368    /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1369    /// assert_eq!(res, Ok(vec![1, 2, 3]));
1370    /// # })
1371    /// ```
1372    fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1373    where
1374        Self: Stream<Item = Result<T, E>> + Sized,
1375        C: Default + Extend<T>,
1376    {
1377        TryCollectFuture {
1378            stream: self,
1379            items: Default::default(),
1380        }
1381    }
1382
1383    /// Partitions items into those for which `predicate` is `true` and those for which it is
1384    /// `false`, and then collects them into two collections.
1385    ///
1386    /// # Examples
1387    ///
1388    /// ```
1389    /// use futures_lite::stream::{self, StreamExt};
1390    ///
1391    /// # spin_on::spin_on(async {
1392    /// let s = stream::iter(vec![1, 2, 3]);
1393    /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1394    ///
1395    /// assert_eq!(even, &[2]);
1396    /// assert_eq!(odd, &[1, 3]);
1397    /// # })
1398    /// ```
1399    fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1400    where
1401        Self: Sized,
1402        B: Default + Extend<Self::Item>,
1403        P: FnMut(&Self::Item) -> bool,
1404    {
1405        PartitionFuture {
1406            stream: self,
1407            predicate,
1408            res: Some(Default::default()),
1409        }
1410    }
1411
1412    /// Accumulates a computation over the stream.
1413    ///
1414    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1415    /// the accumulator and each item in the stream. The final accumulator value is returned.
1416    ///
1417    /// # Examples
1418    ///
1419    /// ```
1420    /// use futures_lite::stream::{self, StreamExt};
1421    ///
1422    /// # spin_on::spin_on(async {
1423    /// let s = stream::iter(vec![1, 2, 3]);
1424    /// let sum = s.fold(0, |acc, x| acc + x).await;
1425    ///
1426    /// assert_eq!(sum, 6);
1427    /// # })
1428    /// ```
1429    fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1430    where
1431        Self: Sized,
1432        F: FnMut(T, Self::Item) -> T,
1433    {
1434        FoldFuture {
1435            stream: self,
1436            f,
1437            acc: Some(init),
1438        }
1439    }
1440
1441    /// Accumulates a fallible computation over the stream.
1442    ///
1443    /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1444    /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1445    /// error if `f` failed the computation.
1446    ///
1447    /// # Examples
1448    ///
1449    /// ```
1450    /// use futures_lite::stream::{self, StreamExt};
1451    ///
1452    /// # spin_on::spin_on(async {
1453    /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1454    ///
1455    /// let sum = s.try_fold(0, |acc, v| {
1456    ///     if (acc + v) % 2 == 1 {
1457    ///         Ok(acc + v)
1458    ///     } else {
1459    ///         Err("fail")
1460    ///     }
1461    /// })
1462    /// .await;
1463    ///
1464    /// assert_eq!(sum, Err("fail"));
1465    /// # })
1466    /// ```
1467    fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1468    where
1469        Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1470        F: FnMut(B, T) -> Result<B, E>,
1471    {
1472        TryFoldFuture {
1473            stream: self,
1474            f,
1475            acc: Some(init),
1476        }
1477    }
1478
1479    /// Maps items of the stream to new values using a state value and a closure.
1480    ///
1481    /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
1482    /// state and each item in the stream. The stream stops when `f` returns `None`.
1483    ///
1484    /// # Examples
1485    ///
1486    /// ```
1487    /// use futures_lite::stream::{self, StreamExt};
1488    ///
1489    /// # spin_on::spin_on(async {
1490    /// let s = stream::iter(vec![1, 2, 3]);
1491    /// let mut s = s.scan(1, |state, x| {
1492    ///     *state = *state * x;
1493    ///     Some(-*state)
1494    /// });
1495    ///
1496    /// assert_eq!(s.next().await, Some(-1));
1497    /// assert_eq!(s.next().await, Some(-2));
1498    /// assert_eq!(s.next().await, Some(-6));
1499    /// assert_eq!(s.next().await, None);
1500    /// # })
1501    /// ```
1502    fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1503    where
1504        Self: Sized,
1505        F: FnMut(&mut St, Self::Item) -> Option<B>,
1506    {
1507        Scan {
1508            stream: self,
1509            state_f: (initial_state, f),
1510        }
1511    }
1512
1513    /// Fuses the stream so that it stops yielding items after the first [`None`].
1514    ///
1515    /// # Examples
1516    ///
1517    /// ```
1518    /// use futures_lite::stream::{self, StreamExt};
1519    ///
1520    /// # spin_on::spin_on(async {
1521    /// let mut s = stream::once(1).fuse();
1522    ///
1523    /// assert_eq!(s.next().await, Some(1));
1524    /// assert_eq!(s.next().await, None);
1525    /// assert_eq!(s.next().await, None);
1526    /// # })
1527    /// ```
1528    fn fuse(self) -> Fuse<Self>
1529    where
1530        Self: Sized,
1531    {
1532        Fuse {
1533            stream: self,
1534            done: false,
1535        }
1536    }
1537
1538    /// Repeats the stream from beginning to end, forever.
1539    ///
1540    /// # Examples
1541    ///
1542    /// ```
1543    /// use futures_lite::stream::{self, StreamExt};
1544    ///
1545    /// # spin_on::spin_on(async {
1546    /// let mut s = stream::iter(vec![1, 2]).cycle();
1547    ///
1548    /// assert_eq!(s.next().await, Some(1));
1549    /// assert_eq!(s.next().await, Some(2));
1550    /// assert_eq!(s.next().await, Some(1));
1551    /// assert_eq!(s.next().await, Some(2));
1552    /// # });
1553    /// ```
1554    fn cycle(self) -> Cycle<Self>
1555    where
1556        Self: Clone + Sized,
1557    {
1558        Cycle {
1559            orig: self.clone(),
1560            stream: self,
1561        }
1562    }
1563
1564    /// Enumerates items, mapping them to `(index, item)`.
1565    ///
1566    /// # Examples
1567    ///
1568    /// ```
1569    /// use futures_lite::stream::{self, StreamExt};
1570    ///
1571    /// # spin_on::spin_on(async {
1572    /// let s = stream::iter(vec!['a', 'b', 'c']);
1573    /// let mut s = s.enumerate();
1574    ///
1575    /// assert_eq!(s.next().await, Some((0, 'a')));
1576    /// assert_eq!(s.next().await, Some((1, 'b')));
1577    /// assert_eq!(s.next().await, Some((2, 'c')));
1578    /// assert_eq!(s.next().await, None);
1579    /// # });
1580    /// ```
1581    fn enumerate(self) -> Enumerate<Self>
1582    where
1583        Self: Sized,
1584    {
1585        Enumerate { stream: self, i: 0 }
1586    }
1587
1588    /// Calls a closure on each item and passes it on.
1589    ///
1590    /// # Examples
1591    ///
1592    /// ```
1593    /// use futures_lite::stream::{self, StreamExt};
1594    ///
1595    /// # spin_on::spin_on(async {
1596    /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1597    ///
1598    /// let sum = s
1599    ///    .inspect(|x| println!("about to filter {}", x))
1600    ///    .filter(|x| x % 2 == 0)
1601    ///    .inspect(|x| println!("made it through filter: {}", x))
1602    ///    .fold(0, |sum, i| sum + i)
1603    ///    .await;
1604    /// # });
1605    /// ```
1606    fn inspect<F>(self, f: F) -> Inspect<Self, F>
1607    where
1608        Self: Sized,
1609        F: FnMut(&Self::Item),
1610    {
1611        Inspect { stream: self, f }
1612    }
1613
1614    /// Gets the `n`th item of the stream.
1615    ///
1616    /// In the end, `n+1` items of the stream will be consumed.
1617    ///
1618    /// # Examples
1619    ///
1620    /// ```
1621    /// use futures_lite::stream::{self, StreamExt};
1622    ///
1623    /// # spin_on::spin_on(async {
1624    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1625    ///
1626    /// assert_eq!(s.nth(2).await, Some(2));
1627    /// assert_eq!(s.nth(2).await, Some(5));
1628    /// assert_eq!(s.nth(2).await, None);
1629    /// # });
1630    /// ```
1631    fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1632    where
1633        Self: Unpin,
1634    {
1635        NthFuture { stream: self, n }
1636    }
1637
1638    /// Returns the last item in the stream.
1639    ///
1640    /// # Examples
1641    ///
1642    /// ```
1643    /// use futures_lite::stream::{self, StreamExt};
1644    ///
1645    /// # spin_on::spin_on(async {
1646    /// let s = stream::iter(vec![1, 2, 3, 4]);
1647    /// assert_eq!(s.last().await, Some(4));
1648    ///
1649    /// let s = stream::empty::<i32>();
1650    /// assert_eq!(s.last().await, None);
1651    /// # });
1652    /// ```
1653    fn last(self) -> LastFuture<Self>
1654    where
1655        Self: Sized,
1656    {
1657        LastFuture {
1658            stream: self,
1659            last: None,
1660        }
1661    }
1662
1663    /// Finds the first item of the stream for which `predicate` returns `true`.
1664    ///
1665    /// # Examples
1666    ///
1667    /// ```
1668    /// use futures_lite::stream::{self, StreamExt};
1669    ///
1670    /// # spin_on::spin_on(async {
1671    /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1672    ///
1673    /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1674    /// assert_eq!(s.next().await, Some(13));
1675    /// # });
1676    /// ```
1677    fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1678    where
1679        Self: Unpin,
1680        P: FnMut(&Self::Item) -> bool,
1681    {
1682        FindFuture {
1683            stream: self,
1684            predicate,
1685        }
1686    }
1687
1688    /// Applies a closure to items in the stream and returns the first [`Some`] result.
1689    ///
1690    /// # Examples
1691    ///
1692    /// ```
1693    /// use futures_lite::stream::{self, StreamExt};
1694    ///
1695    /// # spin_on::spin_on(async {
1696    /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1697    /// let number = s.find_map(|s| s.parse().ok()).await;
1698    ///
1699    /// assert_eq!(number, Some(2));
1700    /// # });
1701    /// ```
1702    fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1703    where
1704        Self: Unpin,
1705        F: FnMut(Self::Item) -> Option<B>,
1706    {
1707        FindMapFuture { stream: self, f }
1708    }
1709
1710    /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1711    ///
1712    /// # Examples
1713    ///
1714    /// ```
1715    /// use futures_lite::stream::{self, StreamExt};
1716    ///
1717    /// # spin_on::spin_on(async {
1718    /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1719    ///
1720    /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1721    /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1722    /// assert_eq!(s.position(|x| x == 9).await, None);
1723    /// # });
1724    /// ```
1725    fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1726    where
1727        Self: Unpin,
1728        P: FnMut(Self::Item) -> bool,
1729    {
1730        PositionFuture {
1731            stream: self,
1732            predicate,
1733            index: 0,
1734        }
1735    }
1736
1737    /// Tests if `predicate` returns `true` for all items in the stream.
1738    ///
1739    /// The result is `true` for an empty stream.
1740    ///
1741    /// # Examples
1742    ///
1743    /// ```
1744    /// use futures_lite::stream::{self, StreamExt};
1745    ///
1746    /// # spin_on::spin_on(async {
1747    /// let mut s = stream::iter(vec![1, 2, 3]);
1748    /// assert!(!s.all(|x| x % 2 == 0).await);
1749    ///
1750    /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1751    /// assert!(s.all(|x| x % 2 == 0).await);
1752    ///
1753    /// let mut s = stream::empty::<i32>();
1754    /// assert!(s.all(|x| x % 2 == 0).await);
1755    /// # });
1756    /// ```
1757    fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1758    where
1759        Self: Unpin,
1760        P: FnMut(Self::Item) -> bool,
1761    {
1762        AllFuture {
1763            stream: self,
1764            predicate,
1765        }
1766    }
1767
1768    /// Tests if `predicate` returns `true` for any item in the stream.
1769    ///
1770    /// The result is `false` for an empty stream.
1771    ///
1772    /// # Examples
1773    ///
1774    /// ```
1775    /// use futures_lite::stream::{self, StreamExt};
1776    ///
1777    /// # spin_on::spin_on(async {
1778    /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1779    /// assert!(!s.any(|x| x % 2 == 0).await);
1780    ///
1781    /// let mut s = stream::iter(vec![1, 2, 3]);
1782    /// assert!(s.any(|x| x % 2 == 0).await);
1783    ///
1784    /// let mut s = stream::empty::<i32>();
1785    /// assert!(!s.any(|x| x % 2 == 0).await);
1786    /// # });
1787    /// ```
1788    fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1789    where
1790        Self: Unpin,
1791        P: FnMut(Self::Item) -> bool,
1792    {
1793        AnyFuture {
1794            stream: self,
1795            predicate,
1796        }
1797    }
1798
1799    /// Calls a closure on each item of the stream.
1800    ///
1801    /// # Examples
1802    ///
1803    /// ```
1804    /// use futures_lite::stream::{self, StreamExt};
1805    ///
1806    /// # spin_on::spin_on(async {
1807    /// let mut s = stream::iter(vec![1, 2, 3]);
1808    /// s.for_each(|s| println!("{}", s)).await;
1809    /// # });
1810    /// ```
1811    fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1812    where
1813        Self: Sized,
1814        F: FnMut(Self::Item),
1815    {
1816        ForEachFuture { stream: self, f }
1817    }
1818
1819    /// Calls a fallible closure on each item of the stream, stopping on first error.
1820    ///
1821    /// # Examples
1822    ///
1823    /// ```
1824    /// use futures_lite::stream::{self, StreamExt};
1825    ///
1826    /// # spin_on::spin_on(async {
1827    /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1828    ///
1829    /// let mut v = vec![];
1830    /// let res = s
1831    ///     .try_for_each(|n| {
1832    ///         if n < 2 {
1833    ///             v.push(n);
1834    ///             Ok(())
1835    ///         } else {
1836    ///             Err("too big")
1837    ///         }
1838    ///     })
1839    ///     .await;
1840    ///
1841    /// assert_eq!(v, &[0, 1]);
1842    /// assert_eq!(res, Err("too big"));
1843    /// # });
1844    /// ```
1845    fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1846    where
1847        Self: Unpin,
1848        F: FnMut(Self::Item) -> Result<(), E>,
1849    {
1850        TryForEachFuture { stream: self, f }
1851    }
1852
1853    /// Zips up two streams into a single stream of pairs.
1854    ///
1855    /// The stream of pairs stops when either of the original two streams is exhausted.
1856    ///
1857    /// # Examples
1858    ///
1859    /// ```
1860    /// use futures_lite::stream::{self, StreamExt};
1861    ///
1862    /// # spin_on::spin_on(async {
1863    /// let l = stream::iter(vec![1, 2, 3]);
1864    /// let r = stream::iter(vec![4, 5, 6, 7]);
1865    /// let mut s = l.zip(r);
1866    ///
1867    /// assert_eq!(s.next().await, Some((1, 4)));
1868    /// assert_eq!(s.next().await, Some((2, 5)));
1869    /// assert_eq!(s.next().await, Some((3, 6)));
1870    /// assert_eq!(s.next().await, None);
1871    /// # });
1872    /// ```
1873    fn zip<U>(self, other: U) -> Zip<Self, U>
1874    where
1875        Self: Sized,
1876        U: Stream,
1877    {
1878        Zip {
1879            item_slot: None,
1880            first: self,
1881            second: other,
1882        }
1883    }
1884
1885    /// Collects a stream of pairs into a pair of collections.
1886    ///
1887    /// # Examples
1888    ///
1889    /// ```
1890    /// use futures_lite::stream::{self, StreamExt};
1891    ///
1892    /// # spin_on::spin_on(async {
1893    /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1894    /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1895    ///
1896    /// assert_eq!(left, [1, 3]);
1897    /// assert_eq!(right, [2, 4]);
1898    /// # });
1899    /// ```
1900    fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1901    where
1902        FromA: Default + Extend<A>,
1903        FromB: Default + Extend<B>,
1904        Self: Stream<Item = (A, B)> + Sized,
1905    {
1906        UnzipFuture {
1907            stream: self,
1908            res: Some(Default::default()),
1909        }
1910    }
1911
1912    /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1913    ///
1914    /// # Examples
1915    ///
1916    /// ```
1917    /// use futures_lite::stream::{self, StreamExt};
1918    /// use futures_lite::stream::{once, pending};
1919    ///
1920    /// # spin_on::spin_on(async {
1921    /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1922    /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1923    ///
    /// // The first stream wins.
1925    /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1926    /// # })
1927    /// ```
1928    fn or<S>(self, other: S) -> Or<Self, S>
1929    where
1930        Self: Sized,
1931        S: Stream<Item = Self::Item>,
1932    {
1933        Or {
1934            stream1: self,
1935            stream2: other,
1936        }
1937    }
1938
1939    /// Merges with `other` stream, with no preference for either stream when both are ready.
1940    ///
1941    /// # Examples
1942    ///
1943    /// ```
1944    /// use futures_lite::stream::{self, StreamExt};
1945    /// use futures_lite::stream::{once, pending};
1946    ///
1947    /// # spin_on::spin_on(async {
1948    /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1949    /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1950    ///
    /// // One of the two streams is randomly chosen as the winner.
1952    /// let res = once(1).race(once(2)).next().await;
1953    /// # })
1954    /// ```
1955    #[cfg(all(feature = "std", feature = "race"))]
1956    fn race<S>(self, other: S) -> Race<Self, S>
1957    where
1958        Self: Sized,
1959        S: Stream<Item = Self::Item>,
1960    {
1961        Race {
1962            stream1: self,
1963            stream2: other,
1964            rng: Rng::new(),
1965        }
1966    }
1967
1968    /// Yields all immediately available values from a stream.
1969    ///
1970    /// This is intended to be used as a way of polling a stream without waiting, similar to the
1971    /// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
1972    /// on an [`async_channel::Receiver`] will return all messages that are currently in the
1973    /// channel, but will not wait for new messages.
1974    ///
1975    /// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
1976    /// polling context in order to poll the underlying stream. Since this stream will never return
1977    /// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
1978    /// [`Iterator`].
1979    ///
1980    /// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
1981    /// the future if it is polled again.
1982    ///
1983    /// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
1984    /// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
1985    /// [`Stream`]: crate::stream::Stream
1986    /// [`Iterator`]: std::iter::Iterator
1987    ///
1988    /// # Examples
1989    ///
1990    /// ```
1991    /// use futures_lite::{future, pin};
1992    /// use futures_lite::stream::{self, StreamExt};
1993    ///
1994    /// # #[cfg(feature = "std")] {
1995    /// // A stream that yields two values, returns `Pending`, and then yields one more value.
1996    /// let pend_once = stream::once_future(async {
1997    ///     future::yield_now().await;
1998    ///     3
1999    /// });
2000    /// let s = stream::iter(vec![1, 2]).chain(pend_once);
2001    /// pin!(s);
2002    ///
2003    /// // This will return the first two values, and then `None` because the stream returns
2004    /// // `Pending` after that.
2005    /// let mut iter = stream::block_on(s.drain());
2006    /// assert_eq!(iter.next(), Some(1));
2007    /// assert_eq!(iter.next(), Some(2));
2008    /// assert_eq!(iter.next(), None);
2009    ///
2010    /// // This will return the last value, because the stream returns `Ready` when polled.
2011    /// assert_eq!(iter.next(), Some(3));
2012    /// assert_eq!(iter.next(), None);
2013    /// # }
2014    /// ```
2015    fn drain(&mut self) -> Drain<'_, Self> {
2016        Drain { stream: self }
2017    }
2018
2019    /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
2020    ///
2021    /// # Examples
2022    ///
2023    /// ```
2024    /// use futures_lite::stream::{self, StreamExt};
2025    ///
2026    /// # spin_on::spin_on(async {
2027    /// let a = stream::once(1);
2028    /// let b = stream::empty();
2029    ///
2030    /// // Streams of different types can be stored in
2031    /// // the same collection when they are boxed:
2032    /// let streams = vec![a.boxed(), b.boxed()];
2033    /// # })
2034    /// ```
2035    #[cfg(feature = "alloc")]
2036    fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
2037    where
2038        Self: Send + Sized + 'a,
2039    {
2040        Box::pin(self)
2041    }
2042
2043    /// Boxes the stream and changes its type to `dyn Stream + 'a`.
2044    ///
2045    /// # Examples
2046    ///
2047    /// ```
2048    /// use futures_lite::stream::{self, StreamExt};
2049    ///
2050    /// # spin_on::spin_on(async {
2051    /// let a = stream::once(1);
2052    /// let b = stream::empty();
2053    ///
2054    /// // Streams of different types can be stored in
2055    /// // the same collection when they are boxed:
2056    /// let streams = vec![a.boxed_local(), b.boxed_local()];
2057    /// # })
2058    /// ```
2059    #[cfg(feature = "alloc")]
2060    fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
2061    where
2062        Self: Sized + 'a,
2063    {
2064        Box::pin(self)
2065    }
2066}
2067
// Blanket implementation: every type that implements `Stream`, sized or not,
// automatically gets all of the `StreamExt` methods.
impl<S: Stream + ?Sized> StreamExt for S {}
2069
/// Type alias for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
///
/// This is the type [`StreamExt::boxed()`] produces when the boxed stream is `'static`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
2083
/// Type alias for `Pin<Box<dyn Stream<Item = T> + 'static>>`.
///
/// This is the type [`StreamExt::boxed_local()`] produces when the boxed stream is `'static`.
/// Unlike [`Boxed`], it carries no `Send` bound.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
2097
/// Future for the [`StreamExt::next()`] method.
///
/// Resolves to the next item of the borrowed stream, or `None` once the stream reports
/// that it has ended.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NextFuture<'a, S: ?Sized> {
    // Mutable borrow of the stream being advanced; it may be an unsized type.
    stream: &'a mut S,
}
2104
// `NextFuture` is `Unpin` whenever the stream it borrows is `Unpin`.
impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
2106
2107impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
2108    type Output = Option<S::Item>;
2109
2110    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2111        self.stream.poll_next(cx)
2112    }
2113}
2114
/// Future for the [`StreamExt::try_next()`] method.
///
/// Polls a stream of `Result`s once per poll and turns the resulting
/// `Option<Result<T, E>>` into a `Result<Option<T>, E>`.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryNextFuture<'a, S: ?Sized> {
    // Mutable borrow of the stream being advanced; it may be an unsized type.
    stream: &'a mut S,
}
2121
// `TryNextFuture` is `Unpin` whenever the stream it borrows is `Unpin`.
impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
2123
2124impl<T, E, S> Future for TryNextFuture<'_, S>
2125where
2126    S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
2127{
2128    type Output = Result<Option<T>, E>;
2129
2130    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2131        let res = ready!(self.stream.poll_next(cx));
2132        Poll::Ready(res.transpose())
2133    }
2134}
2135
pin_project! {
    /// Future for the [`StreamExt::count()`] method.
    ///
    /// Resolves to the number of items the stream yields before it ends.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct CountFuture<S: ?Sized> {
        // Running tally of the items seen so far.
        count: usize,
        // The stream being counted; it is the last field because `S` may be unsized.
        #[pin]
        stream: S,
    }
}
2146
2147impl<S: Stream + ?Sized> Future for CountFuture<S> {
2148    type Output = usize;
2149
2150    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2151        loop {
2152            match ready!(self.as_mut().project().stream.poll_next(cx)) {
2153                None => return Poll::Ready(self.count),
2154                Some(_) => *self.as_mut().project().count += 1,
2155            }
2156        }
2157    }
2158}
2159
2160pin_project! {
2161    /// Future for the [`StreamExt::collect()`] method.
2162    #[derive(Debug)]
2163    #[must_use = "futures do nothing unless you `.await` or poll them"]
2164    pub struct CollectFuture<S, C> {
2165        #[pin]
2166        stream: S,
2167        collection: C,
2168    }
2169}
2170
2171impl<S, C> Future for CollectFuture<S, C>
2172where
2173    S: Stream,
2174    C: Default + Extend<S::Item>,
2175{
2176    type Output = C;
2177
2178    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
2179        let mut this = self.as_mut().project();
2180        loop {
2181            match ready!(this.stream.as_mut().poll_next(cx)) {
2182                Some(e) => this.collection.extend(Some(e)),
2183                None => return Poll::Ready(mem::take(self.project().collection)),
2184            }
2185        }
2186    }
2187}
2188
2189pin_project! {
2190    /// Future for the [`StreamExt::try_collect()`] method.
2191    #[derive(Debug)]
2192    #[must_use = "futures do nothing unless you `.await` or poll them"]
2193    pub struct TryCollectFuture<S, C> {
2194        #[pin]
2195        stream: S,
2196        items: C,
2197    }
2198}
2199
2200impl<T, E, S, C> Future for TryCollectFuture<S, C>
2201where
2202    S: Stream<Item = Result<T, E>>,
2203    C: Default + Extend<T>,
2204{
2205    type Output = Result<C, E>;
2206
2207    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2208        let mut this = self.project();
2209        Poll::Ready(Ok(loop {
2210            match ready!(this.stream.as_mut().poll_next(cx)?) {
2211                Some(x) => this.items.extend(Some(x)),
2212                None => break mem::take(this.items),
2213            }
2214        }))
2215    }
2216}
2217
2218pin_project! {
2219    /// Future for the [`StreamExt::partition()`] method.
2220    #[derive(Debug)]
2221    #[must_use = "futures do nothing unless you `.await` or poll them"]
2222    pub struct PartitionFuture<S, P, B> {
2223        #[pin]
2224        stream: S,
2225        predicate: P,
2226        res: Option<(B, B)>,
2227    }
2228}
2229
2230impl<S, P, B> Future for PartitionFuture<S, P, B>
2231where
2232    S: Stream + Sized,
2233    P: FnMut(&S::Item) -> bool,
2234    B: Default + Extend<S::Item>,
2235{
2236    type Output = (B, B);
2237
2238    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2239        let mut this = self.project();
2240        loop {
2241            match ready!(this.stream.as_mut().poll_next(cx)) {
2242                Some(v) => {
2243                    let res = this.res.as_mut().unwrap();
2244                    if (this.predicate)(&v) {
2245                        res.0.extend(Some(v))
2246                    } else {
2247                        res.1.extend(Some(v))
2248                    }
2249                }
2250                None => return Poll::Ready(this.res.take().unwrap()),
2251            }
2252        }
2253    }
2254}
2255
2256pin_project! {
2257    /// Future for the [`StreamExt::fold()`] method.
2258    #[derive(Debug)]
2259    #[must_use = "futures do nothing unless you `.await` or poll them"]
2260    pub struct FoldFuture<S, F, T> {
2261        #[pin]
2262        stream: S,
2263        f: F,
2264        acc: Option<T>,
2265    }
2266}
2267
2268impl<S, F, T> Future for FoldFuture<S, F, T>
2269where
2270    S: Stream,
2271    F: FnMut(T, S::Item) -> T,
2272{
2273    type Output = T;
2274
2275    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2276        let mut this = self.project();
2277        loop {
2278            match ready!(this.stream.as_mut().poll_next(cx)) {
2279                Some(v) => {
2280                    let old = this.acc.take().unwrap();
2281                    let new = (this.f)(old, v);
2282                    *this.acc = Some(new);
2283                }
2284                None => return Poll::Ready(this.acc.take().unwrap()),
2285            }
2286        }
2287    }
2288}
2289
2290/// Future for the [`StreamExt::try_fold()`] method.
2291#[derive(Debug)]
2292#[must_use = "futures do nothing unless you `.await` or poll them"]
2293pub struct TryFoldFuture<'a, S, F, B> {
2294    stream: &'a mut S,
2295    f: F,
2296    acc: Option<B>,
2297}
2298
2299impl<S, F, B> Unpin for TryFoldFuture<'_, S, F, B> {}
2300
2301impl<T, E, S, F, B> Future for TryFoldFuture<'_, S, F, B>
2302where
2303    S: Stream<Item = Result<T, E>> + Unpin,
2304    F: FnMut(B, T) -> Result<B, E>,
2305{
2306    type Output = Result<B, E>;
2307
2308    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2309        loop {
2310            match ready!(self.stream.poll_next(cx)) {
2311                Some(Err(e)) => return Poll::Ready(Err(e)),
2312                Some(Ok(t)) => {
2313                    let old = self.acc.take().unwrap();
2314                    let new = (&mut self.f)(old, t);
2315
2316                    match new {
2317                        Ok(t) => self.acc = Some(t),
2318                        Err(e) => return Poll::Ready(Err(e)),
2319                    }
2320                }
2321                None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2322            }
2323        }
2324    }
2325}
2326
2327pin_project! {
2328    /// Stream for the [`StreamExt::scan()`] method.
2329    #[derive(Clone, Debug)]
2330    #[must_use = "streams do nothing unless polled"]
2331    pub struct Scan<S, St, F> {
2332        #[pin]
2333        stream: S,
2334        state_f: (St, F),
2335    }
2336}
2337
2338impl<S, St, F, B> Stream for Scan<S, St, F>
2339where
2340    S: Stream,
2341    F: FnMut(&mut St, S::Item) -> Option<B>,
2342{
2343    type Item = B;
2344
2345    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2346        let mut this = self.project();
2347        this.stream.as_mut().poll_next(cx).map(|item| {
2348            item.and_then(|item| {
2349                let (state, f) = this.state_f;
2350                f(state, item)
2351            })
2352        })
2353    }
2354}
2355
2356pin_project! {
2357    /// Stream for the [`StreamExt::fuse()`] method.
2358    #[derive(Clone, Debug)]
2359    #[must_use = "streams do nothing unless polled"]
2360    pub struct Fuse<S> {
2361        #[pin]
2362        stream: S,
2363        done: bool,
2364    }
2365}
2366
2367impl<S: Stream> Stream for Fuse<S> {
2368    type Item = S::Item;
2369
2370    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2371        let this = self.project();
2372
2373        if *this.done {
2374            Poll::Ready(None)
2375        } else {
2376            let next = ready!(this.stream.poll_next(cx));
2377            if next.is_none() {
2378                *this.done = true;
2379            }
2380            Poll::Ready(next)
2381        }
2382    }
2383}
2384
2385pin_project! {
2386    /// Stream for the [`StreamExt::map()`] method.
2387    #[derive(Clone, Debug)]
2388    #[must_use = "streams do nothing unless polled"]
2389    pub struct Map<S, F> {
2390        #[pin]
2391        stream: S,
2392        f: F,
2393    }
2394}
2395
2396impl<S, F, T> Stream for Map<S, F>
2397where
2398    S: Stream,
2399    F: FnMut(S::Item) -> T,
2400{
2401    type Item = T;
2402
2403    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2404        let this = self.project();
2405        let next = ready!(this.stream.poll_next(cx));
2406        Poll::Ready(next.map(this.f))
2407    }
2408
2409    fn size_hint(&self) -> (usize, Option<usize>) {
2410        self.stream.size_hint()
2411    }
2412}
2413
2414pin_project! {
2415    /// Stream for the [`StreamExt::flat_map()`] method.
2416    #[derive(Clone, Debug)]
2417    #[must_use = "streams do nothing unless polled"]
2418    pub struct FlatMap<S, U, F> {
2419        #[pin]
2420        stream: Map<S, F>,
2421        #[pin]
2422        inner_stream: Option<U>,
2423    }
2424}
2425
2426impl<S, U, F> Stream for FlatMap<S, U, F>
2427where
2428    S: Stream,
2429    U: Stream,
2430    F: FnMut(S::Item) -> U,
2431{
2432    type Item = U::Item;
2433
2434    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2435        let mut this = self.project();
2436        loop {
2437            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2438                match ready!(inner.poll_next(cx)) {
2439                    Some(item) => return Poll::Ready(Some(item)),
2440                    None => this.inner_stream.set(None),
2441                }
2442            }
2443
2444            match ready!(this.stream.as_mut().poll_next(cx)) {
2445                Some(stream) => this.inner_stream.set(Some(stream)),
2446                None => return Poll::Ready(None),
2447            }
2448        }
2449    }
2450}
2451
2452pin_project! {
2453    /// Stream for the [`StreamExt::flatten()`] method.
2454    #[derive(Clone, Debug)]
2455    #[must_use = "streams do nothing unless polled"]
2456    pub struct Flatten<S: Stream> {
2457        #[pin]
2458        stream: S,
2459        #[pin]
2460        inner_stream: Option<S::Item>,
2461    }
2462}
2463
2464impl<S, U> Stream for Flatten<S>
2465where
2466    S: Stream<Item = U>,
2467    U: Stream,
2468{
2469    type Item = U::Item;
2470
2471    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2472        let mut this = self.project();
2473        loop {
2474            if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2475                match ready!(inner.poll_next(cx)) {
2476                    Some(item) => return Poll::Ready(Some(item)),
2477                    None => this.inner_stream.set(None),
2478                }
2479            }
2480
2481            match ready!(this.stream.as_mut().poll_next(cx)) {
2482                Some(inner) => this.inner_stream.set(Some(inner)),
2483                None => return Poll::Ready(None),
2484            }
2485        }
2486    }
2487}
2488
2489pin_project! {
2490    /// Stream for the [`StreamExt::then()`] method.
2491    #[derive(Clone, Debug)]
2492    #[must_use = "streams do nothing unless polled"]
2493    pub struct Then<S, F, Fut> {
2494        #[pin]
2495        stream: S,
2496        #[pin]
2497        future: Option<Fut>,
2498        f: F,
2499    }
2500}
2501
2502impl<S, F, Fut> Stream for Then<S, F, Fut>
2503where
2504    S: Stream,
2505    F: FnMut(S::Item) -> Fut,
2506    Fut: Future,
2507{
2508    type Item = Fut::Output;
2509
2510    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2511        let mut this = self.project();
2512
2513        loop {
2514            if let Some(fut) = this.future.as_mut().as_pin_mut() {
2515                let item = ready!(fut.poll(cx));
2516                this.future.set(None);
2517                return Poll::Ready(Some(item));
2518            } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2519                this.future.set(Some((this.f)(item)));
2520            } else {
2521                return Poll::Ready(None);
2522            }
2523        }
2524    }
2525
2526    fn size_hint(&self) -> (usize, Option<usize>) {
2527        let future_len = self.future.is_some() as usize;
2528        let (lower, upper) = self.stream.size_hint();
2529        let lower = lower.saturating_add(future_len);
2530        let upper = upper.and_then(|u| u.checked_add(future_len));
2531        (lower, upper)
2532    }
2533}
2534
2535pin_project! {
2536    /// Stream for the [`StreamExt::filter()`] method.
2537    #[derive(Clone, Debug)]
2538    #[must_use = "streams do nothing unless polled"]
2539    pub struct Filter<S, P> {
2540        #[pin]
2541        stream: S,
2542        predicate: P,
2543    }
2544}
2545
2546impl<S, P> Stream for Filter<S, P>
2547where
2548    S: Stream,
2549    P: FnMut(&S::Item) -> bool,
2550{
2551    type Item = S::Item;
2552
2553    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2554        let mut this = self.project();
2555        loop {
2556            match ready!(this.stream.as_mut().poll_next(cx)) {
2557                None => return Poll::Ready(None),
2558                Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2559                Some(_) => {}
2560            }
2561        }
2562    }
2563
2564    fn size_hint(&self) -> (usize, Option<usize>) {
2565        let (_, hi) = self.stream.size_hint();
2566
2567        // If the filter matches all of the elements, it will match the stream's upper bound.
2568        // If the filter matches none of the elements, there will be zero returned values.
2569        (0, hi)
2570    }
2571}
2572
/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
///
/// This function only wraps the two streams in an [`Or`] combinator; nothing is polled
/// until the returned stream is polled. Both streams must yield the same item type.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
///
/// // The first stream wins.
/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
/// # })
/// ```
pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Or { stream1, stream2 }
}
2595
2596pin_project! {
2597    /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2598    #[derive(Clone, Debug)]
2599    #[must_use = "streams do nothing unless polled"]
2600    pub struct Or<S1, S2> {
2601        #[pin]
2602        stream1: S1,
2603        #[pin]
2604        stream2: S2,
2605    }
2606}
2607
2608impl<T, S1, S2> Stream for Or<S1, S2>
2609where
2610    S1: Stream<Item = T>,
2611    S2: Stream<Item = T>,
2612{
2613    type Item = T;
2614
2615    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2616        let mut this = self.project();
2617
2618        if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2619            return Poll::Ready(Some(t));
2620        }
2621        this.stream2.as_mut().poll_next(cx)
2622    }
2623}
2624
/// Merges two streams, with no preference for either stream when both are ready.
///
/// This function only builds a [`Race`] combinator around the two streams, together with
/// a fresh random number generator obtained from `Rng::new()`. It is only available when
/// both the `std` and `race` features are enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
///
/// // One of the two streams is randomly chosen as the winner.
/// let res = stream::race(once(1), once(2)).next().await;
/// # })
/// ```
#[cfg(all(feature = "std", feature = "race"))]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Race {
        stream1,
        stream2,
        rng: Rng::new(),
    }
}
2652
/// Races two streams, but with a user-provided seed for randomness.
///
/// This function only builds a [`Race`] combinator around the two streams, with its
/// random number generator created from `seed` via `Rng::with_seed()`. It is available
/// whenever the `race` feature is enabled.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// // A fixed seed is used for reproducibility.
/// const SEED: u64 = 123;
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1));
/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2));
///
/// // One of the two streams is randomly chosen as the winner.
/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await;
/// # })
/// ```
#[cfg(feature = "race")]
pub fn race_with_seed<T, S1, S2>(stream1: S1, stream2: S2, seed: u64) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    Race {
        stream1,
        stream2,
        rng: Rng::with_seed(seed),
    }
}
2683
#[cfg(feature = "race")]
pin_project! {
    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
    ///
    /// On every poll a random bit decides which of the two streams is polled
    /// first; the first item obtained is yielded.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        #[pin]
        stream1: S1,
        #[pin]
        stream2: S2,
        rng: Rng,
    }
}

#[cfg(feature = "race")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // One random bit per poll picks the polling order:
        // `true`  => stream1, then stream2;
        // `false` => stream2, then stream1.
        let stream1_first = this.rng.bool();

        if stream1_first {
            if let Poll::Ready(Some(item)) = this.stream1.as_mut().poll_next(cx) {
                return Poll::Ready(Some(item));
            }
        }
        if let Poll::Ready(Some(item)) = this.stream2.as_mut().poll_next(cx) {
            return Poll::Ready(Some(item));
        }
        if !stream1_first {
            if let Poll::Ready(Some(item)) = this.stream1.as_mut().poll_next(cx) {
                return Poll::Ready(Some(item));
            }
        }

        // NOTE(review): neither stream produced an item. This is reported as
        // `Pending` even when both streams returned `Ready(None)`, so this
        // combinator never yields `None` itself.
        Poll::Pending
    }
}
2727
2728pin_project! {
2729    /// Stream for the [`StreamExt::filter_map()`] method.
2730    #[derive(Clone, Debug)]
2731    #[must_use = "streams do nothing unless polled"]
2732    pub struct FilterMap<S, F> {
2733        #[pin]
2734        stream: S,
2735        f: F,
2736    }
2737}
2738
2739impl<S, F, T> Stream for FilterMap<S, F>
2740where
2741    S: Stream,
2742    F: FnMut(S::Item) -> Option<T>,
2743{
2744    type Item = T;
2745
2746    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2747        let mut this = self.project();
2748        loop {
2749            match ready!(this.stream.as_mut().poll_next(cx)) {
2750                None => return Poll::Ready(None),
2751                Some(v) => {
2752                    if let Some(t) = (this.f)(v) {
2753                        return Poll::Ready(Some(t));
2754                    }
2755                }
2756            }
2757        }
2758    }
2759}
2760
2761pin_project! {
2762    /// Stream for the [`StreamExt::take()`] method.
2763    #[derive(Clone, Debug)]
2764    #[must_use = "streams do nothing unless polled"]
2765    pub struct Take<S> {
2766        #[pin]
2767        stream: S,
2768        n: usize,
2769    }
2770}
2771
2772impl<S: Stream> Stream for Take<S> {
2773    type Item = S::Item;
2774
2775    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2776        let this = self.project();
2777
2778        if *this.n == 0 {
2779            Poll::Ready(None)
2780        } else {
2781            let next = ready!(this.stream.poll_next(cx));
2782            match next {
2783                Some(_) => *this.n -= 1,
2784                None => *this.n = 0,
2785            }
2786            Poll::Ready(next)
2787        }
2788    }
2789}
2790
2791pin_project! {
2792    /// Stream for the [`StreamExt::take_while()`] method.
2793    #[derive(Clone, Debug)]
2794    #[must_use = "streams do nothing unless polled"]
2795    pub struct TakeWhile<S, P> {
2796        #[pin]
2797        stream: S,
2798        predicate: P,
2799    }
2800}
2801
2802impl<S, P> Stream for TakeWhile<S, P>
2803where
2804    S: Stream,
2805    P: FnMut(&S::Item) -> bool,
2806{
2807    type Item = S::Item;
2808
2809    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2810        let this = self.project();
2811
2812        match ready!(this.stream.poll_next(cx)) {
2813            Some(v) => {
2814                if (this.predicate)(&v) {
2815                    Poll::Ready(Some(v))
2816                } else {
2817                    Poll::Ready(None)
2818                }
2819            }
2820            None => Poll::Ready(None),
2821        }
2822    }
2823}
2824
2825pin_project! {
2826    /// Stream for the [`StreamExt::map_while()`] method.
2827    #[derive(Clone, Debug)]
2828    #[must_use = "streams do nothing unless polled"]
2829    pub struct MapWhile<S, P> {
2830        #[pin]
2831        stream: S,
2832        predicate: P,
2833    }
2834}
2835
2836impl<B, S, P> Stream for MapWhile<S, P>
2837where
2838    S: Stream,
2839    P: FnMut(S::Item) -> Option<B>,
2840{
2841    type Item = B;
2842
2843    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2844        let this = self.project();
2845
2846        match ready!(this.stream.poll_next(cx)) {
2847            Some(v) => Poll::Ready((this.predicate)(v)),
2848            None => Poll::Ready(None),
2849        }
2850    }
2851}
2852
2853pin_project! {
2854    /// Stream for the [`StreamExt::skip()`] method.
2855    #[derive(Clone, Debug)]
2856    #[must_use = "streams do nothing unless polled"]
2857    pub struct Skip<S> {
2858        #[pin]
2859        stream: S,
2860        n: usize,
2861    }
2862}
2863
2864impl<S: Stream> Stream for Skip<S> {
2865    type Item = S::Item;
2866
2867    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2868        let mut this = self.project();
2869        loop {
2870            match ready!(this.stream.as_mut().poll_next(cx)) {
2871                Some(v) => match *this.n {
2872                    0 => return Poll::Ready(Some(v)),
2873                    _ => *this.n -= 1,
2874                },
2875                None => return Poll::Ready(None),
2876            }
2877        }
2878    }
2879}
2880
2881pin_project! {
2882    /// Stream for the [`StreamExt::skip_while()`] method.
2883    #[derive(Clone, Debug)]
2884    #[must_use = "streams do nothing unless polled"]
2885    pub struct SkipWhile<S, P> {
2886        #[pin]
2887        stream: S,
2888        predicate: Option<P>,
2889    }
2890}
2891
2892impl<S, P> Stream for SkipWhile<S, P>
2893where
2894    S: Stream,
2895    P: FnMut(&S::Item) -> bool,
2896{
2897    type Item = S::Item;
2898
2899    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2900        let mut this = self.project();
2901        loop {
2902            match ready!(this.stream.as_mut().poll_next(cx)) {
2903                Some(v) => match this.predicate {
2904                    Some(p) => {
2905                        if !p(&v) {
2906                            *this.predicate = None;
2907                            return Poll::Ready(Some(v));
2908                        }
2909                    }
2910                    None => return Poll::Ready(Some(v)),
2911                },
2912                None => return Poll::Ready(None),
2913            }
2914        }
2915    }
2916}
2917
2918pin_project! {
2919    /// Stream for the [`StreamExt::step_by()`] method.
2920    #[derive(Clone, Debug)]
2921    #[must_use = "streams do nothing unless polled"]
2922    pub struct StepBy<S> {
2923        #[pin]
2924        stream: S,
2925        step: usize,
2926        i: usize,
2927    }
2928}
2929
2930impl<S: Stream> Stream for StepBy<S> {
2931    type Item = S::Item;
2932
2933    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2934        let mut this = self.project();
2935        loop {
2936            match ready!(this.stream.as_mut().poll_next(cx)) {
2937                Some(v) => {
2938                    if *this.i == 0 {
2939                        *this.i = *this.step - 1;
2940                        return Poll::Ready(Some(v));
2941                    } else {
2942                        *this.i -= 1;
2943                    }
2944                }
2945                None => return Poll::Ready(None),
2946            }
2947        }
2948    }
2949}
2950
2951pin_project! {
2952    /// Stream for the [`StreamExt::chain()`] method.
2953    #[derive(Clone, Debug)]
2954    #[must_use = "streams do nothing unless polled"]
2955    pub struct Chain<S, U> {
2956        #[pin]
2957        first: Fuse<S>,
2958        #[pin]
2959        second: Fuse<U>,
2960    }
2961}
2962
2963impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2964    type Item = S::Item;
2965
2966    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2967        let mut this = self.project();
2968
2969        if !this.first.done {
2970            let next = ready!(this.first.as_mut().poll_next(cx));
2971            if let Some(next) = next {
2972                return Poll::Ready(Some(next));
2973            }
2974        }
2975
2976        if !this.second.done {
2977            let next = ready!(this.second.as_mut().poll_next(cx));
2978            if let Some(next) = next {
2979                return Poll::Ready(Some(next));
2980            }
2981        }
2982
2983        if this.first.done && this.second.done {
2984            Poll::Ready(None)
2985        } else {
2986            Poll::Pending
2987        }
2988    }
2989}
2990
2991pin_project! {
2992    /// Stream for the [`StreamExt::cloned()`] method.
2993    #[derive(Clone, Debug)]
2994    #[must_use = "streams do nothing unless polled"]
2995    pub struct Cloned<S> {
2996        #[pin]
2997        stream: S,
2998    }
2999}
3000
3001impl<'a, S, T: 'a> Stream for Cloned<S>
3002where
3003    S: Stream<Item = &'a T>,
3004    T: Clone,
3005{
3006    type Item = T;
3007
3008    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3009        let this = self.project();
3010        let next = ready!(this.stream.poll_next(cx));
3011        Poll::Ready(next.cloned())
3012    }
3013}
3014
3015pin_project! {
3016    /// Stream for the [`StreamExt::copied()`] method.
3017    #[derive(Clone, Debug)]
3018    #[must_use = "streams do nothing unless polled"]
3019    pub struct Copied<S> {
3020        #[pin]
3021        stream: S,
3022    }
3023}
3024
3025impl<'a, S, T: 'a> Stream for Copied<S>
3026where
3027    S: Stream<Item = &'a T>,
3028    T: Copy,
3029{
3030    type Item = T;
3031
3032    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3033        let this = self.project();
3034        let next = ready!(this.stream.poll_next(cx));
3035        Poll::Ready(next.copied())
3036    }
3037}
3038
3039pin_project! {
3040    /// Stream for the [`StreamExt::cycle()`] method.
3041    #[derive(Clone, Debug)]
3042    #[must_use = "streams do nothing unless polled"]
3043    pub struct Cycle<S> {
3044        orig: S,
3045        #[pin]
3046        stream: S,
3047    }
3048}
3049
3050impl<S> Stream for Cycle<S>
3051where
3052    S: Stream + Clone,
3053{
3054    type Item = S::Item;
3055
3056    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3057        match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
3058            Some(item) => Poll::Ready(Some(item)),
3059            None => {
3060                let new = self.as_mut().orig.clone();
3061                self.as_mut().project().stream.set(new);
3062                self.project().stream.poll_next(cx)
3063            }
3064        }
3065    }
3066}
3067
3068pin_project! {
3069    /// Stream for the [`StreamExt::enumerate()`] method.
3070    #[derive(Clone, Debug)]
3071    #[must_use = "streams do nothing unless polled"]
3072    pub struct Enumerate<S> {
3073        #[pin]
3074        stream: S,
3075        i: usize,
3076    }
3077}
3078
3079impl<S> Stream for Enumerate<S>
3080where
3081    S: Stream,
3082{
3083    type Item = (usize, S::Item);
3084
3085    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3086        let this = self.project();
3087
3088        match ready!(this.stream.poll_next(cx)) {
3089            Some(v) => {
3090                let ret = (*this.i, v);
3091                *this.i += 1;
3092                Poll::Ready(Some(ret))
3093            }
3094            None => Poll::Ready(None),
3095        }
3096    }
3097}
3098
3099pin_project! {
3100    /// Stream for the [`StreamExt::inspect()`] method.
3101    #[derive(Clone, Debug)]
3102    #[must_use = "streams do nothing unless polled"]
3103    pub struct Inspect<S, F> {
3104        #[pin]
3105        stream: S,
3106        f: F,
3107    }
3108}
3109
3110impl<S, F> Stream for Inspect<S, F>
3111where
3112    S: Stream,
3113    F: FnMut(&S::Item),
3114{
3115    type Item = S::Item;
3116
3117    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3118        let mut this = self.project();
3119        let next = ready!(this.stream.as_mut().poll_next(cx));
3120        if let Some(x) = &next {
3121            (this.f)(x);
3122        }
3123        Poll::Ready(next)
3124    }
3125}
3126
3127/// Future for the [`StreamExt::nth()`] method.
3128#[derive(Debug)]
3129#[must_use = "futures do nothing unless you `.await` or poll them"]
3130pub struct NthFuture<'a, S: ?Sized> {
3131    stream: &'a mut S,
3132    n: usize,
3133}
3134
3135impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
3136
3137impl<S> Future for NthFuture<'_, S>
3138where
3139    S: Stream + Unpin + ?Sized,
3140{
3141    type Output = Option<S::Item>;
3142
3143    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3144        loop {
3145            match ready!(self.stream.poll_next(cx)) {
3146                Some(v) => match self.n {
3147                    0 => return Poll::Ready(Some(v)),
3148                    _ => self.n -= 1,
3149                },
3150                None => return Poll::Ready(None),
3151            }
3152        }
3153    }
3154}
3155
3156pin_project! {
3157    /// Future for the [`StreamExt::last()`] method.
3158    #[derive(Debug)]
3159    #[must_use = "futures do nothing unless you `.await` or poll them"]
3160    pub struct LastFuture<S: Stream> {
3161        #[pin]
3162        stream: S,
3163        last: Option<S::Item>,
3164    }
3165}
3166
3167impl<S: Stream> Future for LastFuture<S> {
3168    type Output = Option<S::Item>;
3169
3170    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3171        let mut this = self.project();
3172        loop {
3173            match ready!(this.stream.as_mut().poll_next(cx)) {
3174                Some(new) => *this.last = Some(new),
3175                None => return Poll::Ready(this.last.take()),
3176            }
3177        }
3178    }
3179}
3180
3181/// Future for the [`StreamExt::find()`] method.
3182#[derive(Debug)]
3183#[must_use = "futures do nothing unless you `.await` or poll them"]
3184pub struct FindFuture<'a, S: ?Sized, P> {
3185    stream: &'a mut S,
3186    predicate: P,
3187}
3188
3189impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
3190
3191impl<S, P> Future for FindFuture<'_, S, P>
3192where
3193    S: Stream + Unpin + ?Sized,
3194    P: FnMut(&S::Item) -> bool,
3195{
3196    type Output = Option<S::Item>;
3197
3198    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3199        loop {
3200            match ready!(self.stream.poll_next(cx)) {
3201                Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
3202                Some(_) => {}
3203                None => return Poll::Ready(None),
3204            }
3205        }
3206    }
3207}
3208
/// Future for the [`StreamExt::find_map()`] method.
///
/// Applies `f` to each item of the borrowed stream and resolves to the first
/// `Some` result, or to `None` if the stream ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FindMapFuture<'a, S: ?Sized, F> {
    /// Borrowed stream whose items are fed to `f`.
    stream: &'a mut S,
    /// Mapping function; the first `Some` it returns completes the future.
    f: F,
}

// `Unpin` is implemented by hand and is conditioned only on `S: Unpin`; no
// bound is placed on `F`.
impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
3218
3219impl<S, B, F> Future for FindMapFuture<'_, S, F>
3220where
3221    S: Stream + Unpin + ?Sized,
3222    F: FnMut(S::Item) -> Option<B>,
3223{
3224    type Output = Option<B>;
3225
3226    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3227        loop {
3228            match ready!(self.stream.poll_next(cx)) {
3229                Some(v) => {
3230                    if let Some(v) = (&mut self.f)(v) {
3231                        return Poll::Ready(Some(v));
3232                    }
3233                }
3234                None => return Poll::Ready(None),
3235            }
3236        }
3237    }
3238}
3239
/// Future for the [`StreamExt::position()`] method.
///
/// Resolves to the value of `index` at the moment the predicate first accepts
/// an item, or to `None` if the stream ends first.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PositionFuture<'a, S: ?Sized, P> {
    /// Borrowed stream that is searched.
    stream: &'a mut S,
    /// Test applied to each item, which it receives by value.
    predicate: P,
    /// Counter incremented once for every rejected item. Presumably starts at
    /// zero (the constructor is not in this section -- confirm in
    /// `StreamExt::position()`).
    index: usize,
}

// `Unpin` is implemented by hand and is conditioned only on `S: Unpin`; no
// bound is placed on `P`.
impl<S: Unpin + ?Sized, P> Unpin for PositionFuture<'_, S, P> {}
3250
3251impl<S, P> Future for PositionFuture<'_, S, P>
3252where
3253    S: Stream + Unpin + ?Sized,
3254    P: FnMut(S::Item) -> bool,
3255{
3256    type Output = Option<usize>;
3257
3258    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3259        loop {
3260            match ready!(self.stream.poll_next(cx)) {
3261                Some(v) => {
3262                    if (&mut self.predicate)(v) {
3263                        return Poll::Ready(Some(self.index));
3264                    } else {
3265                        self.index += 1;
3266                    }
3267                }
3268                None => return Poll::Ready(None),
3269            }
3270        }
3271    }
3272}
3273
/// Future for the [`StreamExt::all()`] method.
///
/// Resolves to `false` as soon as the predicate rejects an item, and to `true`
/// if the stream ends without a rejection (including when it yields nothing).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AllFuture<'a, S: ?Sized, P> {
    /// Borrowed stream whose items are tested.
    stream: &'a mut S,
    /// Test applied to each item, which it receives by value.
    predicate: P,
}

// `Unpin` is implemented by hand and is conditioned only on `S: Unpin`; no
// bound is placed on `P`.
impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
3283
3284impl<S, P> Future for AllFuture<'_, S, P>
3285where
3286    S: Stream + Unpin + ?Sized,
3287    P: FnMut(S::Item) -> bool,
3288{
3289    type Output = bool;
3290
3291    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3292        loop {
3293            match ready!(self.stream.poll_next(cx)) {
3294                Some(v) => {
3295                    if !(&mut self.predicate)(v) {
3296                        return Poll::Ready(false);
3297                    }
3298                }
3299                None => return Poll::Ready(true),
3300            }
3301        }
3302    }
3303}
3304
/// Future for the [`StreamExt::any()`] method.
///
/// Resolves to `true` as soon as the predicate accepts an item, and to `false`
/// if the stream ends without an accepted item (including when it yields
/// nothing).
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct AnyFuture<'a, S: ?Sized, P> {
    /// Borrowed stream whose items are tested.
    stream: &'a mut S,
    /// Test applied to each item, which it receives by value.
    predicate: P,
}

// `Unpin` is implemented by hand and is conditioned only on `S: Unpin`; no
// bound is placed on `P`.
impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
3314
3315impl<S, P> Future for AnyFuture<'_, S, P>
3316where
3317    S: Stream + Unpin + ?Sized,
3318    P: FnMut(S::Item) -> bool,
3319{
3320    type Output = bool;
3321
3322    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3323        loop {
3324            match ready!(self.stream.poll_next(cx)) {
3325                Some(v) => {
3326                    if (&mut self.predicate)(v) {
3327                        return Poll::Ready(true);
3328                    }
3329                }
3330                None => return Poll::Ready(false),
3331            }
3332        }
3333    }
3334}
3335
pin_project! {
    /// Future for the [`StreamExt::for_each()`] method.
    ///
    /// Owns the stream and calls `f` on every item it yields, completing with
    /// `()` when the stream ends.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct ForEachFuture<S, F> {
        // The stream being consumed; structurally pinned so it can be polled
        // through `Pin<&mut Self>`.
        #[pin]
        stream: S,
        // Callback invoked once per item.
        f: F,
    }
}
3346
3347impl<S, F> Future for ForEachFuture<S, F>
3348where
3349    S: Stream,
3350    F: FnMut(S::Item),
3351{
3352    type Output = ();
3353
3354    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3355        let mut this = self.project();
3356        loop {
3357            match ready!(this.stream.as_mut().poll_next(cx)) {
3358                Some(v) => (this.f)(v),
3359                None => return Poll::Ready(()),
3360            }
3361        }
3362    }
3363}
3364
/// Future for the [`StreamExt::try_for_each()`] method.
///
/// Calls `f` on each item of the borrowed stream, stopping with the first
/// error `f` returns, or completing with `Ok(())` when the stream ends.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryForEachFuture<'a, S: ?Sized, F> {
    /// Borrowed stream whose items are processed.
    stream: &'a mut S,
    /// Fallible callback invoked once per item.
    f: F,
}

// `Unpin` is implemented by hand and is conditioned only on `S: Unpin`; no
// bound is placed on `F`.
impl<S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'_, S, F> {}
3374
3375impl<S, F, E> Future for TryForEachFuture<'_, S, F>
3376where
3377    S: Stream + Unpin + ?Sized,
3378    F: FnMut(S::Item) -> Result<(), E>,
3379{
3380    type Output = Result<(), E>;
3381
3382    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3383        loop {
3384            match ready!(self.stream.poll_next(cx)) {
3385                None => return Poll::Ready(Ok(())),
3386                Some(v) => (&mut self.f)(v)?,
3387            }
3388        }
3389    }
3390}
3391
pin_project! {
    /// Stream for the [`StreamExt::zip()`] method.
    ///
    /// Yields pairs made of one item from `first` and one item from `second`.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Zip<A: Stream, B> {
        // Item already taken from `first` that is still waiting for its partner
        // from `second`; while it is occupied, `first` is not polled again.
        item_slot: Option<A::Item>,
        // Source of the left-hand element of each pair.
        #[pin]
        first: A,
        // Source of the right-hand element of each pair.
        #[pin]
        second: B,
    }
}
3404
3405impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3406    type Item = (A::Item, B::Item);
3407
3408    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3409        let this = self.project();
3410
3411        if this.item_slot.is_none() {
3412            match this.first.poll_next(cx) {
3413                Poll::Pending => return Poll::Pending,
3414                Poll::Ready(None) => return Poll::Ready(None),
3415                Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3416            }
3417        }
3418
3419        let second_item = ready!(this.second.poll_next(cx));
3420        let first_item = this.item_slot.take().unwrap();
3421        Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3422    }
3423}
3424
pin_project! {
    /// Future for the [`StreamExt::unzip()`] method.
    ///
    /// Consumes a stream of pairs, extending one collection with the left
    /// elements and another with the right elements, and resolves to both
    /// collections when the stream ends.
    #[derive(Debug)]
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    pub struct UnzipFuture<S, FromA, FromB> {
        // The stream of pairs being consumed; structurally pinned so it can be
        // polled through `Pin<&mut Self>`.
        #[pin]
        stream: S,
        // The two collections being filled. Kept in an `Option` so the pair can
        // be moved out on completion; the poll implementation unwraps it, so it
        // must be `Some` until the future has completed.
        res: Option<(FromA, FromB)>,
    }
}
3435
3436impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3437where
3438    S: Stream<Item = (A, B)>,
3439    FromA: Default + Extend<A>,
3440    FromB: Default + Extend<B>,
3441{
3442    type Output = (FromA, FromB);
3443
3444    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3445        let mut this = self.project();
3446
3447        loop {
3448            match ready!(this.stream.as_mut().poll_next(cx)) {
3449                Some((a, b)) => {
3450                    let res = this.res.as_mut().unwrap();
3451                    res.0.extend(Some(a));
3452                    res.1.extend(Some(b));
3453                }
3454                None => return Poll::Ready(this.res.take().unwrap()),
3455            }
3456        }
3457    }
3458}
3459
/// Stream for the [`StreamExt::drain()`] method.
///
/// Yields the items the borrowed stream has ready right now and reports the
/// end of the stream as soon as the inner stream would otherwise be pending.
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Drain<'a, S: ?Sized> {
    /// Borrowed stream being drained.
    stream: &'a mut S,
}

// `Unpin` is implemented by hand, conditioned on `S: Unpin`.
impl<S: Unpin + ?Sized> Unpin for Drain<'_, S> {}
3468
3469impl<'a, S: Unpin + ?Sized> Drain<'a, S> {
3470    /// Get a reference to the underlying stream.
3471    ///
3472    /// ## Examples
3473    ///
3474    /// ```
3475    /// use futures_lite::{prelude::*, stream};
3476    ///
3477    /// # spin_on::spin_on(async {
3478    /// let mut s = stream::iter(vec![1, 2, 3]);
3479    /// let s2 = s.drain();
3480    ///
3481    /// let inner = s2.get_ref();
3482    /// // s and inner are the same.
3483    /// # });
3484    /// ```
3485    pub fn get_ref(&self) -> &S {
3486        &self.stream
3487    }
3488
3489    /// Get a mutable reference to the underlying stream.
3490    ///
3491    /// ## Examples
3492    ///
3493    /// ```
3494    /// use futures_lite::{prelude::*, stream};
3495    ///
3496    /// # spin_on::spin_on(async {
3497    /// let mut s = stream::iter(vec![1, 2, 3]);
3498    /// let mut s2 = s.drain();
3499    ///
3500    /// let inner = s2.get_mut();
3501    /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3502    /// # });
3503    /// ```
3504    pub fn get_mut(&mut self) -> &mut S {
3505        &mut self.stream
3506    }
3507
3508    /// Consume this stream and get the underlying stream.
3509    ///
3510    /// ## Examples
3511    ///
3512    /// ```
3513    /// use futures_lite::{prelude::*, stream};
3514    ///
3515    /// # spin_on::spin_on(async {
3516    /// let mut s = stream::iter(vec![1, 2, 3]);
3517    /// let mut s2 = s.drain();
3518    ///
3519    /// let inner = s2.into_inner();
3520    /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3521    /// # });
3522    /// ```
3523    pub fn into_inner(self) -> &'a mut S {
3524        self.stream
3525    }
3526}
3527
3528impl<S: Stream + Unpin + ?Sized> Stream for Drain<'_, S> {
3529    type Item = S::Item;
3530
3531    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3532        match self.stream.poll_next(cx) {
3533            Poll::Ready(x) => Poll::Ready(x),
3534            Poll::Pending => Poll::Ready(None),
3535        }
3536    }
3537
3538    fn size_hint(&self) -> (usize, Option<usize>) {
3539        let (_, hi) = self.stream.size_hint();
3540        (0, hi)
3541    }
3542}