@@ -27,6 +27,7 @@ use cfg_if::cfg_if;
2727
2828use crate :: future:: Future ;
2929use crate :: task:: { Context , Poll } ;
30+ use std:: marker:: PhantomData ;
3031
3132cfg_if ! {
3233 if #[ cfg( feature = "docs" ) ] {
@@ -111,6 +112,71 @@ pub trait Stream {
111112 remaining : n,
112113 }
113114 }
115+
116+ /// Tests if every element of the stream matches a predicate.
117+ ///
118+ /// `all()` takes a closure that returns `true` or `false`. It applies
119+ /// this closure to each element of the stream, and if they all return
120+ /// `true`, then so does `all()`. If any of them return `false`, it
121+ /// returns `false`.
122+ ///
123+ /// `all()` is short-circuiting; in other words, it will stop processing
124+ /// as soon as it finds a `false`, given that no matter what else happens,
125+ /// the result will also be `false`.
126+ ///
127+ /// An empty stream returns `true`.
128+ ///
129+ /// # Examples
130+ ///
131+ /// Basic usage:
132+ ///
133+ /// ```
134+ /// # fn main() { async_std::task::block_on(async {
135+ /// #
136+ /// use async_std::prelude::*;
137+ /// use async_std::stream;
138+ ///
139+ /// let mut s = stream::repeat(9).take(3);
140+ ///
141+ /// while let Some(v) = s.next().await {
142+ /// assert_eq!(v, 9);
143+ /// }
144+ /// #
145+ /// # }) }
146+ /// ```
147+ /// ```
148+ /// let a = [1, 2, 3];
149+ ///
150+ /// assert!(a.iter().all(|&x| x > 0));
151+ ///
152+ /// assert!(!a.iter().all(|&x| x > 2));
153+ /// ```
154+ ///
155+ /// Stopping at the first `false`:
156+ ///
157+ /// ```
158+ /// let a = [1, 2, 3];
159+ ///
160+ /// let mut iter = a.iter();
161+ ///
162+ /// assert!(!iter.all(|&x| x != 2));
163+ ///
164+ /// // we can still use `iter`, as there are more elements.
165+ /// assert_eq!(iter.next(), Some(&3));
166+ /// ```
167+ #[ inline]
168+ fn all < F > ( & mut self , f : F ) -> AllFuture < ' _ , Self , F , Self :: Item >
169+ where
170+ Self : Sized ,
171+ F : FnMut ( Self :: Item ) -> bool ,
172+ {
173+ AllFuture {
174+ stream : self ,
175+ result : true ,
176+ __item : PhantomData ,
177+ f,
178+ }
179+ }
114180}
115181
116182impl < T : futures:: Stream + Unpin + ?Sized > Stream for T {
@@ -168,3 +234,45 @@ impl<S: futures::Stream> futures::Stream for Take<S> {
168234 }
169235 }
170236}
237+
238+ pub struct AllFuture < ' a , S , F , T >
239+ where
240+ S : ?Sized ,
241+ F : FnMut ( T ) -> bool ,
242+ {
243+ stream : & ' a mut S ,
244+ f : F ,
245+ result : bool ,
246+ __item : PhantomData < T > ,
247+ }
248+
249+ impl < ' a , S , F , T > AllFuture < ' a , S , F , T >
250+ where
251+ S : ?Sized ,
252+ F : FnMut ( T ) -> bool ,
253+ {
254+ pin_utils:: unsafe_pinned!( stream: & ' a mut S ) ;
255+ pin_utils:: unsafe_unpinned!( result: bool ) ;
256+ pin_utils:: unsafe_unpinned!( f: F ) ;
257+ }
258+
259+ impl < S , F > Future for AllFuture < ' _ , S , F , S :: Item >
260+ where
261+ S : futures:: Stream + Unpin + ?Sized ,
262+ F : FnMut ( S :: Item ) -> bool ,
263+ {
264+ type Output = bool ;
265+
266+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
267+ use futures:: Stream ;
268+ let next = futures:: ready!( self . as_mut( ) . stream( ) . poll_next( cx) ) ;
269+ match next {
270+ Some ( v) => {
271+ // me: *screams*
272+ * self . as_mut ( ) . result ( ) = ( self . as_mut ( ) . f ( ) ) ( v) ;
273+ Poll :: Pending
274+ }
275+ None => Poll :: Ready ( self . result ) ,
276+ }
277+ }
278+ }
0 commit comments