futures_lite/stream.rs
1//! Combinators for the [`Stream`] trait.
2//!
3//! # Examples
4//!
5//! ```
6//! use futures_lite::stream::{self, StreamExt};
7//!
8//! # spin_on::spin_on(async {
9//! let mut s = stream::iter(vec![1, 2, 3]);
10//!
11//! assert_eq!(s.next().await, Some(1));
12//! assert_eq!(s.next().await, Some(2));
13//! assert_eq!(s.next().await, Some(3));
14//! assert_eq!(s.next().await, None);
15//! # });
16//! ```
17
18#[doc(no_inline)]
19pub use futures_core::stream::Stream;
20
21#[cfg(feature = "alloc")]
22use alloc::boxed::Box;
23
24use core::fmt;
25use core::future::Future;
26use core::marker::PhantomData;
27use core::mem;
28use core::pin::Pin;
29use core::task::{Context, Poll};
30
31#[cfg(feature = "race")]
32use fastrand::Rng;
33
34use pin_project_lite::pin_project;
35
36use crate::ready;
37
/// Wraps a stream in an iterator whose methods block on the stream.
///
/// The stream has to be [`Unpin`]; the example pins it with `pin!` before wrapping it.
///
/// # Examples
///
/// ```
/// use futures_lite::{pin, stream};
///
/// let stream = stream::once(7);
/// pin!(stream);
///
/// let mut iter = stream::block_on(stream);
/// assert_eq!(iter.next(), Some(7));
/// assert_eq!(iter.next(), None);
/// ```
#[cfg(feature = "std")]
pub fn block_on<S>(stream: S) -> BlockOn<S>
where
    S: Stream + Unpin,
{
    BlockOn(stream)
}
56
/// Blocking iterator produced by the [`block_on()`] function.
#[derive(Debug)]
pub struct BlockOn<S>(S);

// Apart from `size_hint`, which is forwarded directly, every method calls the
// same-named method on the wrapped stream and then blocks on the returned
// future with `crate::future::block_on`.
#[cfg(feature = "std")]
impl<S: Stream + Unpin> Iterator for BlockOn<S> {
    type Item = S::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let pending = self.0.next();
        crate::future::block_on(pending)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        self.0.size_hint()
    }

    fn count(self) -> usize {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.count())
    }

    fn last(self) -> Option<Self::Item> {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.last())
    }

    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let pending = self.0.nth(n);
        crate::future::block_on(pending)
    }

    fn fold<B, F>(self, init: B, f: F) -> B
    where
        F: FnMut(B, Self::Item) -> B,
    {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.fold(init, f))
    }

    fn for_each<F>(self, f: F) -> F::Output
    where
        F: FnMut(Self::Item),
    {
        let BlockOn(stream) = self;
        crate::future::block_on(stream.for_each(f))
    }

    fn all<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.all(f);
        crate::future::block_on(pending)
    }

    fn any<F>(&mut self, f: F) -> bool
    where
        F: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.any(f);
        crate::future::block_on(pending)
    }

    fn find<P>(&mut self, predicate: P) -> Option<Self::Item>
    where
        P: FnMut(&Self::Item) -> bool,
    {
        let pending = self.0.find(predicate);
        crate::future::block_on(pending)
    }

    fn find_map<B, F>(&mut self, f: F) -> Option<B>
    where
        F: FnMut(Self::Item) -> Option<B>,
    {
        let pending = self.0.find_map(f);
        crate::future::block_on(pending)
    }

    fn position<P>(&mut self, predicate: P) -> Option<usize>
    where
        P: FnMut(Self::Item) -> bool,
    {
        let pending = self.0.position(predicate);
        crate::future::block_on(pending)
    }
}
134
135/// Creates an empty stream.
136///
137/// # Examples
138///
139/// ```
140/// use futures_lite::stream::{self, StreamExt};
141///
142/// # spin_on::spin_on(async {
143/// let mut s = stream::empty::<i32>();
144/// assert_eq!(s.next().await, None);
145/// # })
146/// ```
147pub fn empty<T>() -> Empty<T> {
148 Empty {
149 _marker: PhantomData,
150 }
151}
152
153/// Stream for the [`empty()`] function.
154#[derive(Clone, Debug)]
155#[must_use = "streams do nothing unless polled"]
156pub struct Empty<T> {
157 _marker: PhantomData<T>,
158}
159
160impl<T> Unpin for Empty<T> {}
161
162impl<T> Stream for Empty<T> {
163 type Item = T;
164
165 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
166 Poll::Ready(None)
167 }
168
169 fn size_hint(&self) -> (usize, Option<usize>) {
170 (0, Some(0))
171 }
172}
173
174/// Creates a stream from an iterator.
175///
176/// # Examples
177///
178/// ```
179/// use futures_lite::stream::{self, StreamExt};
180///
181/// # spin_on::spin_on(async {
182/// let mut s = stream::iter(vec![1, 2]);
183///
184/// assert_eq!(s.next().await, Some(1));
185/// assert_eq!(s.next().await, Some(2));
186/// assert_eq!(s.next().await, None);
187/// # })
188/// ```
189pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
190 Iter {
191 iter: iter.into_iter(),
192 }
193}
194
195/// Stream for the [`iter()`] function.
196#[derive(Clone, Debug)]
197#[must_use = "streams do nothing unless polled"]
198pub struct Iter<I> {
199 iter: I,
200}
201
202impl<I> Unpin for Iter<I> {}
203
204impl<I: Iterator> Stream for Iter<I> {
205 type Item = I::Item;
206
207 fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
208 Poll::Ready(self.iter.next())
209 }
210
211 fn size_hint(&self) -> (usize, Option<usize>) {
212 self.iter.size_hint()
213 }
214}
215
216/// Creates a stream that yields a single item.
217///
218/// # Examples
219///
220/// ```
221/// use futures_lite::stream::{self, StreamExt};
222///
223/// # spin_on::spin_on(async {
224/// let mut s = stream::once(7);
225///
226/// assert_eq!(s.next().await, Some(7));
227/// assert_eq!(s.next().await, None);
228/// # })
229/// ```
230pub fn once<T>(t: T) -> Once<T> {
231 Once { value: Some(t) }
232}
233
234pin_project! {
235 /// Stream for the [`once()`] function.
236 #[derive(Clone, Debug)]
237 #[must_use = "streams do nothing unless polled"]
238 pub struct Once<T> {
239 value: Option<T>,
240 }
241}
242
243impl<T> Stream for Once<T> {
244 type Item = T;
245
246 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
247 Poll::Ready(self.project().value.take())
248 }
249
250 fn size_hint(&self) -> (usize, Option<usize>) {
251 if self.value.is_some() {
252 (1, Some(1))
253 } else {
254 (0, Some(0))
255 }
256 }
257}
258
259/// Creates a stream that is always pending.
260///
261/// # Examples
262///
263/// ```no_run
264/// use futures_lite::stream::{self, StreamExt};
265///
266/// # spin_on::spin_on(async {
267/// let mut s = stream::pending::<i32>();
268/// s.next().await;
269/// unreachable!();
270/// # })
271/// ```
272pub fn pending<T>() -> Pending<T> {
273 Pending {
274 _marker: PhantomData,
275 }
276}
277
278/// Stream for the [`pending()`] function.
279#[derive(Clone, Debug)]
280#[must_use = "streams do nothing unless polled"]
281pub struct Pending<T> {
282 _marker: PhantomData<T>,
283}
284
285impl<T> Unpin for Pending<T> {}
286
287impl<T> Stream for Pending<T> {
288 type Item = T;
289
290 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
291 Poll::Pending
292 }
293
294 fn size_hint(&self) -> (usize, Option<usize>) {
295 (0, Some(0))
296 }
297}
298
299/// Creates a stream from a function returning [`Poll`].
300///
301/// # Examples
302///
303/// ```
304/// use futures_lite::stream::{self, StreamExt};
305/// use std::task::{Context, Poll};
306///
307/// # spin_on::spin_on(async {
308/// fn f(_: &mut Context<'_>) -> Poll<Option<i32>> {
309/// Poll::Ready(Some(7))
310/// }
311///
312/// assert_eq!(stream::poll_fn(f).next().await, Some(7));
313/// # })
314/// ```
315pub fn poll_fn<T, F>(f: F) -> PollFn<F>
316where
317 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
318{
319 PollFn { f }
320}
321
/// Stream returned by the [`poll_fn()`] function.
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct PollFn<F> {
    f: F,
}

impl<F> Unpin for PollFn<F> {}

impl<F> fmt::Debug for PollFn<F> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Only the type name is printed, so `F` does not have to implement `Debug`.
        let mut out = f.debug_struct("PollFn");
        out.finish()
    }
}
336
337impl<T, F> Stream for PollFn<F>
338where
339 F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
340{
341 type Item = T;
342
343 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
344 (&mut self.f)(cx)
345 }
346}
347
348/// Creates an infinite stream that yields the same item repeatedly.
349///
350/// # Examples
351///
352/// ```
353/// use futures_lite::stream::{self, StreamExt};
354///
355/// # spin_on::spin_on(async {
356/// let mut s = stream::repeat(7);
357///
358/// assert_eq!(s.next().await, Some(7));
359/// assert_eq!(s.next().await, Some(7));
360/// # })
361/// ```
362pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
363 Repeat { item }
364}
365
366/// Stream for the [`repeat()`] function.
367#[derive(Clone, Debug)]
368#[must_use = "streams do nothing unless polled"]
369pub struct Repeat<T> {
370 item: T,
371}
372
373impl<T> Unpin for Repeat<T> {}
374
375impl<T: Clone> Stream for Repeat<T> {
376 type Item = T;
377
378 fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
379 Poll::Ready(Some(self.item.clone()))
380 }
381
382 fn size_hint(&self) -> (usize, Option<usize>) {
383 (usize::MAX, None)
384 }
385}
386
387/// Creates an infinite stream from a closure that generates items.
388///
389/// # Examples
390///
391/// ```
392/// use futures_lite::stream::{self, StreamExt};
393///
394/// # spin_on::spin_on(async {
395/// let mut s = stream::repeat_with(|| 7);
396///
397/// assert_eq!(s.next().await, Some(7));
398/// assert_eq!(s.next().await, Some(7));
399/// # })
400/// ```
401pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
402where
403 F: FnMut() -> T,
404{
405 RepeatWith { f: repeater }
406}
407
408/// Stream for the [`repeat_with()`] function.
409#[derive(Clone, Debug)]
410#[must_use = "streams do nothing unless polled"]
411pub struct RepeatWith<F> {
412 f: F,
413}
414
415impl<F> Unpin for RepeatWith<F> {}
416
417impl<T, F> Stream for RepeatWith<F>
418where
419 F: FnMut() -> T,
420{
421 type Item = T;
422
423 fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
424 let item = (&mut self.f)();
425 Poll::Ready(Some(item))
426 }
427
428 fn size_hint(&self) -> (usize, Option<usize>) {
429 (usize::MAX, None)
430 }
431}
432
433/// Creates a stream from a seed value and an async closure operating on it.
434///
435/// # Examples
436///
437/// ```
438/// use futures_lite::stream::{self, StreamExt};
439///
440/// # spin_on::spin_on(async {
441/// let s = stream::unfold(0, |mut n| async move {
442/// if n < 2 {
443/// let m = n + 1;
444/// Some((n, m))
445/// } else {
446/// None
447/// }
448/// });
449///
450/// let v: Vec<i32> = s.collect().await;
451/// assert_eq!(v, [0, 1]);
452/// # })
453/// ```
454pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
455where
456 F: FnMut(T) -> Fut,
457 Fut: Future<Output = Option<(Item, T)>>,
458{
459 Unfold {
460 f,
461 state: Some(seed),
462 fut: None,
463 }
464}
465
466pin_project! {
467 /// Stream for the [`unfold()`] function.
468 #[derive(Clone)]
469 #[must_use = "streams do nothing unless polled"]
470 pub struct Unfold<T, F, Fut> {
471 f: F,
472 state: Option<T>,
473 #[pin]
474 fut: Option<Fut>,
475 }
476}
477
478impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
479where
480 T: fmt::Debug,
481 Fut: fmt::Debug,
482{
483 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
484 f.debug_struct("Unfold")
485 .field("state", &self.state)
486 .field("fut", &self.fut)
487 .finish()
488 }
489}
490
491impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
492where
493 F: FnMut(T) -> Fut,
494 Fut: Future<Output = Option<(Item, T)>>,
495{
496 type Item = Item;
497
498 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
499 let mut this = self.project();
500
501 if let Some(state) = this.state.take() {
502 this.fut.set(Some((this.f)(state)));
503 }
504
505 let step = ready!(this
506 .fut
507 .as_mut()
508 .as_pin_mut()
509 .expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
510 .poll(cx));
511 this.fut.set(None);
512
513 if let Some((item, next_state)) = step {
514 *this.state = Some(next_state);
515 Poll::Ready(Some(item))
516 } else {
517 Poll::Ready(None)
518 }
519 }
520}
521
522/// Creates a stream from a seed value and a fallible async closure operating on it.
523///
524/// # Examples
525///
526/// ```
527/// use futures_lite::stream::{self, StreamExt};
528///
529/// # spin_on::spin_on(async {
530/// let s = stream::try_unfold(0, |mut n| async move {
531/// if n < 2 {
532/// let m = n + 1;
533/// Ok(Some((n, m)))
534/// } else {
535/// std::io::Result::Ok(None)
536/// }
537/// });
538///
539/// let v: Vec<i32> = s.try_collect().await?;
540/// assert_eq!(v, [0, 1]);
541/// # std::io::Result::Ok(()) });
542/// ```
543pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
544where
545 F: FnMut(T) -> Fut,
546 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
547{
548 TryUnfold {
549 f,
550 state: Some(init),
551 fut: None,
552 }
553}
554
555pin_project! {
556 /// Stream for the [`try_unfold()`] function.
557 #[derive(Clone)]
558 #[must_use = "streams do nothing unless polled"]
559 pub struct TryUnfold<T, F, Fut> {
560 f: F,
561 state: Option<T>,
562 #[pin]
563 fut: Option<Fut>,
564 }
565}
566
567impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
568where
569 T: fmt::Debug,
570 Fut: fmt::Debug,
571{
572 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
573 f.debug_struct("TryUnfold")
574 .field("state", &self.state)
575 .field("fut", &self.fut)
576 .finish()
577 }
578}
579
580impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
581where
582 F: FnMut(T) -> Fut,
583 Fut: Future<Output = Result<Option<(Item, T)>, E>>,
584{
585 type Item = Result<Item, E>;
586
587 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
588 let mut this = self.project();
589
590 if let Some(state) = this.state.take() {
591 this.fut.set(Some((this.f)(state)));
592 }
593
594 match this.fut.as_mut().as_pin_mut() {
595 None => {
596 // The future previously errored
597 Poll::Ready(None)
598 }
599 Some(future) => {
600 let step = ready!(future.poll(cx));
601 this.fut.set(None);
602
603 match step {
604 Ok(Some((item, next_state))) => {
605 *this.state = Some(next_state);
606 Poll::Ready(Some(Ok(item)))
607 }
608 Ok(None) => Poll::Ready(None),
609 Err(e) => Poll::Ready(Some(Err(e))),
610 }
611 }
612 }
613 }
614}
615
616/// Creates a stream that invokes the given future as its first item, and then
617/// produces no more items.
618///
619/// # Example
620///
621/// ```
622/// use futures_lite::{stream, prelude::*};
623///
624/// # spin_on::spin_on(async {
625/// let mut stream = Box::pin(stream::once_future(async { 1 }));
626/// assert_eq!(stream.next().await, Some(1));
627/// assert_eq!(stream.next().await, None);
628/// # });
629/// ```
630pub fn once_future<F: Future>(future: F) -> OnceFuture<F> {
631 OnceFuture {
632 future: Some(future),
633 }
634}
635
636pin_project! {
637 /// Stream for the [`once_future()`] function.
638 #[derive(Debug)]
639 #[must_use = "futures do nothing unless you `.await` or poll them"]
640 pub struct OnceFuture<F> {
641 #[pin]
642 future: Option<F>,
643 }
644}
645
646impl<F: Future> Stream for OnceFuture<F> {
647 type Item = F::Output;
648
649 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
650 let mut this = self.project();
651
652 match this.future.as_mut().as_pin_mut().map(|f| f.poll(cx)) {
653 Some(Poll::Ready(t)) => {
654 this.future.set(None);
655 Poll::Ready(Some(t))
656 }
657 Some(Poll::Pending) => Poll::Pending,
658 None => Poll::Ready(None),
659 }
660 }
661}
662
663/// Take elements from this stream until the provided future resolves.
664///
665/// This function will take elements from the stream until the provided
666/// stopping future `fut` resolves. Once the `fut` future becomes ready,
667/// this stream combinator will always return that the stream is done.
668///
669/// The stopping future may return any type. Once the stream is stopped
670/// the result of the stopping future may be accessed with `StopAfterFuture::take_result()`.
671/// The stream may also be resumed with `StopAfterFuture::take_future()`.
672/// See the documentation of [`StopAfterFuture`] for more information.
673///
674/// ```
675/// use futures_lite::stream::{self, StreamExt, stop_after_future};
676/// use futures_lite::future;
677/// use std::task::Poll;
678///
679/// let stream = stream::iter(1..=10);
680///
681/// # spin_on::spin_on(async {
682/// let mut i = 0;
683/// let stop_fut = future::poll_fn(|_cx| {
684/// i += 1;
685/// if i <= 5 {
686/// Poll::Pending
687/// } else {
688/// Poll::Ready(())
689/// }
690/// });
691///
692/// let stream = stop_after_future(stream, stop_fut);
693///
694/// assert_eq!(vec![1, 2, 3, 4, 5], stream.collect::<Vec<_>>().await);
695/// # });
696pub fn stop_after_future<S, F>(stream: S, future: F) -> StopAfterFuture<S, F>
697where
698 S: Sized + Stream,
699 F: Future,
700{
701 StopAfterFuture {
702 stream,
703 fut: Some(future),
704 fut_result: None,
705 free: false,
706 }
707}
708
709pin_project! {
710 /// Stream for the [`stop_after_future()`] function.
711 #[derive(Clone, Debug)]
712 #[must_use = "streams do nothing unless polled"]
713 pub struct StopAfterFuture<S: Stream, Fut: Future> {
714 #[pin]
715 stream: S,
716 // Contains the inner Future on start and None once the inner Future is resolved
717 // or taken out by the user.
718 #[pin]
719 fut: Option<Fut>,
720 // Contains fut's return value once fut is resolved
721 fut_result: Option<Fut::Output>,
722 // Whether the future was taken out by the user.
723 free: bool,
724 }
725}
726
727impl<St, Fut> StopAfterFuture<St, Fut>
728where
729 St: Stream,
730 Fut: Future,
731{
732 /// Extract the stopping future out of the combinator.
733 ///
734 /// The future is returned only if it isn't resolved yet, ie. if the stream isn't stopped yet.
735 /// Taking out the future means the combinator will be yielding
736 /// elements from the wrapped stream without ever stopping it.
737 pub fn take_future(&mut self) -> Option<Fut> {
738 if self.fut.is_some() {
739 self.free = true;
740 }
741
742 self.fut.take()
743 }
744
745 /// Once the stopping future is resolved, this method can be used
746 /// to extract the value returned by the stopping future.
747 ///
748 /// This may be used to retrieve arbitrary data from the stopping
749 /// future, for example a reason why the stream was stopped.
750 ///
751 /// This method will return `None` if the future isn't resolved yet,
752 /// or if the result was already taken out.
753 ///
754 /// # Examples
755 ///
756 /// ```
757 /// # spin_on::spin_on(async {
758 /// use futures_lite::stream::{self, StreamExt, stop_after_future};
759 /// use futures_lite::future;
760 /// use std::task::Poll;
761 ///
762 /// let stream = stream::iter(1..=10);
763 ///
764 /// let mut i = 0;
765 /// let stop_fut = future::poll_fn(|_cx| {
766 /// i += 1;
767 /// if i <= 5 {
768 /// Poll::Pending
769 /// } else {
770 /// Poll::Ready("reason")
771 /// }
772 /// });
773 ///
774 /// let mut stream = stop_after_future(stream, stop_fut);
775 /// let _ = (&mut stream).collect::<Vec<_>>().await;
776 ///
777 /// let result = stream.take_result().unwrap();
778 /// assert_eq!(result, "reason");
779 /// # });
780 /// ```
781 pub fn take_result(&mut self) -> Option<Fut::Output> {
782 self.fut_result.take()
783 }
784
785 /// Whether the stream was stopped yet by the stopping future
786 /// being resolved.
787 pub fn is_stopped(&self) -> bool {
788 !self.free && self.fut.is_none()
789 }
790}
791
792impl<St, Fut> Stream for StopAfterFuture<St, Fut>
793where
794 St: Stream,
795 Fut: Future,
796{
797 type Item = St::Item;
798
799 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<St::Item>> {
800 let mut this = self.project();
801
802 if let Some(f) = this.fut.as_mut().as_pin_mut() {
803 if let Poll::Ready(result) = f.poll(cx) {
804 this.fut.set(None);
805 *this.fut_result = Some(result);
806 }
807 }
808
809 if !*this.free && this.fut.is_none() {
810 // Future resolved, inner stream stopped
811 Poll::Ready(None)
812 } else {
813 // Future either not resolved yet or taken out by the user
814 let item = ready!(this.stream.poll_next(cx));
815 if item.is_none() {
816 this.fut.set(None);
817 }
818 Poll::Ready(item)
819 }
820 }
821
822 fn size_hint(&self) -> (usize, Option<usize>) {
823 if self.is_stopped() {
824 return (0, Some(0));
825 }
826
827 // Original stream can be truncated at any moment, so the lower bound isn't reliable.
828 let (_, upper_bound) = self.stream.size_hint();
829 (0, upper_bound)
830 }
831}
832
833/// Extension trait for [`Stream`].
834pub trait StreamExt: Stream {
835 /// A convenience for calling [`Stream::poll_next()`] on `!`[`Unpin`] types.
836 fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
837 where
838 Self: Unpin,
839 {
840 Stream::poll_next(Pin::new(self), cx)
841 }
842
843 /// Retrieves the next item in the stream.
844 ///
845 /// Returns [`None`] when iteration is finished. Stream implementations may choose to or not to
846 /// resume iteration after that.
847 ///
848 /// # Examples
849 ///
850 /// ```
851 /// use futures_lite::stream::{self, StreamExt};
852 ///
853 /// # spin_on::spin_on(async {
854 /// let mut s = stream::iter(1..=3);
855 ///
856 /// assert_eq!(s.next().await, Some(1));
857 /// assert_eq!(s.next().await, Some(2));
858 /// assert_eq!(s.next().await, Some(3));
859 /// assert_eq!(s.next().await, None);
860 /// # });
861 /// ```
862 fn next(&mut self) -> NextFuture<'_, Self>
863 where
864 Self: Unpin,
865 {
866 NextFuture { stream: self }
867 }
868
869 /// Retrieves the next item in the stream.
870 ///
871 /// This is similar to the [`next()`][`StreamExt::next()`] method, but returns
872 /// `Result<Option<T>, E>` rather than `Option<Result<T, E>>`.
873 ///
874 /// Note that `s.try_next().await` is equivalent to `s.next().await.transpose()`.
875 ///
876 /// # Examples
877 ///
878 /// ```
879 /// use futures_lite::stream::{self, StreamExt};
880 ///
881 /// # spin_on::spin_on(async {
882 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);
883 ///
884 /// assert_eq!(s.try_next().await, Ok(Some(1)));
885 /// assert_eq!(s.try_next().await, Ok(Some(2)));
886 /// assert_eq!(s.try_next().await, Err("error"));
887 /// assert_eq!(s.try_next().await, Ok(None));
888 /// # });
889 /// ```
890 fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
891 where
892 Self: Stream<Item = Result<T, E>> + Unpin,
893 {
894 TryNextFuture { stream: self }
895 }
896
897 /// Counts the number of items in the stream.
898 ///
899 /// # Examples
900 ///
901 /// ```
902 /// use futures_lite::stream::{self, StreamExt};
903 ///
904 /// # spin_on::spin_on(async {
905 /// let s1 = stream::iter(vec![0]);
906 /// let s2 = stream::iter(vec![1, 2, 3]);
907 ///
908 /// assert_eq!(s1.count().await, 1);
909 /// assert_eq!(s2.count().await, 3);
910 /// # });
911 /// ```
912 fn count(self) -> CountFuture<Self>
913 where
914 Self: Sized,
915 {
916 CountFuture {
917 stream: self,
918 count: 0,
919 }
920 }
921
922 /// Maps items of the stream to new values using a closure.
923 ///
924 /// # Examples
925 ///
926 /// ```
927 /// use futures_lite::stream::{self, StreamExt};
928 ///
929 /// # spin_on::spin_on(async {
930 /// let s = stream::iter(vec![1, 2, 3]);
931 /// let mut s = s.map(|x| 2 * x);
932 ///
933 /// assert_eq!(s.next().await, Some(2));
934 /// assert_eq!(s.next().await, Some(4));
935 /// assert_eq!(s.next().await, Some(6));
936 /// assert_eq!(s.next().await, None);
937 /// # });
938 /// ```
939 fn map<T, F>(self, f: F) -> Map<Self, F>
940 where
941 Self: Sized,
942 F: FnMut(Self::Item) -> T,
943 {
944 Map { stream: self, f }
945 }
946
947 /// Maps items to streams and then concatenates them.
948 ///
949 /// # Examples
950 ///
951 /// ```
952 /// use futures_lite::stream::{self, StreamExt};
953 ///
954 /// # spin_on::spin_on(async {
955 /// let words = stream::iter(vec!["one", "two"]);
956 ///
957 /// let s: String = words
958 /// .flat_map(|s| stream::iter(s.chars()))
959 /// .collect()
960 /// .await;
961 ///
962 /// assert_eq!(s, "onetwo");
963 /// # });
964 /// ```
965 fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
966 where
967 Self: Sized,
968 U: Stream,
969 F: FnMut(Self::Item) -> U,
970 {
971 FlatMap {
972 stream: self.map(f),
973 inner_stream: None,
974 }
975 }
976
977 /// Concatenates inner streams.
978 ///
979 /// # Examples
980 ///
981 /// ```
982 /// use futures_lite::stream::{self, StreamExt};
983 ///
984 /// # spin_on::spin_on(async {
985 /// let s1 = stream::iter(vec![1, 2, 3]);
986 /// let s2 = stream::iter(vec![4, 5]);
987 ///
988 /// let s = stream::iter(vec![s1, s2]);
989 /// let v: Vec<_> = s.flatten().collect().await;
990 /// assert_eq!(v, [1, 2, 3, 4, 5]);
991 /// # });
992 /// ```
993 fn flatten(self) -> Flatten<Self>
994 where
995 Self: Sized,
996 Self::Item: Stream,
997 {
998 Flatten {
999 stream: self,
1000 inner_stream: None,
1001 }
1002 }
1003
1004 /// Maps items of the stream to new values using an async closure.
1005 ///
1006 /// # Examples
1007 ///
1008 /// ```
1009 /// use futures_lite::pin;
1010 /// use futures_lite::stream::{self, StreamExt};
1011 ///
1012 /// # spin_on::spin_on(async {
1013 /// let s = stream::iter(vec![1, 2, 3]);
1014 /// let mut s = s.then(|x| async move { 2 * x });
1015 ///
1016 /// pin!(s);
1017 /// assert_eq!(s.next().await, Some(2));
1018 /// assert_eq!(s.next().await, Some(4));
1019 /// assert_eq!(s.next().await, Some(6));
1020 /// assert_eq!(s.next().await, None);
1021 /// # });
1022 /// ```
1023 fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
1024 where
1025 Self: Sized,
1026 F: FnMut(Self::Item) -> Fut,
1027 Fut: Future,
1028 {
1029 Then {
1030 stream: self,
1031 future: None,
1032 f,
1033 }
1034 }
1035
1036 /// Keeps items of the stream for which `predicate` returns `true`.
1037 ///
1038 /// # Examples
1039 ///
1040 /// ```
1041 /// use futures_lite::stream::{self, StreamExt};
1042 ///
1043 /// # spin_on::spin_on(async {
1044 /// let s = stream::iter(vec![1, 2, 3, 4]);
1045 /// let mut s = s.filter(|i| i % 2 == 0);
1046 ///
1047 /// assert_eq!(s.next().await, Some(2));
1048 /// assert_eq!(s.next().await, Some(4));
1049 /// assert_eq!(s.next().await, None);
1050 /// # });
1051 /// ```
1052 fn filter<P>(self, predicate: P) -> Filter<Self, P>
1053 where
1054 Self: Sized,
1055 P: FnMut(&Self::Item) -> bool,
1056 {
1057 Filter {
1058 stream: self,
1059 predicate,
1060 }
1061 }
1062
1063 /// Filters and maps items of the stream using a closure.
1064 ///
1065 /// # Examples
1066 ///
1067 /// ```
1068 /// use futures_lite::stream::{self, StreamExt};
1069 ///
1070 /// # spin_on::spin_on(async {
1071 /// let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
1072 /// let mut s = s.filter_map(|a| a.parse::<u32>().ok());
1073 ///
1074 /// assert_eq!(s.next().await, Some(1));
1075 /// assert_eq!(s.next().await, Some(3));
1076 /// assert_eq!(s.next().await, Some(5));
1077 /// assert_eq!(s.next().await, None);
1078 /// # });
1079 /// ```
1080 fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
1081 where
1082 Self: Sized,
1083 F: FnMut(Self::Item) -> Option<T>,
1084 {
1085 FilterMap { stream: self, f }
1086 }
1087
1088 /// Takes only the first `n` items of the stream.
1089 ///
1090 /// # Examples
1091 ///
1092 /// ```
1093 /// use futures_lite::stream::{self, StreamExt};
1094 ///
1095 /// # spin_on::spin_on(async {
1096 /// let mut s = stream::repeat(7).take(2);
1097 ///
1098 /// assert_eq!(s.next().await, Some(7));
1099 /// assert_eq!(s.next().await, Some(7));
1100 /// assert_eq!(s.next().await, None);
1101 /// # });
1102 /// ```
1103 fn take(self, n: usize) -> Take<Self>
1104 where
1105 Self: Sized,
1106 {
1107 Take { stream: self, n }
1108 }
1109
1110 /// Takes items while `predicate` returns `true`.
1111 ///
1112 /// # Examples
1113 ///
1114 /// ```
1115 /// use futures_lite::stream::{self, StreamExt};
1116 ///
1117 /// # spin_on::spin_on(async {
1118 /// let s = stream::iter(vec![1, 2, 3, 4]);
1119 /// let mut s = s.take_while(|x| *x < 3);
1120 ///
1121 /// assert_eq!(s.next().await, Some(1));
1122 /// assert_eq!(s.next().await, Some(2));
1123 /// assert_eq!(s.next().await, None);
1124 /// # });
1125 /// ```
1126 fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
1127 where
1128 Self: Sized,
1129 P: FnMut(&Self::Item) -> bool,
1130 {
1131 TakeWhile {
1132 stream: self,
1133 predicate,
1134 }
1135 }
1136
1137 /// Maps items while `predicate` returns [`Some`].
1138 ///
1139 /// This stream is not fused. After the predicate returns [`None`] the stream still
1140 /// contains remaining items that can be obtained by subsequent `next` calls.
1141 /// You can [`fuse`](StreamExt::fuse) the stream if this behavior is undesirable.
1142 ///
1143 /// # Examples
1144 ///
1145 /// ```
1146 /// use futures_lite::stream::{self, StreamExt};
1147 ///
1148 /// # spin_on::spin_on(async {
1149 /// let s = stream::iter(vec![1, 2, 0, 3]);
1150 /// let mut s = s.map_while(|x: u32| x.checked_sub(1));
1151 ///
1152 /// assert_eq!(s.next().await, Some(0));
1153 /// assert_eq!(s.next().await, Some(1));
1154 /// assert_eq!(s.next().await, None);
1155 ///
1156 /// // Continue to iterate the stream.
1157 /// assert_eq!(s.next().await, Some(2));
1158 /// assert_eq!(s.next().await, None);
1159 /// # });
1160 /// ```
1161 fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
1162 where
1163 Self: Sized,
1164 P: FnMut(Self::Item) -> Option<B>,
1165 {
1166 MapWhile {
1167 stream: self,
1168 predicate,
1169 }
1170 }
1171
1172 /// Skips the first `n` items of the stream.
1173 ///
1174 /// # Examples
1175 ///
1176 /// ```
1177 /// use futures_lite::stream::{self, StreamExt};
1178 ///
1179 /// # spin_on::spin_on(async {
1180 /// let s = stream::iter(vec![1, 2, 3]);
1181 /// let mut s = s.skip(2);
1182 ///
1183 /// assert_eq!(s.next().await, Some(3));
1184 /// assert_eq!(s.next().await, None);
1185 /// # });
1186 /// ```
1187 fn skip(self, n: usize) -> Skip<Self>
1188 where
1189 Self: Sized,
1190 {
1191 Skip { stream: self, n }
1192 }
1193
1194 /// Skips items while `predicate` returns `true`.
1195 ///
1196 /// # Examples
1197 ///
1198 /// ```
1199 /// use futures_lite::stream::{self, StreamExt};
1200 ///
1201 /// # spin_on::spin_on(async {
1202 /// let s = stream::iter(vec![-1i32, 0, 1]);
1203 /// let mut s = s.skip_while(|x| x.is_negative());
1204 ///
1205 /// assert_eq!(s.next().await, Some(0));
1206 /// assert_eq!(s.next().await, Some(1));
1207 /// assert_eq!(s.next().await, None);
1208 /// # });
1209 /// ```
1210 fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
1211 where
1212 Self: Sized,
1213 P: FnMut(&Self::Item) -> bool,
1214 {
1215 SkipWhile {
1216 stream: self,
1217 predicate: Some(predicate),
1218 }
1219 }
1220
1221 /// Yields every `step`th item.
1222 ///
1223 /// # Panics
1224 ///
1225 /// This method will panic if the `step` is 0.
1226 ///
1227 /// # Examples
1228 ///
1229 /// ```
1230 /// use futures_lite::stream::{self, StreamExt};
1231 ///
1232 /// # spin_on::spin_on(async {
1233 /// let s = stream::iter(vec![0, 1, 2, 3, 4]);
1234 /// let mut s = s.step_by(2);
1235 ///
1236 /// assert_eq!(s.next().await, Some(0));
1237 /// assert_eq!(s.next().await, Some(2));
1238 /// assert_eq!(s.next().await, Some(4));
1239 /// assert_eq!(s.next().await, None);
1240 /// # });
1241 /// ```
1242 fn step_by(self, step: usize) -> StepBy<Self>
1243 where
1244 Self: Sized,
1245 {
1246 assert!(step > 0, "`step` must be greater than zero");
1247 StepBy {
1248 stream: self,
1249 step,
1250 i: 0,
1251 }
1252 }
1253
1254 /// Appends another stream to the end of this one.
1255 ///
1256 /// # Examples
1257 ///
1258 /// ```
1259 /// use futures_lite::stream::{self, StreamExt};
1260 ///
1261 /// # spin_on::spin_on(async {
1262 /// let s1 = stream::iter(vec![1, 2]);
1263 /// let s2 = stream::iter(vec![7, 8]);
1264 /// let mut s = s1.chain(s2);
1265 ///
1266 /// assert_eq!(s.next().await, Some(1));
1267 /// assert_eq!(s.next().await, Some(2));
1268 /// assert_eq!(s.next().await, Some(7));
1269 /// assert_eq!(s.next().await, Some(8));
1270 /// assert_eq!(s.next().await, None);
1271 /// # });
1272 /// ```
1273 fn chain<U>(self, other: U) -> Chain<Self, U>
1274 where
1275 Self: Sized,
1276 U: Stream<Item = Self::Item> + Sized,
1277 {
1278 Chain {
1279 first: self.fuse(),
1280 second: other.fuse(),
1281 }
1282 }
1283
1284 /// Clones all items.
1285 ///
1286 /// # Examples
1287 ///
1288 /// ```
1289 /// use futures_lite::stream::{self, StreamExt};
1290 ///
1291 /// # spin_on::spin_on(async {
1292 /// let s = stream::iter(vec![&1, &2]);
1293 /// let mut s = s.cloned();
1294 ///
1295 /// assert_eq!(s.next().await, Some(1));
1296 /// assert_eq!(s.next().await, Some(2));
1297 /// assert_eq!(s.next().await, None);
1298 /// # });
1299 /// ```
1300 fn cloned<'a, T>(self) -> Cloned<Self>
1301 where
1302 Self: Stream<Item = &'a T> + Sized,
1303 T: Clone + 'a,
1304 {
1305 Cloned { stream: self }
1306 }
1307
1308 /// Copies all items.
1309 ///
1310 /// # Examples
1311 ///
1312 /// ```
1313 /// use futures_lite::stream::{self, StreamExt};
1314 ///
1315 /// # spin_on::spin_on(async {
1316 /// let s = stream::iter(vec![&1, &2]);
1317 /// let mut s = s.copied();
1318 ///
1319 /// assert_eq!(s.next().await, Some(1));
1320 /// assert_eq!(s.next().await, Some(2));
1321 /// assert_eq!(s.next().await, None);
1322 /// # });
1323 /// ```
1324 fn copied<'a, T>(self) -> Copied<Self>
1325 where
1326 Self: Stream<Item = &'a T> + Sized,
1327 T: Copy + 'a,
1328 {
1329 Copied { stream: self }
1330 }
1331
1332 /// Collects all items in the stream into a collection.
1333 ///
1334 /// # Examples
1335 ///
1336 /// ```
1337 /// use futures_lite::stream::{self, StreamExt};
1338 ///
1339 /// # spin_on::spin_on(async {
1340 /// let mut s = stream::iter(1..=3);
1341 ///
1342 /// let items: Vec<_> = s.collect().await;
1343 /// assert_eq!(items, [1, 2, 3]);
1344 /// # });
1345 /// ```
1346 fn collect<C>(self) -> CollectFuture<Self, C>
1347 where
1348 Self: Sized,
1349 C: Default + Extend<Self::Item>,
1350 {
1351 CollectFuture {
1352 stream: self,
1353 collection: Default::default(),
1354 }
1355 }
1356
1357 /// Collects all items in the fallible stream into a collection.
///
/// # Examples
///
1359 /// ```
1360 /// use futures_lite::stream::{self, StreamExt};
1361 ///
1362 /// # spin_on::spin_on(async {
1363 /// let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
1364 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1365 /// assert_eq!(res, Err(2));
1366 ///
1367 /// let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1368 /// let res: Result<Vec<i32>, i32> = s.try_collect().await;
1369 /// assert_eq!(res, Ok(vec![1, 2, 3]));
1370 /// # })
1371 /// ```
1372 fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
1373 where
1374 Self: Stream<Item = Result<T, E>> + Sized,
1375 C: Default + Extend<T>,
1376 {
1377 TryCollectFuture {
1378 stream: self,
1379 items: Default::default(),
1380 }
1381 }
1382
1383 /// Partitions items into those for which `predicate` is `true` and those for which it is
1384 /// `false`, and then collects them into two collections.
1385 ///
1386 /// # Examples
1387 ///
1388 /// ```
1389 /// use futures_lite::stream::{self, StreamExt};
1390 ///
1391 /// # spin_on::spin_on(async {
1392 /// let s = stream::iter(vec![1, 2, 3]);
1393 /// let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;
1394 ///
1395 /// assert_eq!(even, &[2]);
1396 /// assert_eq!(odd, &[1, 3]);
1397 /// # })
1398 /// ```
1399 fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
1400 where
1401 Self: Sized,
1402 B: Default + Extend<Self::Item>,
1403 P: FnMut(&Self::Item) -> bool,
1404 {
1405 PartitionFuture {
1406 stream: self,
1407 predicate,
1408 res: Some(Default::default()),
1409 }
1410 }
1411
1412 /// Accumulates a computation over the stream.
1413 ///
1414 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1415 /// the accumulator and each item in the stream. The final accumulator value is returned.
1416 ///
1417 /// # Examples
1418 ///
1419 /// ```
1420 /// use futures_lite::stream::{self, StreamExt};
1421 ///
1422 /// # spin_on::spin_on(async {
1423 /// let s = stream::iter(vec![1, 2, 3]);
1424 /// let sum = s.fold(0, |acc, x| acc + x).await;
1425 ///
1426 /// assert_eq!(sum, 6);
1427 /// # })
1428 /// ```
1429 fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
1430 where
1431 Self: Sized,
1432 F: FnMut(T, Self::Item) -> T,
1433 {
1434 FoldFuture {
1435 stream: self,
1436 f,
1437 acc: Some(init),
1438 }
1439 }
1440
1441 /// Accumulates a fallible computation over the stream.
1442 ///
1443 /// The computation begins with the accumulator value set to `init`, and then applies `f` to
1444 /// the accumulator and each item in the stream. The final accumulator value is returned, or an
1445 /// error if `f` failed the computation.
1446 ///
1447 /// # Examples
1448 ///
1449 /// ```
1450 /// use futures_lite::stream::{self, StreamExt};
1451 ///
1452 /// # spin_on::spin_on(async {
1453 /// let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
1454 ///
1455 /// let sum = s.try_fold(0, |acc, v| {
1456 /// if (acc + v) % 2 == 1 {
1457 /// Ok(acc + v)
1458 /// } else {
1459 /// Err("fail")
1460 /// }
1461 /// })
1462 /// .await;
1463 ///
1464 /// assert_eq!(sum, Err("fail"));
1465 /// # })
1466 /// ```
1467 fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
1468 where
1469 Self: Stream<Item = Result<T, E>> + Unpin + Sized,
1470 F: FnMut(B, T) -> Result<B, E>,
1471 {
1472 TryFoldFuture {
1473 stream: self,
1474 f,
1475 acc: Some(init),
1476 }
1477 }
1478
1479 /// Maps items of the stream to new values using a state value and a closure.
1480 ///
1481 /// Scanning begins with the initial state set to `initial_state`, and then applies `f` to the
1482 /// state and each item in the stream. The stream stops when `f` returns `None`.
1483 ///
1484 /// # Examples
1485 ///
1486 /// ```
1487 /// use futures_lite::stream::{self, StreamExt};
1488 ///
1489 /// # spin_on::spin_on(async {
1490 /// let s = stream::iter(vec![1, 2, 3]);
1491 /// let mut s = s.scan(1, |state, x| {
1492 /// *state = *state * x;
1493 /// Some(-*state)
1494 /// });
1495 ///
1496 /// assert_eq!(s.next().await, Some(-1));
1497 /// assert_eq!(s.next().await, Some(-2));
1498 /// assert_eq!(s.next().await, Some(-6));
1499 /// assert_eq!(s.next().await, None);
1500 /// # })
1501 /// ```
1502 fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
1503 where
1504 Self: Sized,
1505 F: FnMut(&mut St, Self::Item) -> Option<B>,
1506 {
1507 Scan {
1508 stream: self,
1509 state_f: (initial_state, f),
1510 }
1511 }
1512
1513 /// Fuses the stream so that it stops yielding items after the first [`None`].
1514 ///
1515 /// # Examples
1516 ///
1517 /// ```
1518 /// use futures_lite::stream::{self, StreamExt};
1519 ///
1520 /// # spin_on::spin_on(async {
1521 /// let mut s = stream::once(1).fuse();
1522 ///
1523 /// assert_eq!(s.next().await, Some(1));
1524 /// assert_eq!(s.next().await, None);
1525 /// assert_eq!(s.next().await, None);
1526 /// # })
1527 /// ```
1528 fn fuse(self) -> Fuse<Self>
1529 where
1530 Self: Sized,
1531 {
1532 Fuse {
1533 stream: self,
1534 done: false,
1535 }
1536 }
1537
1538 /// Repeats the stream from beginning to end, forever.
1539 ///
1540 /// # Examples
1541 ///
1542 /// ```
1543 /// use futures_lite::stream::{self, StreamExt};
1544 ///
1545 /// # spin_on::spin_on(async {
1546 /// let mut s = stream::iter(vec![1, 2]).cycle();
1547 ///
1548 /// assert_eq!(s.next().await, Some(1));
1549 /// assert_eq!(s.next().await, Some(2));
1550 /// assert_eq!(s.next().await, Some(1));
1551 /// assert_eq!(s.next().await, Some(2));
1552 /// # });
1553 /// ```
1554 fn cycle(self) -> Cycle<Self>
1555 where
1556 Self: Clone + Sized,
1557 {
1558 Cycle {
1559 orig: self.clone(),
1560 stream: self,
1561 }
1562 }
1563
1564 /// Enumerates items, mapping them to `(index, item)`.
1565 ///
1566 /// # Examples
1567 ///
1568 /// ```
1569 /// use futures_lite::stream::{self, StreamExt};
1570 ///
1571 /// # spin_on::spin_on(async {
1572 /// let s = stream::iter(vec!['a', 'b', 'c']);
1573 /// let mut s = s.enumerate();
1574 ///
1575 /// assert_eq!(s.next().await, Some((0, 'a')));
1576 /// assert_eq!(s.next().await, Some((1, 'b')));
1577 /// assert_eq!(s.next().await, Some((2, 'c')));
1578 /// assert_eq!(s.next().await, None);
1579 /// # });
1580 /// ```
1581 fn enumerate(self) -> Enumerate<Self>
1582 where
1583 Self: Sized,
1584 {
1585 Enumerate { stream: self, i: 0 }
1586 }
1587
1588 /// Calls a closure on each item and passes it on.
1589 ///
1590 /// # Examples
1591 ///
1592 /// ```
1593 /// use futures_lite::stream::{self, StreamExt};
1594 ///
1595 /// # spin_on::spin_on(async {
1596 /// let s = stream::iter(vec![1, 2, 3, 4, 5]);
1597 ///
1598 /// let sum = s
1599 /// .inspect(|x| println!("about to filter {}", x))
1600 /// .filter(|x| x % 2 == 0)
1601 /// .inspect(|x| println!("made it through filter: {}", x))
1602 /// .fold(0, |sum, i| sum + i)
1603 /// .await;
1604 /// # });
1605 /// ```
1606 fn inspect<F>(self, f: F) -> Inspect<Self, F>
1607 where
1608 Self: Sized,
1609 F: FnMut(&Self::Item),
1610 {
1611 Inspect { stream: self, f }
1612 }
1613
1614 /// Gets the `n`th item of the stream.
1615 ///
1616 /// In the end, `n+1` items of the stream will be consumed.
1617 ///
1618 /// # Examples
1619 ///
1620 /// ```
1621 /// use futures_lite::stream::{self, StreamExt};
1622 ///
1623 /// # spin_on::spin_on(async {
1624 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);
1625 ///
1626 /// assert_eq!(s.nth(2).await, Some(2));
1627 /// assert_eq!(s.nth(2).await, Some(5));
1628 /// assert_eq!(s.nth(2).await, None);
1629 /// # });
1630 /// ```
1631 fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
1632 where
1633 Self: Unpin,
1634 {
1635 NthFuture { stream: self, n }
1636 }
1637
1638 /// Returns the last item in the stream.
1639 ///
1640 /// # Examples
1641 ///
1642 /// ```
1643 /// use futures_lite::stream::{self, StreamExt};
1644 ///
1645 /// # spin_on::spin_on(async {
1646 /// let s = stream::iter(vec![1, 2, 3, 4]);
1647 /// assert_eq!(s.last().await, Some(4));
1648 ///
1649 /// let s = stream::empty::<i32>();
1650 /// assert_eq!(s.last().await, None);
1651 /// # });
1652 /// ```
1653 fn last(self) -> LastFuture<Self>
1654 where
1655 Self: Sized,
1656 {
1657 LastFuture {
1658 stream: self,
1659 last: None,
1660 }
1661 }
1662
1663 /// Finds the first item of the stream for which `predicate` returns `true`.
1664 ///
1665 /// # Examples
1666 ///
1667 /// ```
1668 /// use futures_lite::stream::{self, StreamExt};
1669 ///
1670 /// # spin_on::spin_on(async {
1671 /// let mut s = stream::iter(vec![11, 12, 13, 14]);
1672 ///
1673 /// assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
1674 /// assert_eq!(s.next().await, Some(13));
1675 /// # });
1676 /// ```
1677 fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
1678 where
1679 Self: Unpin,
1680 P: FnMut(&Self::Item) -> bool,
1681 {
1682 FindFuture {
1683 stream: self,
1684 predicate,
1685 }
1686 }
1687
1688 /// Applies a closure to items in the stream and returns the first [`Some`] result.
1689 ///
1690 /// # Examples
1691 ///
1692 /// ```
1693 /// use futures_lite::stream::{self, StreamExt};
1694 ///
1695 /// # spin_on::spin_on(async {
1696 /// let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
1697 /// let number = s.find_map(|s| s.parse().ok()).await;
1698 ///
1699 /// assert_eq!(number, Some(2));
1700 /// # });
1701 /// ```
1702 fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
1703 where
1704 Self: Unpin,
1705 F: FnMut(Self::Item) -> Option<B>,
1706 {
1707 FindMapFuture { stream: self, f }
1708 }
1709
1710 /// Finds the index of the first item of the stream for which `predicate` returns `true`.
1711 ///
1712 /// # Examples
1713 ///
1714 /// ```
1715 /// use futures_lite::stream::{self, StreamExt};
1716 ///
1717 /// # spin_on::spin_on(async {
1718 /// let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);
1719 ///
1720 /// assert_eq!(s.position(|x| x == 2).await, Some(2));
1721 /// assert_eq!(s.position(|x| x == 3).await, Some(0));
1722 /// assert_eq!(s.position(|x| x == 9).await, None);
1723 /// # });
1724 /// ```
1725 fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
1726 where
1727 Self: Unpin,
1728 P: FnMut(Self::Item) -> bool,
1729 {
1730 PositionFuture {
1731 stream: self,
1732 predicate,
1733 index: 0,
1734 }
1735 }
1736
1737 /// Tests if `predicate` returns `true` for all items in the stream.
1738 ///
1739 /// The result is `true` for an empty stream.
1740 ///
1741 /// # Examples
1742 ///
1743 /// ```
1744 /// use futures_lite::stream::{self, StreamExt};
1745 ///
1746 /// # spin_on::spin_on(async {
1747 /// let mut s = stream::iter(vec![1, 2, 3]);
1748 /// assert!(!s.all(|x| x % 2 == 0).await);
1749 ///
1750 /// let mut s = stream::iter(vec![2, 4, 6, 8]);
1751 /// assert!(s.all(|x| x % 2 == 0).await);
1752 ///
1753 /// let mut s = stream::empty::<i32>();
1754 /// assert!(s.all(|x| x % 2 == 0).await);
1755 /// # });
1756 /// ```
1757 fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
1758 where
1759 Self: Unpin,
1760 P: FnMut(Self::Item) -> bool,
1761 {
1762 AllFuture {
1763 stream: self,
1764 predicate,
1765 }
1766 }
1767
1768 /// Tests if `predicate` returns `true` for any item in the stream.
1769 ///
1770 /// The result is `false` for an empty stream.
1771 ///
1772 /// # Examples
1773 ///
1774 /// ```
1775 /// use futures_lite::stream::{self, StreamExt};
1776 ///
1777 /// # spin_on::spin_on(async {
1778 /// let mut s = stream::iter(vec![1, 3, 5, 7]);
1779 /// assert!(!s.any(|x| x % 2 == 0).await);
1780 ///
1781 /// let mut s = stream::iter(vec![1, 2, 3]);
1782 /// assert!(s.any(|x| x % 2 == 0).await);
1783 ///
1784 /// let mut s = stream::empty::<i32>();
1785 /// assert!(!s.any(|x| x % 2 == 0).await);
1786 /// # });
1787 /// ```
1788 fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
1789 where
1790 Self: Unpin,
1791 P: FnMut(Self::Item) -> bool,
1792 {
1793 AnyFuture {
1794 stream: self,
1795 predicate,
1796 }
1797 }
1798
1799 /// Calls a closure on each item of the stream.
1800 ///
1801 /// # Examples
1802 ///
1803 /// ```
1804 /// use futures_lite::stream::{self, StreamExt};
1805 ///
1806 /// # spin_on::spin_on(async {
1807 /// let mut s = stream::iter(vec![1, 2, 3]);
1808 /// s.for_each(|s| println!("{}", s)).await;
1809 /// # });
1810 /// ```
1811 fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
1812 where
1813 Self: Sized,
1814 F: FnMut(Self::Item),
1815 {
1816 ForEachFuture { stream: self, f }
1817 }
1818
1819 /// Calls a fallible closure on each item of the stream, stopping on first error.
1820 ///
1821 /// # Examples
1822 ///
1823 /// ```
1824 /// use futures_lite::stream::{self, StreamExt};
1825 ///
1826 /// # spin_on::spin_on(async {
1827 /// let mut s = stream::iter(vec![0, 1, 2, 3]);
1828 ///
1829 /// let mut v = vec![];
1830 /// let res = s
1831 /// .try_for_each(|n| {
1832 /// if n < 2 {
1833 /// v.push(n);
1834 /// Ok(())
1835 /// } else {
1836 /// Err("too big")
1837 /// }
1838 /// })
1839 /// .await;
1840 ///
1841 /// assert_eq!(v, &[0, 1]);
1842 /// assert_eq!(res, Err("too big"));
1843 /// # });
1844 /// ```
1845 fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
1846 where
1847 Self: Unpin,
1848 F: FnMut(Self::Item) -> Result<(), E>,
1849 {
1850 TryForEachFuture { stream: self, f }
1851 }
1852
1853 /// Zips up two streams into a single stream of pairs.
1854 ///
1855 /// The stream of pairs stops when either of the original two streams is exhausted.
1856 ///
1857 /// # Examples
1858 ///
1859 /// ```
1860 /// use futures_lite::stream::{self, StreamExt};
1861 ///
1862 /// # spin_on::spin_on(async {
1863 /// let l = stream::iter(vec![1, 2, 3]);
1864 /// let r = stream::iter(vec![4, 5, 6, 7]);
1865 /// let mut s = l.zip(r);
1866 ///
1867 /// assert_eq!(s.next().await, Some((1, 4)));
1868 /// assert_eq!(s.next().await, Some((2, 5)));
1869 /// assert_eq!(s.next().await, Some((3, 6)));
1870 /// assert_eq!(s.next().await, None);
1871 /// # });
1872 /// ```
1873 fn zip<U>(self, other: U) -> Zip<Self, U>
1874 where
1875 Self: Sized,
1876 U: Stream,
1877 {
1878 Zip {
1879 item_slot: None,
1880 first: self,
1881 second: other,
1882 }
1883 }
1884
1885 /// Collects a stream of pairs into a pair of collections.
1886 ///
1887 /// # Examples
1888 ///
1889 /// ```
1890 /// use futures_lite::stream::{self, StreamExt};
1891 ///
1892 /// # spin_on::spin_on(async {
1893 /// let s = stream::iter(vec![(1, 2), (3, 4)]);
1894 /// let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;
1895 ///
1896 /// assert_eq!(left, [1, 3]);
1897 /// assert_eq!(right, [2, 4]);
1898 /// # });
1899 /// ```
1900 fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
1901 where
1902 FromA: Default + Extend<A>,
1903 FromB: Default + Extend<B>,
1904 Self: Stream<Item = (A, B)> + Sized,
1905 {
1906 UnzipFuture {
1907 stream: self,
1908 res: Some(Default::default()),
1909 }
1910 }
1911
1912 /// Merges with `other` stream, preferring items from `self` whenever both streams are ready.
1913 ///
1914 /// # Examples
1915 ///
1916 /// ```
1917 /// use futures_lite::stream::{self, StreamExt};
1918 /// use futures_lite::stream::{once, pending};
1919 ///
1920 /// # spin_on::spin_on(async {
1921 /// assert_eq!(once(1).or(pending()).next().await, Some(1));
1922 /// assert_eq!(pending().or(once(2)).next().await, Some(2));
1923 ///
/// // The first stream wins.
1925 /// assert_eq!(once(1).or(once(2)).next().await, Some(1));
1926 /// # })
1927 /// ```
1928 fn or<S>(self, other: S) -> Or<Self, S>
1929 where
1930 Self: Sized,
1931 S: Stream<Item = Self::Item>,
1932 {
1933 Or {
1934 stream1: self,
1935 stream2: other,
1936 }
1937 }
1938
1939 /// Merges with `other` stream, with no preference for either stream when both are ready.
1940 ///
1941 /// # Examples
1942 ///
1943 /// ```
1944 /// use futures_lite::stream::{self, StreamExt};
1945 /// use futures_lite::stream::{once, pending};
1946 ///
1947 /// # spin_on::spin_on(async {
1948 /// assert_eq!(once(1).race(pending()).next().await, Some(1));
1949 /// assert_eq!(pending().race(once(2)).next().await, Some(2));
1950 ///
/// // One of the two streams is randomly chosen as the winner.
1952 /// let res = once(1).race(once(2)).next().await;
1953 /// # })
1954 /// ```
#[cfg(all(feature = "std", feature = "race"))]
fn race<S>(self, other: S) -> Race<Self, S>
where
    Self: Sized,
    S: Stream<Item = Self::Item>,
{
    // Each `Race` owns its own freshly created random number generator.
    let rng = Rng::new();
    Race {
        stream1: self,
        stream2: other,
        rng,
    }
}
1967
1968 /// Yields all immediately available values from a stream.
1969 ///
1970 /// This is intended to be used as a way of polling a stream without waiting, similar to the
1971 /// [`try_iter`] function on [`std::sync::mpsc::Receiver`]. For instance, running this stream
1972 /// on an [`async_channel::Receiver`] will return all messages that are currently in the
1973 /// channel, but will not wait for new messages.
1974 ///
1975 /// This returns a [`Stream`] instead of an [`Iterator`] because it still needs access to the
1976 /// polling context in order to poll the underlying stream. Since this stream will never return
1977 /// `Poll::Pending`, wrapping it in [`block_on`] will allow it to be effectively used as an
1978 /// [`Iterator`].
1979 ///
1980 /// This stream is not necessarily fused. After it returns `None`, it can return `Some(x)` in
1981 /// the future if it is polled again.
1982 ///
1983 /// [`try_iter`]: std::sync::mpsc::Receiver::try_iter
1984 /// [`async_channel::Receiver`]: https://docs.rs/async-channel/latest/async_channel/struct.Receiver.html
1985 /// [`Stream`]: crate::stream::Stream
1986 /// [`Iterator`]: std::iter::Iterator
1987 ///
1988 /// # Examples
1989 ///
1990 /// ```
1991 /// use futures_lite::{future, pin};
1992 /// use futures_lite::stream::{self, StreamExt};
1993 ///
1994 /// # #[cfg(feature = "std")] {
1995 /// // A stream that yields two values, returns `Pending`, and then yields one more value.
1996 /// let pend_once = stream::once_future(async {
1997 /// future::yield_now().await;
1998 /// 3
1999 /// });
2000 /// let s = stream::iter(vec![1, 2]).chain(pend_once);
2001 /// pin!(s);
2002 ///
2003 /// // This will return the first two values, and then `None` because the stream returns
2004 /// // `Pending` after that.
2005 /// let mut iter = stream::block_on(s.drain());
2006 /// assert_eq!(iter.next(), Some(1));
2007 /// assert_eq!(iter.next(), Some(2));
2008 /// assert_eq!(iter.next(), None);
2009 ///
2010 /// // This will return the last value, because the stream returns `Ready` when polled.
2011 /// assert_eq!(iter.next(), Some(3));
2012 /// assert_eq!(iter.next(), None);
2013 /// # }
2014 /// ```
2015 fn drain(&mut self) -> Drain<'_, Self> {
2016 Drain { stream: self }
2017 }
2018
2019 /// Boxes the stream and changes its type to `dyn Stream + Send + 'a`.
2020 ///
2021 /// # Examples
2022 ///
2023 /// ```
2024 /// use futures_lite::stream::{self, StreamExt};
2025 ///
2026 /// # spin_on::spin_on(async {
2027 /// let a = stream::once(1);
2028 /// let b = stream::empty();
2029 ///
2030 /// // Streams of different types can be stored in
2031 /// // the same collection when they are boxed:
2032 /// let streams = vec![a.boxed(), b.boxed()];
2033 /// # })
2034 /// ```
#[cfg(feature = "alloc")]
fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
where
    Self: Send + Sized + 'a,
{
    // Pin on the heap; the return type erases the concrete stream type.
    let stream = self;
    Box::pin(stream)
}
2042
2043 /// Boxes the stream and changes its type to `dyn Stream + 'a`.
2044 ///
2045 /// # Examples
2046 ///
2047 /// ```
2048 /// use futures_lite::stream::{self, StreamExt};
2049 ///
2050 /// # spin_on::spin_on(async {
2051 /// let a = stream::once(1);
2052 /// let b = stream::empty();
2053 ///
2054 /// // Streams of different types can be stored in
2055 /// // the same collection when they are boxed:
2056 /// let streams = vec![a.boxed_local(), b.boxed_local()];
2057 /// # })
2058 /// ```
#[cfg(feature = "alloc")]
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
where
    Self: Sized + 'a,
{
    // Pin on the heap; the return type erases the concrete stream type.
    let stream = self;
    Box::pin(stream)
}
2066}
2067
2068impl<S: Stream + ?Sized> StreamExt for S {}
2069
/// Shorthand for `Pin<Box<dyn Stream<Item = T> + Send + 'static>>`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::Boxed<i32> = stream::once(7).boxed();
/// let s2: stream::Boxed<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send + 'static>>;
2083
/// Shorthand for `Pin<Box<dyn Stream<Item = T> + 'static>>` (no `Send` bound).
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, StreamExt};
///
/// // These two lines are equivalent:
/// let s1: stream::BoxedLocal<i32> = stream::once(7).boxed_local();
/// let s2: stream::BoxedLocal<i32> = Box::pin(stream::once(7));
/// ```
#[cfg(feature = "alloc")]
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T> + 'static>>;
2097
2098/// Future for the [`StreamExt::next()`] method.
2099#[derive(Debug)]
2100#[must_use = "futures do nothing unless you `.await` or poll them"]
2101pub struct NextFuture<'a, S: ?Sized> {
2102 stream: &'a mut S,
2103}
2104
2105impl<S: Unpin + ?Sized> Unpin for NextFuture<'_, S> {}
2106
2107impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
2108 type Output = Option<S::Item>;
2109
2110 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2111 self.stream.poll_next(cx)
2112 }
2113}
2114
2115/// Future for the [`StreamExt::try_next()`] method.
2116#[derive(Debug)]
2117#[must_use = "futures do nothing unless you `.await` or poll them"]
2118pub struct TryNextFuture<'a, S: ?Sized> {
2119 stream: &'a mut S,
2120}
2121
2122impl<S: Unpin + ?Sized> Unpin for TryNextFuture<'_, S> {}
2123
2124impl<T, E, S> Future for TryNextFuture<'_, S>
2125where
2126 S: Stream<Item = Result<T, E>> + Unpin + ?Sized,
2127{
2128 type Output = Result<Option<T>, E>;
2129
2130 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2131 let res = ready!(self.stream.poll_next(cx));
2132 Poll::Ready(res.transpose())
2133 }
2134}
2135
2136pin_project! {
2137 /// Future for the [`StreamExt::count()`] method.
2138 #[derive(Debug)]
2139 #[must_use = "futures do nothing unless you `.await` or poll them"]
2140 pub struct CountFuture<S: ?Sized> {
2141 count: usize,
2142 #[pin]
2143 stream: S,
2144 }
2145}
2146
2147impl<S: Stream + ?Sized> Future for CountFuture<S> {
2148 type Output = usize;
2149
2150 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2151 loop {
2152 match ready!(self.as_mut().project().stream.poll_next(cx)) {
2153 None => return Poll::Ready(self.count),
2154 Some(_) => *self.as_mut().project().count += 1,
2155 }
2156 }
2157 }
2158}
2159
2160pin_project! {
2161 /// Future for the [`StreamExt::collect()`] method.
2162 #[derive(Debug)]
2163 #[must_use = "futures do nothing unless you `.await` or poll them"]
2164 pub struct CollectFuture<S, C> {
2165 #[pin]
2166 stream: S,
2167 collection: C,
2168 }
2169}
2170
2171impl<S, C> Future for CollectFuture<S, C>
2172where
2173 S: Stream,
2174 C: Default + Extend<S::Item>,
2175{
2176 type Output = C;
2177
2178 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
2179 let mut this = self.as_mut().project();
2180 loop {
2181 match ready!(this.stream.as_mut().poll_next(cx)) {
2182 Some(e) => this.collection.extend(Some(e)),
2183 None => return Poll::Ready(mem::take(self.project().collection)),
2184 }
2185 }
2186 }
2187}
2188
2189pin_project! {
2190 /// Future for the [`StreamExt::try_collect()`] method.
2191 #[derive(Debug)]
2192 #[must_use = "futures do nothing unless you `.await` or poll them"]
2193 pub struct TryCollectFuture<S, C> {
2194 #[pin]
2195 stream: S,
2196 items: C,
2197 }
2198}
2199
2200impl<T, E, S, C> Future for TryCollectFuture<S, C>
2201where
2202 S: Stream<Item = Result<T, E>>,
2203 C: Default + Extend<T>,
2204{
2205 type Output = Result<C, E>;
2206
2207 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2208 let mut this = self.project();
2209 Poll::Ready(Ok(loop {
2210 match ready!(this.stream.as_mut().poll_next(cx)?) {
2211 Some(x) => this.items.extend(Some(x)),
2212 None => break mem::take(this.items),
2213 }
2214 }))
2215 }
2216}
2217
2218pin_project! {
2219 /// Future for the [`StreamExt::partition()`] method.
2220 #[derive(Debug)]
2221 #[must_use = "futures do nothing unless you `.await` or poll them"]
2222 pub struct PartitionFuture<S, P, B> {
2223 #[pin]
2224 stream: S,
2225 predicate: P,
2226 res: Option<(B, B)>,
2227 }
2228}
2229
2230impl<S, P, B> Future for PartitionFuture<S, P, B>
2231where
2232 S: Stream + Sized,
2233 P: FnMut(&S::Item) -> bool,
2234 B: Default + Extend<S::Item>,
2235{
2236 type Output = (B, B);
2237
2238 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2239 let mut this = self.project();
2240 loop {
2241 match ready!(this.stream.as_mut().poll_next(cx)) {
2242 Some(v) => {
2243 let res = this.res.as_mut().unwrap();
2244 if (this.predicate)(&v) {
2245 res.0.extend(Some(v))
2246 } else {
2247 res.1.extend(Some(v))
2248 }
2249 }
2250 None => return Poll::Ready(this.res.take().unwrap()),
2251 }
2252 }
2253 }
2254}
2255
2256pin_project! {
2257 /// Future for the [`StreamExt::fold()`] method.
2258 #[derive(Debug)]
2259 #[must_use = "futures do nothing unless you `.await` or poll them"]
2260 pub struct FoldFuture<S, F, T> {
2261 #[pin]
2262 stream: S,
2263 f: F,
2264 acc: Option<T>,
2265 }
2266}
2267
2268impl<S, F, T> Future for FoldFuture<S, F, T>
2269where
2270 S: Stream,
2271 F: FnMut(T, S::Item) -> T,
2272{
2273 type Output = T;
2274
2275 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2276 let mut this = self.project();
2277 loop {
2278 match ready!(this.stream.as_mut().poll_next(cx)) {
2279 Some(v) => {
2280 let old = this.acc.take().unwrap();
2281 let new = (this.f)(old, v);
2282 *this.acc = Some(new);
2283 }
2284 None => return Poll::Ready(this.acc.take().unwrap()),
2285 }
2286 }
2287 }
2288}
2289
2290/// Future for the [`StreamExt::try_fold()`] method.
2291#[derive(Debug)]
2292#[must_use = "futures do nothing unless you `.await` or poll them"]
2293pub struct TryFoldFuture<'a, S, F, B> {
2294 stream: &'a mut S,
2295 f: F,
2296 acc: Option<B>,
2297}
2298
2299impl<S, F, B> Unpin for TryFoldFuture<'_, S, F, B> {}
2300
2301impl<T, E, S, F, B> Future for TryFoldFuture<'_, S, F, B>
2302where
2303 S: Stream<Item = Result<T, E>> + Unpin,
2304 F: FnMut(B, T) -> Result<B, E>,
2305{
2306 type Output = Result<B, E>;
2307
2308 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
2309 loop {
2310 match ready!(self.stream.poll_next(cx)) {
2311 Some(Err(e)) => return Poll::Ready(Err(e)),
2312 Some(Ok(t)) => {
2313 let old = self.acc.take().unwrap();
2314 let new = (&mut self.f)(old, t);
2315
2316 match new {
2317 Ok(t) => self.acc = Some(t),
2318 Err(e) => return Poll::Ready(Err(e)),
2319 }
2320 }
2321 None => return Poll::Ready(Ok(self.acc.take().unwrap())),
2322 }
2323 }
2324 }
2325}
2326
2327pin_project! {
2328 /// Stream for the [`StreamExt::scan()`] method.
2329 #[derive(Clone, Debug)]
2330 #[must_use = "streams do nothing unless polled"]
2331 pub struct Scan<S, St, F> {
2332 #[pin]
2333 stream: S,
2334 state_f: (St, F),
2335 }
2336}
2337
2338impl<S, St, F, B> Stream for Scan<S, St, F>
2339where
2340 S: Stream,
2341 F: FnMut(&mut St, S::Item) -> Option<B>,
2342{
2343 type Item = B;
2344
2345 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
2346 let mut this = self.project();
2347 this.stream.as_mut().poll_next(cx).map(|item| {
2348 item.and_then(|item| {
2349 let (state, f) = this.state_f;
2350 f(state, item)
2351 })
2352 })
2353 }
2354}
2355
2356pin_project! {
2357 /// Stream for the [`StreamExt::fuse()`] method.
2358 #[derive(Clone, Debug)]
2359 #[must_use = "streams do nothing unless polled"]
2360 pub struct Fuse<S> {
2361 #[pin]
2362 stream: S,
2363 done: bool,
2364 }
2365}
2366
2367impl<S: Stream> Stream for Fuse<S> {
2368 type Item = S::Item;
2369
2370 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2371 let this = self.project();
2372
2373 if *this.done {
2374 Poll::Ready(None)
2375 } else {
2376 let next = ready!(this.stream.poll_next(cx));
2377 if next.is_none() {
2378 *this.done = true;
2379 }
2380 Poll::Ready(next)
2381 }
2382 }
2383}
2384
2385pin_project! {
2386 /// Stream for the [`StreamExt::map()`] method.
2387 #[derive(Clone, Debug)]
2388 #[must_use = "streams do nothing unless polled"]
2389 pub struct Map<S, F> {
2390 #[pin]
2391 stream: S,
2392 f: F,
2393 }
2394}
2395
2396impl<S, F, T> Stream for Map<S, F>
2397where
2398 S: Stream,
2399 F: FnMut(S::Item) -> T,
2400{
2401 type Item = T;
2402
2403 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2404 let this = self.project();
2405 let next = ready!(this.stream.poll_next(cx));
2406 Poll::Ready(next.map(this.f))
2407 }
2408
2409 fn size_hint(&self) -> (usize, Option<usize>) {
2410 self.stream.size_hint()
2411 }
2412}
2413
2414pin_project! {
2415 /// Stream for the [`StreamExt::flat_map()`] method.
2416 #[derive(Clone, Debug)]
2417 #[must_use = "streams do nothing unless polled"]
2418 pub struct FlatMap<S, U, F> {
2419 #[pin]
2420 stream: Map<S, F>,
2421 #[pin]
2422 inner_stream: Option<U>,
2423 }
2424}
2425
2426impl<S, U, F> Stream for FlatMap<S, U, F>
2427where
2428 S: Stream,
2429 U: Stream,
2430 F: FnMut(S::Item) -> U,
2431{
2432 type Item = U::Item;
2433
2434 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2435 let mut this = self.project();
2436 loop {
2437 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2438 match ready!(inner.poll_next(cx)) {
2439 Some(item) => return Poll::Ready(Some(item)),
2440 None => this.inner_stream.set(None),
2441 }
2442 }
2443
2444 match ready!(this.stream.as_mut().poll_next(cx)) {
2445 Some(stream) => this.inner_stream.set(Some(stream)),
2446 None => return Poll::Ready(None),
2447 }
2448 }
2449 }
2450}
2451
2452pin_project! {
2453 /// Stream for the [`StreamExt::flatten()`] method.
2454 #[derive(Clone, Debug)]
2455 #[must_use = "streams do nothing unless polled"]
2456 pub struct Flatten<S: Stream> {
2457 #[pin]
2458 stream: S,
2459 #[pin]
2460 inner_stream: Option<S::Item>,
2461 }
2462}
2463
2464impl<S, U> Stream for Flatten<S>
2465where
2466 S: Stream<Item = U>,
2467 U: Stream,
2468{
2469 type Item = U::Item;
2470
2471 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2472 let mut this = self.project();
2473 loop {
2474 if let Some(inner) = this.inner_stream.as_mut().as_pin_mut() {
2475 match ready!(inner.poll_next(cx)) {
2476 Some(item) => return Poll::Ready(Some(item)),
2477 None => this.inner_stream.set(None),
2478 }
2479 }
2480
2481 match ready!(this.stream.as_mut().poll_next(cx)) {
2482 Some(inner) => this.inner_stream.set(Some(inner)),
2483 None => return Poll::Ready(None),
2484 }
2485 }
2486 }
2487}
2488
2489pin_project! {
2490 /// Stream for the [`StreamExt::then()`] method.
2491 #[derive(Clone, Debug)]
2492 #[must_use = "streams do nothing unless polled"]
2493 pub struct Then<S, F, Fut> {
2494 #[pin]
2495 stream: S,
2496 #[pin]
2497 future: Option<Fut>,
2498 f: F,
2499 }
2500}
2501
2502impl<S, F, Fut> Stream for Then<S, F, Fut>
2503where
2504 S: Stream,
2505 F: FnMut(S::Item) -> Fut,
2506 Fut: Future,
2507{
2508 type Item = Fut::Output;
2509
2510 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2511 let mut this = self.project();
2512
2513 loop {
2514 if let Some(fut) = this.future.as_mut().as_pin_mut() {
2515 let item = ready!(fut.poll(cx));
2516 this.future.set(None);
2517 return Poll::Ready(Some(item));
2518 } else if let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
2519 this.future.set(Some((this.f)(item)));
2520 } else {
2521 return Poll::Ready(None);
2522 }
2523 }
2524 }
2525
2526 fn size_hint(&self) -> (usize, Option<usize>) {
2527 let future_len = self.future.is_some() as usize;
2528 let (lower, upper) = self.stream.size_hint();
2529 let lower = lower.saturating_add(future_len);
2530 let upper = upper.and_then(|u| u.checked_add(future_len));
2531 (lower, upper)
2532 }
2533}
2534
2535pin_project! {
2536 /// Stream for the [`StreamExt::filter()`] method.
2537 #[derive(Clone, Debug)]
2538 #[must_use = "streams do nothing unless polled"]
2539 pub struct Filter<S, P> {
2540 #[pin]
2541 stream: S,
2542 predicate: P,
2543 }
2544}
2545
2546impl<S, P> Stream for Filter<S, P>
2547where
2548 S: Stream,
2549 P: FnMut(&S::Item) -> bool,
2550{
2551 type Item = S::Item;
2552
2553 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2554 let mut this = self.project();
2555 loop {
2556 match ready!(this.stream.as_mut().poll_next(cx)) {
2557 None => return Poll::Ready(None),
2558 Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
2559 Some(_) => {}
2560 }
2561 }
2562 }
2563
2564 fn size_hint(&self) -> (usize, Option<usize>) {
2565 let (_, hi) = self.stream.size_hint();
2566
2567 // If the filter matches all of the elements, it will match the stream's upper bound.
2568 // If the filter matches none of the elements, there will be zero returned values.
2569 (0, hi)
2570 }
2571}
2572
2573/// Merges two streams, preferring items from `stream1` whenever both streams are ready.
2574///
2575/// # Examples
2576///
2577/// ```
2578/// use futures_lite::stream::{self, once, pending, StreamExt};
2579///
2580/// # spin_on::spin_on(async {
2581/// assert_eq!(stream::or(once(1), pending()).next().await, Some(1));
2582/// assert_eq!(stream::or(pending(), once(2)).next().await, Some(2));
2583///
2584/// // The first stream wins.
2585/// assert_eq!(stream::or(once(1), once(2)).next().await, Some(1));
2586/// # })
2587/// ```
2588pub fn or<T, S1, S2>(stream1: S1, stream2: S2) -> Or<S1, S2>
2589where
2590 S1: Stream<Item = T>,
2591 S2: Stream<Item = T>,
2592{
2593 Or { stream1, stream2 }
2594}
2595
2596pin_project! {
2597 /// Stream for the [`or()`] function and the [`StreamExt::or()`] method.
2598 #[derive(Clone, Debug)]
2599 #[must_use = "streams do nothing unless polled"]
2600 pub struct Or<S1, S2> {
2601 #[pin]
2602 stream1: S1,
2603 #[pin]
2604 stream2: S2,
2605 }
2606}
2607
2608impl<T, S1, S2> Stream for Or<S1, S2>
2609where
2610 S1: Stream<Item = T>,
2611 S2: Stream<Item = T>,
2612{
2613 type Item = T;
2614
2615 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2616 let mut this = self.project();
2617
2618 if let Poll::Ready(Some(t)) = this.stream1.as_mut().poll_next(cx) {
2619 return Poll::Ready(Some(t));
2620 }
2621 this.stream2.as_mut().poll_next(cx)
2622 }
2623}
2624
/// Merges two streams, with no preference for either stream when both are ready.
///
/// The random source that decides the polling order is created with `Rng::new()`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race(once(1), pending()).next().await, Some(1));
/// assert_eq!(stream::race(pending(), once(2)).next().await, Some(2));
///
/// // One of the two streams is randomly chosen as the winner.
/// let res = stream::race(once(1), once(2)).next().await;
/// # })
/// ```
#[cfg(all(feature = "std", feature = "race"))]
pub fn race<T, S1, S2>(stream1: S1, stream2: S2) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    let rng = Rng::new();
    Race {
        stream1,
        stream2,
        rng,
    }
}
2652
/// Races two streams, but with a user-provided seed for randomness.
///
/// The random source that decides the polling order is created with `Rng::with_seed(seed)`.
///
/// # Examples
///
/// ```
/// use futures_lite::stream::{self, once, pending, StreamExt};
///
/// // A fixed seed is used for reproducibility.
/// const SEED: u64 = 123;
///
/// # spin_on::spin_on(async {
/// assert_eq!(stream::race_with_seed(once(1), pending(), SEED).next().await, Some(1));
/// assert_eq!(stream::race_with_seed(pending(), once(2), SEED).next().await, Some(2));
///
/// // One of the two streams is randomly chosen as the winner.
/// let res = stream::race_with_seed(once(1), once(2), SEED).next().await;
/// # })
/// ```
#[cfg(feature = "race")]
pub fn race_with_seed<T, S1, S2>(stream1: S1, stream2: S2, seed: u64) -> Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    let rng = Rng::with_seed(seed);
    Race {
        stream1,
        stream2,
        rng,
    }
}
2683
#[cfg(feature = "race")]
pin_project! {
    /// Stream for the [`race()`] function and the [`StreamExt::race()`] method.
    ///
    /// On every poll a random bit decides which of the two streams is polled first. Neither
    /// stream is fused: a stream that has returned `None` is polled again on later polls.
    #[derive(Clone, Debug)]
    #[must_use = "streams do nothing unless polled"]
    pub struct Race<S1, S2> {
        #[pin]
        stream1: S1,
        #[pin]
        stream2: S2,
        rng: Rng,
    }
}

#[cfg(feature = "race")]
impl<T, S1, S2> Stream for Race<S1, S2>
where
    S1: Stream<Item = T>,
    S2: Stream<Item = T>,
{
    type Item = T;

    /// Polls both streams in random order and returns the first item found.
    ///
    /// Returns `Poll::Ready(None)` when both streams report end-of-stream during the same
    /// poll, and `Poll::Pending` when no item is available and at least one stream is pending.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // Decide the polling order for this poll only.
        let stream1_first = this.rng.bool();

        let first = if stream1_first {
            this.stream1.as_mut().poll_next(cx)
        } else {
            this.stream2.as_mut().poll_next(cx)
        };
        let first_ended = match first {
            Poll::Ready(Some(t)) => return Poll::Ready(Some(t)),
            Poll::Ready(None) => true,
            Poll::Pending => false,
        };

        let second = if stream1_first {
            this.stream2.as_mut().poll_next(cx)
        } else {
            this.stream1.as_mut().poll_next(cx)
        };
        match second {
            Poll::Ready(Some(t)) => Poll::Ready(Some(t)),
            // Both streams ended in this poll, so neither registered a waker. Returning
            // `Pending` here would leave the task asleep forever.
            Poll::Ready(None) if first_ended => Poll::Ready(None),
            // At least one stream is pending and has registered the waker.
            _ => Poll::Pending,
        }
    }
}
2727
2728pin_project! {
2729 /// Stream for the [`StreamExt::filter_map()`] method.
2730 #[derive(Clone, Debug)]
2731 #[must_use = "streams do nothing unless polled"]
2732 pub struct FilterMap<S, F> {
2733 #[pin]
2734 stream: S,
2735 f: F,
2736 }
2737}
2738
2739impl<S, F, T> Stream for FilterMap<S, F>
2740where
2741 S: Stream,
2742 F: FnMut(S::Item) -> Option<T>,
2743{
2744 type Item = T;
2745
2746 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2747 let mut this = self.project();
2748 loop {
2749 match ready!(this.stream.as_mut().poll_next(cx)) {
2750 None => return Poll::Ready(None),
2751 Some(v) => {
2752 if let Some(t) = (this.f)(v) {
2753 return Poll::Ready(Some(t));
2754 }
2755 }
2756 }
2757 }
2758 }
2759}
2760
2761pin_project! {
2762 /// Stream for the [`StreamExt::take()`] method.
2763 #[derive(Clone, Debug)]
2764 #[must_use = "streams do nothing unless polled"]
2765 pub struct Take<S> {
2766 #[pin]
2767 stream: S,
2768 n: usize,
2769 }
2770}
2771
2772impl<S: Stream> Stream for Take<S> {
2773 type Item = S::Item;
2774
2775 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
2776 let this = self.project();
2777
2778 if *this.n == 0 {
2779 Poll::Ready(None)
2780 } else {
2781 let next = ready!(this.stream.poll_next(cx));
2782 match next {
2783 Some(_) => *this.n -= 1,
2784 None => *this.n = 0,
2785 }
2786 Poll::Ready(next)
2787 }
2788 }
2789}
2790
2791pin_project! {
2792 /// Stream for the [`StreamExt::take_while()`] method.
2793 #[derive(Clone, Debug)]
2794 #[must_use = "streams do nothing unless polled"]
2795 pub struct TakeWhile<S, P> {
2796 #[pin]
2797 stream: S,
2798 predicate: P,
2799 }
2800}
2801
2802impl<S, P> Stream for TakeWhile<S, P>
2803where
2804 S: Stream,
2805 P: FnMut(&S::Item) -> bool,
2806{
2807 type Item = S::Item;
2808
2809 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2810 let this = self.project();
2811
2812 match ready!(this.stream.poll_next(cx)) {
2813 Some(v) => {
2814 if (this.predicate)(&v) {
2815 Poll::Ready(Some(v))
2816 } else {
2817 Poll::Ready(None)
2818 }
2819 }
2820 None => Poll::Ready(None),
2821 }
2822 }
2823}
2824
2825pin_project! {
2826 /// Stream for the [`StreamExt::map_while()`] method.
2827 #[derive(Clone, Debug)]
2828 #[must_use = "streams do nothing unless polled"]
2829 pub struct MapWhile<S, P> {
2830 #[pin]
2831 stream: S,
2832 predicate: P,
2833 }
2834}
2835
2836impl<B, S, P> Stream for MapWhile<S, P>
2837where
2838 S: Stream,
2839 P: FnMut(S::Item) -> Option<B>,
2840{
2841 type Item = B;
2842
2843 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2844 let this = self.project();
2845
2846 match ready!(this.stream.poll_next(cx)) {
2847 Some(v) => Poll::Ready((this.predicate)(v)),
2848 None => Poll::Ready(None),
2849 }
2850 }
2851}
2852
2853pin_project! {
2854 /// Stream for the [`StreamExt::skip()`] method.
2855 #[derive(Clone, Debug)]
2856 #[must_use = "streams do nothing unless polled"]
2857 pub struct Skip<S> {
2858 #[pin]
2859 stream: S,
2860 n: usize,
2861 }
2862}
2863
2864impl<S: Stream> Stream for Skip<S> {
2865 type Item = S::Item;
2866
2867 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2868 let mut this = self.project();
2869 loop {
2870 match ready!(this.stream.as_mut().poll_next(cx)) {
2871 Some(v) => match *this.n {
2872 0 => return Poll::Ready(Some(v)),
2873 _ => *this.n -= 1,
2874 },
2875 None => return Poll::Ready(None),
2876 }
2877 }
2878 }
2879}
2880
2881pin_project! {
2882 /// Stream for the [`StreamExt::skip_while()`] method.
2883 #[derive(Clone, Debug)]
2884 #[must_use = "streams do nothing unless polled"]
2885 pub struct SkipWhile<S, P> {
2886 #[pin]
2887 stream: S,
2888 predicate: Option<P>,
2889 }
2890}
2891
2892impl<S, P> Stream for SkipWhile<S, P>
2893where
2894 S: Stream,
2895 P: FnMut(&S::Item) -> bool,
2896{
2897 type Item = S::Item;
2898
2899 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2900 let mut this = self.project();
2901 loop {
2902 match ready!(this.stream.as_mut().poll_next(cx)) {
2903 Some(v) => match this.predicate {
2904 Some(p) => {
2905 if !p(&v) {
2906 *this.predicate = None;
2907 return Poll::Ready(Some(v));
2908 }
2909 }
2910 None => return Poll::Ready(Some(v)),
2911 },
2912 None => return Poll::Ready(None),
2913 }
2914 }
2915 }
2916}
2917
2918pin_project! {
2919 /// Stream for the [`StreamExt::step_by()`] method.
2920 #[derive(Clone, Debug)]
2921 #[must_use = "streams do nothing unless polled"]
2922 pub struct StepBy<S> {
2923 #[pin]
2924 stream: S,
2925 step: usize,
2926 i: usize,
2927 }
2928}
2929
2930impl<S: Stream> Stream for StepBy<S> {
2931 type Item = S::Item;
2932
2933 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2934 let mut this = self.project();
2935 loop {
2936 match ready!(this.stream.as_mut().poll_next(cx)) {
2937 Some(v) => {
2938 if *this.i == 0 {
2939 *this.i = *this.step - 1;
2940 return Poll::Ready(Some(v));
2941 } else {
2942 *this.i -= 1;
2943 }
2944 }
2945 None => return Poll::Ready(None),
2946 }
2947 }
2948 }
2949}
2950
2951pin_project! {
2952 /// Stream for the [`StreamExt::chain()`] method.
2953 #[derive(Clone, Debug)]
2954 #[must_use = "streams do nothing unless polled"]
2955 pub struct Chain<S, U> {
2956 #[pin]
2957 first: Fuse<S>,
2958 #[pin]
2959 second: Fuse<U>,
2960 }
2961}
2962
2963impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
2964 type Item = S::Item;
2965
2966 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2967 let mut this = self.project();
2968
2969 if !this.first.done {
2970 let next = ready!(this.first.as_mut().poll_next(cx));
2971 if let Some(next) = next {
2972 return Poll::Ready(Some(next));
2973 }
2974 }
2975
2976 if !this.second.done {
2977 let next = ready!(this.second.as_mut().poll_next(cx));
2978 if let Some(next) = next {
2979 return Poll::Ready(Some(next));
2980 }
2981 }
2982
2983 if this.first.done && this.second.done {
2984 Poll::Ready(None)
2985 } else {
2986 Poll::Pending
2987 }
2988 }
2989}
2990
2991pin_project! {
2992 /// Stream for the [`StreamExt::cloned()`] method.
2993 #[derive(Clone, Debug)]
2994 #[must_use = "streams do nothing unless polled"]
2995 pub struct Cloned<S> {
2996 #[pin]
2997 stream: S,
2998 }
2999}
3000
3001impl<'a, S, T: 'a> Stream for Cloned<S>
3002where
3003 S: Stream<Item = &'a T>,
3004 T: Clone,
3005{
3006 type Item = T;
3007
3008 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3009 let this = self.project();
3010 let next = ready!(this.stream.poll_next(cx));
3011 Poll::Ready(next.cloned())
3012 }
3013}
3014
3015pin_project! {
3016 /// Stream for the [`StreamExt::copied()`] method.
3017 #[derive(Clone, Debug)]
3018 #[must_use = "streams do nothing unless polled"]
3019 pub struct Copied<S> {
3020 #[pin]
3021 stream: S,
3022 }
3023}
3024
3025impl<'a, S, T: 'a> Stream for Copied<S>
3026where
3027 S: Stream<Item = &'a T>,
3028 T: Copy,
3029{
3030 type Item = T;
3031
3032 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3033 let this = self.project();
3034 let next = ready!(this.stream.poll_next(cx));
3035 Poll::Ready(next.copied())
3036 }
3037}
3038
3039pin_project! {
3040 /// Stream for the [`StreamExt::cycle()`] method.
3041 #[derive(Clone, Debug)]
3042 #[must_use = "streams do nothing unless polled"]
3043 pub struct Cycle<S> {
3044 orig: S,
3045 #[pin]
3046 stream: S,
3047 }
3048}
3049
3050impl<S> Stream for Cycle<S>
3051where
3052 S: Stream + Clone,
3053{
3054 type Item = S::Item;
3055
3056 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3057 match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
3058 Some(item) => Poll::Ready(Some(item)),
3059 None => {
3060 let new = self.as_mut().orig.clone();
3061 self.as_mut().project().stream.set(new);
3062 self.project().stream.poll_next(cx)
3063 }
3064 }
3065 }
3066}
3067
3068pin_project! {
3069 /// Stream for the [`StreamExt::enumerate()`] method.
3070 #[derive(Clone, Debug)]
3071 #[must_use = "streams do nothing unless polled"]
3072 pub struct Enumerate<S> {
3073 #[pin]
3074 stream: S,
3075 i: usize,
3076 }
3077}
3078
3079impl<S> Stream for Enumerate<S>
3080where
3081 S: Stream,
3082{
3083 type Item = (usize, S::Item);
3084
3085 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3086 let this = self.project();
3087
3088 match ready!(this.stream.poll_next(cx)) {
3089 Some(v) => {
3090 let ret = (*this.i, v);
3091 *this.i += 1;
3092 Poll::Ready(Some(ret))
3093 }
3094 None => Poll::Ready(None),
3095 }
3096 }
3097}
3098
3099pin_project! {
3100 /// Stream for the [`StreamExt::inspect()`] method.
3101 #[derive(Clone, Debug)]
3102 #[must_use = "streams do nothing unless polled"]
3103 pub struct Inspect<S, F> {
3104 #[pin]
3105 stream: S,
3106 f: F,
3107 }
3108}
3109
3110impl<S, F> Stream for Inspect<S, F>
3111where
3112 S: Stream,
3113 F: FnMut(&S::Item),
3114{
3115 type Item = S::Item;
3116
3117 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3118 let mut this = self.project();
3119 let next = ready!(this.stream.as_mut().poll_next(cx));
3120 if let Some(x) = &next {
3121 (this.f)(x);
3122 }
3123 Poll::Ready(next)
3124 }
3125}
3126
3127/// Future for the [`StreamExt::nth()`] method.
3128#[derive(Debug)]
3129#[must_use = "futures do nothing unless you `.await` or poll them"]
3130pub struct NthFuture<'a, S: ?Sized> {
3131 stream: &'a mut S,
3132 n: usize,
3133}
3134
3135impl<S: Unpin + ?Sized> Unpin for NthFuture<'_, S> {}
3136
3137impl<S> Future for NthFuture<'_, S>
3138where
3139 S: Stream + Unpin + ?Sized,
3140{
3141 type Output = Option<S::Item>;
3142
3143 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3144 loop {
3145 match ready!(self.stream.poll_next(cx)) {
3146 Some(v) => match self.n {
3147 0 => return Poll::Ready(Some(v)),
3148 _ => self.n -= 1,
3149 },
3150 None => return Poll::Ready(None),
3151 }
3152 }
3153 }
3154}
3155
3156pin_project! {
3157 /// Future for the [`StreamExt::last()`] method.
3158 #[derive(Debug)]
3159 #[must_use = "futures do nothing unless you `.await` or poll them"]
3160 pub struct LastFuture<S: Stream> {
3161 #[pin]
3162 stream: S,
3163 last: Option<S::Item>,
3164 }
3165}
3166
3167impl<S: Stream> Future for LastFuture<S> {
3168 type Output = Option<S::Item>;
3169
3170 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3171 let mut this = self.project();
3172 loop {
3173 match ready!(this.stream.as_mut().poll_next(cx)) {
3174 Some(new) => *this.last = Some(new),
3175 None => return Poll::Ready(this.last.take()),
3176 }
3177 }
3178 }
3179}
3180
3181/// Future for the [`StreamExt::find()`] method.
3182#[derive(Debug)]
3183#[must_use = "futures do nothing unless you `.await` or poll them"]
3184pub struct FindFuture<'a, S: ?Sized, P> {
3185 stream: &'a mut S,
3186 predicate: P,
3187}
3188
3189impl<S: Unpin + ?Sized, P> Unpin for FindFuture<'_, S, P> {}
3190
3191impl<S, P> Future for FindFuture<'_, S, P>
3192where
3193 S: Stream + Unpin + ?Sized,
3194 P: FnMut(&S::Item) -> bool,
3195{
3196 type Output = Option<S::Item>;
3197
3198 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3199 loop {
3200 match ready!(self.stream.poll_next(cx)) {
3201 Some(v) if (&mut self.predicate)(&v) => return Poll::Ready(Some(v)),
3202 Some(_) => {}
3203 None => return Poll::Ready(None),
3204 }
3205 }
3206 }
3207}
3208
3209/// Future for the [`StreamExt::find_map()`] method.
3210#[derive(Debug)]
3211#[must_use = "futures do nothing unless you `.await` or poll them"]
3212pub struct FindMapFuture<'a, S: ?Sized, F> {
3213 stream: &'a mut S,
3214 f: F,
3215}
3216
3217impl<S: Unpin + ?Sized, F> Unpin for FindMapFuture<'_, S, F> {}
3218
3219impl<S, B, F> Future for FindMapFuture<'_, S, F>
3220where
3221 S: Stream + Unpin + ?Sized,
3222 F: FnMut(S::Item) -> Option<B>,
3223{
3224 type Output = Option<B>;
3225
3226 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3227 loop {
3228 match ready!(self.stream.poll_next(cx)) {
3229 Some(v) => {
3230 if let Some(v) = (&mut self.f)(v) {
3231 return Poll::Ready(Some(v));
3232 }
3233 }
3234 None => return Poll::Ready(None),
3235 }
3236 }
3237 }
3238}
3239
3240/// Future for the [`StreamExt::position()`] method.
3241#[derive(Debug)]
3242#[must_use = "futures do nothing unless you `.await` or poll them"]
3243pub struct PositionFuture<'a, S: ?Sized, P> {
3244 stream: &'a mut S,
3245 predicate: P,
3246 index: usize,
3247}
3248
3249impl<S: Unpin + ?Sized, P> Unpin for PositionFuture<'_, S, P> {}
3250
3251impl<S, P> Future for PositionFuture<'_, S, P>
3252where
3253 S: Stream + Unpin + ?Sized,
3254 P: FnMut(S::Item) -> bool,
3255{
3256 type Output = Option<usize>;
3257
3258 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3259 loop {
3260 match ready!(self.stream.poll_next(cx)) {
3261 Some(v) => {
3262 if (&mut self.predicate)(v) {
3263 return Poll::Ready(Some(self.index));
3264 } else {
3265 self.index += 1;
3266 }
3267 }
3268 None => return Poll::Ready(None),
3269 }
3270 }
3271 }
3272}
3273
3274/// Future for the [`StreamExt::all()`] method.
3275#[derive(Debug)]
3276#[must_use = "futures do nothing unless you `.await` or poll them"]
3277pub struct AllFuture<'a, S: ?Sized, P> {
3278 stream: &'a mut S,
3279 predicate: P,
3280}
3281
3282impl<S: Unpin + ?Sized, P> Unpin for AllFuture<'_, S, P> {}
3283
3284impl<S, P> Future for AllFuture<'_, S, P>
3285where
3286 S: Stream + Unpin + ?Sized,
3287 P: FnMut(S::Item) -> bool,
3288{
3289 type Output = bool;
3290
3291 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3292 loop {
3293 match ready!(self.stream.poll_next(cx)) {
3294 Some(v) => {
3295 if !(&mut self.predicate)(v) {
3296 return Poll::Ready(false);
3297 }
3298 }
3299 None => return Poll::Ready(true),
3300 }
3301 }
3302 }
3303}
3304
3305/// Future for the [`StreamExt::any()`] method.
3306#[derive(Debug)]
3307#[must_use = "futures do nothing unless you `.await` or poll them"]
3308pub struct AnyFuture<'a, S: ?Sized, P> {
3309 stream: &'a mut S,
3310 predicate: P,
3311}
3312
3313impl<S: Unpin + ?Sized, P> Unpin for AnyFuture<'_, S, P> {}
3314
3315impl<S, P> Future for AnyFuture<'_, S, P>
3316where
3317 S: Stream + Unpin + ?Sized,
3318 P: FnMut(S::Item) -> bool,
3319{
3320 type Output = bool;
3321
3322 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3323 loop {
3324 match ready!(self.stream.poll_next(cx)) {
3325 Some(v) => {
3326 if (&mut self.predicate)(v) {
3327 return Poll::Ready(true);
3328 }
3329 }
3330 None => return Poll::Ready(false),
3331 }
3332 }
3333 }
3334}
3335
3336pin_project! {
3337 /// Future for the [`StreamExt::for_each()`] method.
3338 #[derive(Debug)]
3339 #[must_use = "futures do nothing unless you `.await` or poll them"]
3340 pub struct ForEachFuture<S, F> {
3341 #[pin]
3342 stream: S,
3343 f: F,
3344 }
3345}
3346
3347impl<S, F> Future for ForEachFuture<S, F>
3348where
3349 S: Stream,
3350 F: FnMut(S::Item),
3351{
3352 type Output = ();
3353
3354 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3355 let mut this = self.project();
3356 loop {
3357 match ready!(this.stream.as_mut().poll_next(cx)) {
3358 Some(v) => (this.f)(v),
3359 None => return Poll::Ready(()),
3360 }
3361 }
3362 }
3363}
3364
3365/// Future for the [`StreamExt::try_for_each()`] method.
3366#[derive(Debug)]
3367#[must_use = "futures do nothing unless you `.await` or poll them"]
3368pub struct TryForEachFuture<'a, S: ?Sized, F> {
3369 stream: &'a mut S,
3370 f: F,
3371}
3372
3373impl<S: Unpin + ?Sized, F> Unpin for TryForEachFuture<'_, S, F> {}
3374
3375impl<S, F, E> Future for TryForEachFuture<'_, S, F>
3376where
3377 S: Stream + Unpin + ?Sized,
3378 F: FnMut(S::Item) -> Result<(), E>,
3379{
3380 type Output = Result<(), E>;
3381
3382 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3383 loop {
3384 match ready!(self.stream.poll_next(cx)) {
3385 None => return Poll::Ready(Ok(())),
3386 Some(v) => (&mut self.f)(v)?,
3387 }
3388 }
3389 }
3390}
3391
3392pin_project! {
3393 /// Stream for the [`StreamExt::zip()`] method.
3394 #[derive(Clone, Debug)]
3395 #[must_use = "streams do nothing unless polled"]
3396 pub struct Zip<A: Stream, B> {
3397 item_slot: Option<A::Item>,
3398 #[pin]
3399 first: A,
3400 #[pin]
3401 second: B,
3402 }
3403}
3404
3405impl<A: Stream, B: Stream> Stream for Zip<A, B> {
3406 type Item = (A::Item, B::Item);
3407
3408 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3409 let this = self.project();
3410
3411 if this.item_slot.is_none() {
3412 match this.first.poll_next(cx) {
3413 Poll::Pending => return Poll::Pending,
3414 Poll::Ready(None) => return Poll::Ready(None),
3415 Poll::Ready(Some(item)) => *this.item_slot = Some(item),
3416 }
3417 }
3418
3419 let second_item = ready!(this.second.poll_next(cx));
3420 let first_item = this.item_slot.take().unwrap();
3421 Poll::Ready(second_item.map(|second_item| (first_item, second_item)))
3422 }
3423}
3424
3425pin_project! {
3426 /// Future for the [`StreamExt::unzip()`] method.
3427 #[derive(Debug)]
3428 #[must_use = "futures do nothing unless you `.await` or poll them"]
3429 pub struct UnzipFuture<S, FromA, FromB> {
3430 #[pin]
3431 stream: S,
3432 res: Option<(FromA, FromB)>,
3433 }
3434}
3435
3436impl<S, A, B, FromA, FromB> Future for UnzipFuture<S, FromA, FromB>
3437where
3438 S: Stream<Item = (A, B)>,
3439 FromA: Default + Extend<A>,
3440 FromB: Default + Extend<B>,
3441{
3442 type Output = (FromA, FromB);
3443
3444 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
3445 let mut this = self.project();
3446
3447 loop {
3448 match ready!(this.stream.as_mut().poll_next(cx)) {
3449 Some((a, b)) => {
3450 let res = this.res.as_mut().unwrap();
3451 res.0.extend(Some(a));
3452 res.1.extend(Some(b));
3453 }
3454 None => return Poll::Ready(this.res.take().unwrap()),
3455 }
3456 }
3457 }
3458}
3459
3460/// Stream for the [`StreamExt::drain()`] method.
3461#[derive(Debug)]
3462#[must_use = "streams do nothing unless polled"]
3463pub struct Drain<'a, S: ?Sized> {
3464 stream: &'a mut S,
3465}
3466
3467impl<S: Unpin + ?Sized> Unpin for Drain<'_, S> {}
3468
3469impl<'a, S: Unpin + ?Sized> Drain<'a, S> {
3470 /// Get a reference to the underlying stream.
3471 ///
3472 /// ## Examples
3473 ///
3474 /// ```
3475 /// use futures_lite::{prelude::*, stream};
3476 ///
3477 /// # spin_on::spin_on(async {
3478 /// let mut s = stream::iter(vec![1, 2, 3]);
3479 /// let s2 = s.drain();
3480 ///
3481 /// let inner = s2.get_ref();
3482 /// // s and inner are the same.
3483 /// # });
3484 /// ```
3485 pub fn get_ref(&self) -> &S {
3486 &self.stream
3487 }
3488
3489 /// Get a mutable reference to the underlying stream.
3490 ///
3491 /// ## Examples
3492 ///
3493 /// ```
3494 /// use futures_lite::{prelude::*, stream};
3495 ///
3496 /// # spin_on::spin_on(async {
3497 /// let mut s = stream::iter(vec![1, 2, 3]);
3498 /// let mut s2 = s.drain();
3499 ///
3500 /// let inner = s2.get_mut();
3501 /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3502 /// # });
3503 /// ```
3504 pub fn get_mut(&mut self) -> &mut S {
3505 &mut self.stream
3506 }
3507
3508 /// Consume this stream and get the underlying stream.
3509 ///
3510 /// ## Examples
3511 ///
3512 /// ```
3513 /// use futures_lite::{prelude::*, stream};
3514 ///
3515 /// # spin_on::spin_on(async {
3516 /// let mut s = stream::iter(vec![1, 2, 3]);
3517 /// let mut s2 = s.drain();
3518 ///
3519 /// let inner = s2.into_inner();
3520 /// assert_eq!(inner.collect::<Vec<_>>().await, vec![1, 2, 3]);
3521 /// # });
3522 /// ```
3523 pub fn into_inner(self) -> &'a mut S {
3524 self.stream
3525 }
3526}
3527
3528impl<S: Stream + Unpin + ?Sized> Stream for Drain<'_, S> {
3529 type Item = S::Item;
3530
3531 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3532 match self.stream.poll_next(cx) {
3533 Poll::Ready(x) => Poll::Ready(x),
3534 Poll::Pending => Poll::Ready(None),
3535 }
3536 }
3537
3538 fn size_hint(&self) -> (usize, Option<usize>) {
3539 let (_, hi) = self.stream.size_hint();
3540 (0, hi)
3541 }
3542}