futures_lite::stream

Trait StreamExt

source
pub trait StreamExt: Stream {
Show 45 methods // Provided methods fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> where Self: Unpin { ... } fn next(&mut self) -> NextFuture<'_, Self> where Self: Unpin { ... } fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self> where Self: Stream<Item = Result<T, E>> + Unpin { ... } fn count(self) -> CountFuture<Self> where Self: Sized { ... } fn map<T, F>(self, f: F) -> Map<Self, F> where Self: Sized, F: FnMut(Self::Item) -> T { ... } fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F> where Self: Sized, U: Stream, F: FnMut(Self::Item) -> U { ... } fn flatten(self) -> Flatten<Self> where Self: Sized, Self::Item: Stream { ... } fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut> where Self: Sized, F: FnMut(Self::Item) -> Fut, Fut: Future { ... } fn filter<P>(self, predicate: P) -> Filter<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool { ... } fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F> where Self: Sized, F: FnMut(Self::Item) -> Option<T> { ... } fn take(self, n: usize) -> Take<Self> where Self: Sized { ... } fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool { ... } fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P> where Self: Sized, P: FnMut(Self::Item) -> Option<B> { ... } fn skip(self, n: usize) -> Skip<Self> where Self: Sized { ... } fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P> where Self: Sized, P: FnMut(&Self::Item) -> bool { ... } fn step_by(self, step: usize) -> StepBy<Self> where Self: Sized { ... } fn chain<U>(self, other: U) -> Chain<Self, U> where Self: Sized, U: Stream<Item = Self::Item> + Sized { ... } fn cloned<'a, T>(self) -> Cloned<Self> where Self: Stream<Item = &'a T> + Sized, T: Clone + 'a { ... } fn copied<'a, T>(self) -> Copied<Self> where Self: Stream<Item = &'a T> + Sized, T: Copy + 'a { ... } fn collect<C>(self) -> CollectFuture<Self, C> where Self: Sized, C: Default + Extend<Self::Item> { ... } fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C> where Self: Stream<Item = Result<T, E>> + Sized, C: Default + Extend<T> { ... } fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B> where Self: Sized, B: Default + Extend<Self::Item>, P: FnMut(&Self::Item) -> bool { ... } fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T> where Self: Sized, F: FnMut(T, Self::Item) -> T { ... } fn try_fold<T, E, F, B>( &mut self, init: B, f: F, ) -> TryFoldFuture<'_, Self, F, B> where Self: Stream<Item = Result<T, E>> + Unpin + Sized, F: FnMut(B, T) -> Result<B, E> { ... } fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F> where Self: Sized, F: FnMut(&mut St, Self::Item) -> Option<B> { ... } fn fuse(self) -> Fuse<Self> where Self: Sized { ... } fn cycle(self) -> Cycle<Self> where Self: Clone + Sized { ... } fn enumerate(self) -> Enumerate<Self> where Self: Sized { ... } fn inspect<F>(self, f: F) -> Inspect<Self, F> where Self: Sized, F: FnMut(&Self::Item) { ... } fn nth(&mut self, n: usize) -> NthFuture<'_, Self> where Self: Unpin { ... } fn last(self) -> LastFuture<Self> where Self: Sized { ... } fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P> where Self: Unpin, P: FnMut(&Self::Item) -> bool { ... } fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> Option<B> { ... } fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool { ... } fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool { ... } fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P> where Self: Unpin, P: FnMut(Self::Item) -> bool { ... } fn for_each<F>(self, f: F) -> ForEachFuture<Self, F> where Self: Sized, F: FnMut(Self::Item) { ... } fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F> where Self: Unpin, F: FnMut(Self::Item) -> Result<(), E> { ... } fn zip<U>(self, other: U) -> Zip<Self, U> where Self: Sized, U: Stream { ... } fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB> where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Stream<Item = (A, B)> + Sized { ... } fn or<S>(self, other: S) -> Or<Self, S> where Self: Sized, S: Stream<Item = Self::Item> { ... } fn race<S>(self, other: S) -> Race<Self, S> where Self: Sized, S: Stream<Item = Self::Item> { ... } fn drain(&mut self) -> Drain<'_, Self> { ... } fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>> where Self: Send + Sized + 'a { ... } fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>> where Self: Sized + 'a { ... }
}
Expand description

Extension trait for Stream.

Provided Methods§

source

fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>
where Self: Unpin,

A convenience for calling Stream::poll_next() on !Unpin types.

source

fn next(&mut self) -> NextFuture<'_, Self>
where Self: Unpin,

Retrieves the next item in the stream.

Returns None when iteration is finished. Stream implementations may choose to or not to resume iteration after that.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(1..=3);

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(3));
assert_eq!(s.next().await, None);
source

fn try_next<T, E>(&mut self) -> TryNextFuture<'_, Self>
where Self: Stream<Item = Result<T, E>> + Unpin,

Retrieves the next item in the stream.

This is similar to the next() method, but returns Result<Option<T>, E> rather than Option<Result<T, E>>.

Note that s.try_next().await is equivalent to s.next().await.transpose().

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![Ok(1), Ok(2), Err("error")]);

assert_eq!(s.try_next().await, Ok(Some(1)));
assert_eq!(s.try_next().await, Ok(Some(2)));
assert_eq!(s.try_next().await, Err("error"));
assert_eq!(s.try_next().await, Ok(None));
source

fn count(self) -> CountFuture<Self>
where Self: Sized,

Counts the number of items in the stream.

§Examples
use futures_lite::stream::{self, StreamExt};

let s1 = stream::iter(vec![0]);
let s2 = stream::iter(vec![1, 2, 3]);

assert_eq!(s1.count().await, 1);
assert_eq!(s2.count().await, 3);
source

fn map<T, F>(self, f: F) -> Map<Self, F>
where Self: Sized, F: FnMut(Self::Item) -> T,

Maps items of the stream to new values using a closure.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3]);
let mut s = s.map(|x| 2 * x);

assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(4));
assert_eq!(s.next().await, Some(6));
assert_eq!(s.next().await, None);
source

fn flat_map<U, F>(self, f: F) -> FlatMap<Self, U, F>
where Self: Sized, U: Stream, F: FnMut(Self::Item) -> U,

Maps items to streams and then concatenates them.

§Examples
use futures_lite::stream::{self, StreamExt};

let words = stream::iter(vec!["one", "two"]);

let s: String = words
    .flat_map(|s| stream::iter(s.chars()))
    .collect()
    .await;

assert_eq!(s, "onetwo");
source

fn flatten(self) -> Flatten<Self>
where Self: Sized, Self::Item: Stream,

Concatenates inner streams.

§Examples
use futures_lite::stream::{self, StreamExt};

let s1 = stream::iter(vec![1, 2, 3]);
let s2 = stream::iter(vec![4, 5]);

let s = stream::iter(vec![s1, s2]);
let v: Vec<_> = s.flatten().collect().await;
assert_eq!(v, [1, 2, 3, 4, 5]);
source

fn then<F, Fut>(self, f: F) -> Then<Self, F, Fut>
where Self: Sized, F: FnMut(Self::Item) -> Fut, Fut: Future,

Maps items of the stream to new values using an async closure.

§Examples
use futures_lite::pin;
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3]);
let mut s = s.then(|x| async move { 2 * x });

pin!(s);
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(4));
assert_eq!(s.next().await, Some(6));
assert_eq!(s.next().await, None);
source

fn filter<P>(self, predicate: P) -> Filter<Self, P>
where Self: Sized, P: FnMut(&Self::Item) -> bool,

Keeps items of the stream for which predicate returns true.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3, 4]);
let mut s = s.filter(|i| i % 2 == 0);

assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(4));
assert_eq!(s.next().await, None);
source

fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
where Self: Sized, F: FnMut(Self::Item) -> Option<T>,

Filters and maps items of the stream using a closure.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec!["1", "lol", "3", "NaN", "5"]);
let mut s = s.filter_map(|a| a.parse::<u32>().ok());

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(3));
assert_eq!(s.next().await, Some(5));
assert_eq!(s.next().await, None);
source

fn take(self, n: usize) -> Take<Self>
where Self: Sized,

Takes only the first n items of the stream.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::repeat(7).take(2);

assert_eq!(s.next().await, Some(7));
assert_eq!(s.next().await, Some(7));
assert_eq!(s.next().await, None);
source

fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
where Self: Sized, P: FnMut(&Self::Item) -> bool,

Takes items while predicate returns true.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3, 4]);
let mut s = s.take_while(|x| *x < 3);

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, None);
source

fn map_while<B, P>(self, predicate: P) -> MapWhile<Self, P>
where Self: Sized, P: FnMut(Self::Item) -> Option<B>,

Maps items while predicate returns Some.

This stream is not fused. After the predicate returns None the stream still contains remaining items that can be obtained by subsequent next calls. You can fuse the stream if this behavior is undesirable.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 0, 3]);
let mut s = s.map_while(|x: u32| x.checked_sub(1));

assert_eq!(s.next().await, Some(0));
assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, None);

// Continue to iterate the stream.
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, None);
source

fn skip(self, n: usize) -> Skip<Self>
where Self: Sized,

Skips the first n items of the stream.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3]);
let mut s = s.skip(2);

assert_eq!(s.next().await, Some(3));
assert_eq!(s.next().await, None);
source

fn skip_while<P>(self, predicate: P) -> SkipWhile<Self, P>
where Self: Sized, P: FnMut(&Self::Item) -> bool,

Skips items while predicate returns true.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![-1i32, 0, 1]);
let mut s = s.skip_while(|x| x.is_negative());

assert_eq!(s.next().await, Some(0));
assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, None);
source

fn step_by(self, step: usize) -> StepBy<Self>
where Self: Sized,

Yields every stepth item.

§Panics

This method will panic if the step is 0.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![0, 1, 2, 3, 4]);
let mut s = s.step_by(2);

assert_eq!(s.next().await, Some(0));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(4));
assert_eq!(s.next().await, None);
source

fn chain<U>(self, other: U) -> Chain<Self, U>
where Self: Sized, U: Stream<Item = Self::Item> + Sized,

Appends another stream to the end of this one.

§Examples
use futures_lite::stream::{self, StreamExt};

let s1 = stream::iter(vec![1, 2]);
let s2 = stream::iter(vec![7, 8]);
let mut s = s1.chain(s2);

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(7));
assert_eq!(s.next().await, Some(8));
assert_eq!(s.next().await, None);
source

fn cloned<'a, T>(self) -> Cloned<Self>
where Self: Stream<Item = &'a T> + Sized, T: Clone + 'a,

Clones all items.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![&1, &2]);
let mut s = s.cloned();

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, None);
source

fn copied<'a, T>(self) -> Copied<Self>
where Self: Stream<Item = &'a T> + Sized, T: Copy + 'a,

Copies all items.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![&1, &2]);
let mut s = s.copied();

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, None);
source

fn collect<C>(self) -> CollectFuture<Self, C>
where Self: Sized, C: Default + Extend<Self::Item>,

Collects all items in the stream into a collection.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(1..=3);

let items: Vec<_> = s.collect().await;
assert_eq!(items, [1, 2, 3]);
source

fn try_collect<T, E, C>(self) -> TryCollectFuture<Self, C>
where Self: Stream<Item = Result<T, E>> + Sized, C: Default + Extend<T>,

Collects all items in the fallible stream into a collection.

use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![Ok(1), Err(2), Ok(3)]);
let res: Result<Vec<i32>, i32> = s.try_collect().await;
assert_eq!(res, Err(2));

let s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);
let res: Result<Vec<i32>, i32> = s.try_collect().await;
assert_eq!(res, Ok(vec![1, 2, 3]));
source

fn partition<B, P>(self, predicate: P) -> PartitionFuture<Self, P, B>
where Self: Sized, B: Default + Extend<Self::Item>, P: FnMut(&Self::Item) -> bool,

Partitions items into those for which predicate is true and those for which it is false, and then collects them into two collections.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3]);
let (even, odd): (Vec<_>, Vec<_>) = s.partition(|&n| n % 2 == 0).await;

assert_eq!(even, &[2]);
assert_eq!(odd, &[1, 3]);
source

fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
where Self: Sized, F: FnMut(T, Self::Item) -> T,

Accumulates a computation over the stream.

The computation begins with the accumulator value set to init, and then applies f to the accumulator and each item in the stream. The final accumulator value is returned.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3]);
let sum = s.fold(0, |acc, x| acc + x).await;

assert_eq!(sum, 6);
source

fn try_fold<T, E, F, B>( &mut self, init: B, f: F, ) -> TryFoldFuture<'_, Self, F, B>
where Self: Stream<Item = Result<T, E>> + Unpin + Sized, F: FnMut(B, T) -> Result<B, E>,

Accumulates a fallible computation over the stream.

The computation begins with the accumulator value set to init, and then applies f to the accumulator and each item in the stream. The final accumulator value is returned, or an error if f failed the computation.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![Ok(1), Ok(2), Ok(3)]);

let sum = s.try_fold(0, |acc, v| {
    if (acc + v) % 2 == 1 {
        Ok(acc + v)
    } else {
        Err("fail")
    }
})
.await;

assert_eq!(sum, Err("fail"));
source

fn scan<St, B, F>(self, initial_state: St, f: F) -> Scan<Self, St, F>
where Self: Sized, F: FnMut(&mut St, Self::Item) -> Option<B>,

Maps items of the stream to new values using a state value and a closure.

Scanning begins with the initial state set to initial_state, and then applies f to the state and each item in the stream. The stream stops when f returns None.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3]);
let mut s = s.scan(1, |state, x| {
    *state = *state * x;
    Some(-*state)
});

assert_eq!(s.next().await, Some(-1));
assert_eq!(s.next().await, Some(-2));
assert_eq!(s.next().await, Some(-6));
assert_eq!(s.next().await, None);
source

fn fuse(self) -> Fuse<Self>
where Self: Sized,

Fuses the stream so that it stops yielding items after the first None.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::once(1).fuse();

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, None);
assert_eq!(s.next().await, None);
source

fn cycle(self) -> Cycle<Self>
where Self: Clone + Sized,

Repeats the stream from beginning to end, forever.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![1, 2]).cycle();

assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
assert_eq!(s.next().await, Some(1));
assert_eq!(s.next().await, Some(2));
source

fn enumerate(self) -> Enumerate<Self>
where Self: Sized,

Enumerates items, mapping them to (index, item).

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec!['a', 'b', 'c']);
let mut s = s.enumerate();

assert_eq!(s.next().await, Some((0, 'a')));
assert_eq!(s.next().await, Some((1, 'b')));
assert_eq!(s.next().await, Some((2, 'c')));
assert_eq!(s.next().await, None);
source

fn inspect<F>(self, f: F) -> Inspect<Self, F>
where Self: Sized, F: FnMut(&Self::Item),

Calls a closure on each item and passes it on.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3, 4, 5]);

let sum = s
   .inspect(|x| println!("about to filter {}", x))
   .filter(|x| x % 2 == 0)
   .inspect(|x| println!("made it through filter: {}", x))
   .fold(0, |sum, i| sum + i)
   .await;
source

fn nth(&mut self, n: usize) -> NthFuture<'_, Self>
where Self: Unpin,

Gets the nth item of the stream.

In the end, n+1 items of the stream will be consumed.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5, 6, 7]);

assert_eq!(s.nth(2).await, Some(2));
assert_eq!(s.nth(2).await, Some(5));
assert_eq!(s.nth(2).await, None);
source

fn last(self) -> LastFuture<Self>
where Self: Sized,

Returns the last item in the stream.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![1, 2, 3, 4]);
assert_eq!(s.last().await, Some(4));

let s = stream::empty::<i32>();
assert_eq!(s.last().await, None);
source

fn find<P>(&mut self, predicate: P) -> FindFuture<'_, Self, P>
where Self: Unpin, P: FnMut(&Self::Item) -> bool,

Finds the first item of the stream for which predicate returns true.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![11, 12, 13, 14]);

assert_eq!(s.find(|x| *x % 2 == 0).await, Some(12));
assert_eq!(s.next().await, Some(13));
source

fn find_map<F, B>(&mut self, f: F) -> FindMapFuture<'_, Self, F>
where Self: Unpin, F: FnMut(Self::Item) -> Option<B>,

Applies a closure to items in the stream and returns the first Some result.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec!["lol", "NaN", "2", "5"]);
let number = s.find_map(|s| s.parse().ok()).await;

assert_eq!(number, Some(2));
source

fn position<P>(&mut self, predicate: P) -> PositionFuture<'_, Self, P>
where Self: Unpin, P: FnMut(Self::Item) -> bool,

Finds the index of the first item of the stream for which predicate returns true.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![0, 1, 2, 3, 4, 5]);

assert_eq!(s.position(|x| x == 2).await, Some(2));
assert_eq!(s.position(|x| x == 3).await, Some(0));
assert_eq!(s.position(|x| x == 9).await, None);
source

fn all<P>(&mut self, predicate: P) -> AllFuture<'_, Self, P>
where Self: Unpin, P: FnMut(Self::Item) -> bool,

Tests if predicate returns true for all items in the stream.

The result is true for an empty stream.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![1, 2, 3]);
assert!(!s.all(|x| x % 2 == 0).await);

let mut s = stream::iter(vec![2, 4, 6, 8]);
assert!(s.all(|x| x % 2 == 0).await);

let mut s = stream::empty::<i32>();
assert!(s.all(|x| x % 2 == 0).await);
source

fn any<P>(&mut self, predicate: P) -> AnyFuture<'_, Self, P>
where Self: Unpin, P: FnMut(Self::Item) -> bool,

Tests if predicate returns true for any item in the stream.

The result is false for an empty stream.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![1, 3, 5, 7]);
assert!(!s.any(|x| x % 2 == 0).await);

let mut s = stream::iter(vec![1, 2, 3]);
assert!(s.any(|x| x % 2 == 0).await);

let mut s = stream::empty::<i32>();
assert!(!s.any(|x| x % 2 == 0).await);
source

fn for_each<F>(self, f: F) -> ForEachFuture<Self, F>
where Self: Sized, F: FnMut(Self::Item),

Calls a closure on each item of the stream.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![1, 2, 3]);
s.for_each(|s| println!("{}", s)).await;
source

fn try_for_each<F, E>(&mut self, f: F) -> TryForEachFuture<'_, Self, F>
where Self: Unpin, F: FnMut(Self::Item) -> Result<(), E>,

Calls a fallible closure on each item of the stream, stopping on first error.

§Examples
use futures_lite::stream::{self, StreamExt};

let mut s = stream::iter(vec![0, 1, 2, 3]);

let mut v = vec![];
let res = s
    .try_for_each(|n| {
        if n < 2 {
            v.push(n);
            Ok(())
        } else {
            Err("too big")
        }
    })
    .await;

assert_eq!(v, &[0, 1]);
assert_eq!(res, Err("too big"));
source

fn zip<U>(self, other: U) -> Zip<Self, U>
where Self: Sized, U: Stream,

Zips up two streams into a single stream of pairs.

The stream of pairs stops when either of the original two streams is exhausted.

§Examples
use futures_lite::stream::{self, StreamExt};

let l = stream::iter(vec![1, 2, 3]);
let r = stream::iter(vec![4, 5, 6, 7]);
let mut s = l.zip(r);

assert_eq!(s.next().await, Some((1, 4)));
assert_eq!(s.next().await, Some((2, 5)));
assert_eq!(s.next().await, Some((3, 6)));
assert_eq!(s.next().await, None);
source

fn unzip<A, B, FromA, FromB>(self) -> UnzipFuture<Self, FromA, FromB>
where FromA: Default + Extend<A>, FromB: Default + Extend<B>, Self: Stream<Item = (A, B)> + Sized,

Collects a stream of pairs into a pair of collections.

§Examples
use futures_lite::stream::{self, StreamExt};

let s = stream::iter(vec![(1, 2), (3, 4)]);
let (left, right): (Vec<_>, Vec<_>) = s.unzip().await;

assert_eq!(left, [1, 3]);
assert_eq!(right, [2, 4]);
source

fn or<S>(self, other: S) -> Or<Self, S>
where Self: Sized, S: Stream<Item = Self::Item>,

Merges with other stream, preferring items from self whenever both streams are ready.

§Examples
use futures_lite::stream::{self, StreamExt};
use futures_lite::stream::{once, pending};

assert_eq!(once(1).or(pending()).next().await, Some(1));
assert_eq!(pending().or(once(2)).next().await, Some(2));

// The first future wins.
assert_eq!(once(1).or(once(2)).next().await, Some(1));
source

fn race<S>(self, other: S) -> Race<Self, S>
where Self: Sized, S: Stream<Item = Self::Item>,

Merges with other stream, with no preference for either stream when both are ready.

§Examples
use futures_lite::stream::{self, StreamExt};
use futures_lite::stream::{once, pending};

assert_eq!(once(1).race(pending()).next().await, Some(1));
assert_eq!(pending().race(once(2)).next().await, Some(2));

// One of the two stream is randomly chosen as the winner.
let res = once(1).race(once(2)).next().await;
source

fn drain(&mut self) -> Drain<'_, Self>

Yields all immediately available values from a stream.

This is intended to be used as a way of polling a stream without waiting, similar to the try_iter function on std::sync::mpsc::Receiver. For instance, running this stream on an async_channel::Receiver will return all messages that are currently in the channel, but will not wait for new messages.

This returns a Stream instead of an Iterator because it still needs access to the polling context in order to poll the underlying stream. Since this stream will never return Poll::Pending, wrapping it in block_on will allow it to be effectively used as an Iterator.

This stream is not necessarily fused. After it returns None, it can return Some(x) in the future if it is polled again.

§Examples
use futures_lite::{future, pin};
use futures_lite::stream::{self, StreamExt};

// A stream that yields two values, returns `Pending`, and then yields one more value.
let pend_once = stream::once_future(async {
    future::yield_now().await;
    3
});
let s = stream::iter(vec![1, 2]).chain(pend_once);
pin!(s);

// This will return the first two values, and then `None` because the stream returns
// `Pending` after that.
let mut iter = stream::block_on(s.drain());
assert_eq!(iter.next(), Some(1));
assert_eq!(iter.next(), Some(2));
assert_eq!(iter.next(), None);

// This will return the last value, because the stream returns `Ready` when polled.
assert_eq!(iter.next(), Some(3));
assert_eq!(iter.next(), None);
source

fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
where Self: Send + Sized + 'a,

Boxes the stream and changes its type to dyn Stream + Send + 'a.

§Examples
use futures_lite::stream::{self, StreamExt};

let a = stream::once(1);
let b = stream::empty();

// Streams of different types can be stored in
// the same collection when they are boxed:
let streams = vec![a.boxed(), b.boxed()];
source

fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
where Self: Sized + 'a,

Boxes the stream and changes its type to dyn Stream + 'a.

§Examples
use futures_lite::stream::{self, StreamExt};

let a = stream::once(1);
let b = stream::empty();

// Streams of different types can be stored in
// the same collection when they are boxed:
let streams = vec![a.boxed_local(), b.boxed_local()];

Object Safety§

This trait is not object safe.

Implementors§

source§

impl<S: Stream + ?Sized> StreamExt for S