@@ -136,34 +136,31 @@ pub trait Stream {
136136 /// use async_std::prelude::*;
137137 /// use async_std::stream;
138138 ///
139- /// let mut s = stream::repeat(9).take(3);
139+ /// let mut s = stream::repeat::<u32>(42).take(3);
140+ /// assert!(s.all(|x| x == 42).await);
140141 ///
141- /// while let Some(v) = s.next().await {
142- /// assert_eq!(v, 9);
143- /// }
144142 /// #
145143 /// # }) }
146144 /// ```
147- /// ```
148- /// let a = [1, 2, 3];
149145 ///
150- /// assert!(a.iter().all(|&x| x > 0));
146+ /// Empty stream:
151147 ///
152- /// assert!(!a.iter().all(|&x| x > 2));
153148 /// ```
149+ /// # fn main() { async_std::task::block_on(async {
150+ /// #
151+ /// use async_std::prelude::*;
152+ /// use async_std::stream;
154153 ///
155- /// Stopping at the first `false`:
154+ /// let mut s = stream::empty::<u32>();
155+ /// assert!(s.all(|_| false).await);
156156 ///
157+ /// #
158+ /// # }) }
157159 /// ```
158- /// let a = [1, 2, 3];
159- ///
160- /// let mut iter = a.iter();
161- ///
162- /// assert!(!iter.all(|&x| x != 2));
160+ /// Stopping at the first `false`:
163161 ///
164- /// // we can still use `iter`, as there are more elements.
165- /// assert_eq!(iter.next(), Some(&3));
166- /// ```
162+ /// TODO: add example here
163+ /// TODO: add more examples
167164 #[ inline]
168165 fn all < F > ( & mut self , f : F ) -> AllFuture < ' _ , Self , F , Self :: Item >
169166 where
@@ -235,9 +232,9 @@ impl<S: futures::Stream> futures::Stream for Take<S> {
235232 }
236233}
237234
235+ #[ derive( Debug ) ]
238236pub struct AllFuture < ' a , S , F , T >
239237where
240- S : ?Sized ,
241238 F : FnMut ( T ) -> bool ,
242239{
243240 stream : & ' a mut S ,
@@ -248,7 +245,6 @@ where
248245
249246impl < ' a , S , F , T > AllFuture < ' a , S , F , T >
250247where
251- S : ?Sized ,
252248 F : FnMut ( T ) -> bool ,
253249{
254250 pin_utils:: unsafe_pinned!( stream: & ' a mut S ) ;
@@ -258,8 +254,9 @@ where
258254
259255impl < S , F > Future for AllFuture < ' _ , S , F , S :: Item >
260256where
261- S : futures:: Stream + Unpin + ? Sized ,
257+ S : futures:: Stream + Unpin + Sized ,
262258 F : FnMut ( S :: Item ) -> bool ,
259+ S :: Item : std:: fmt:: Debug ,
263260{
264261 type Output = bool ;
265262
@@ -268,9 +265,15 @@ where
268265 let next = futures:: ready!( self . as_mut( ) . stream( ) . poll_next( cx) ) ;
269266 match next {
270267 Some ( v) => {
271- // me: *screams*
272- * self . as_mut ( ) . result ( ) = ( self . as_mut ( ) . f ( ) ) ( v) ;
273- Poll :: Pending
268+ let result = ( self . as_mut ( ) . f ( ) ) ( v) ;
269+ * self . as_mut ( ) . result ( ) = result;
270+ if result {
271+ // don't forget to wake this task again to pull the next item from stream
272+ cx. waker ( ) . wake_by_ref ( ) ;
273+ Poll :: Pending
274+ } else {
275+ Poll :: Ready ( false )
276+ }
274277 }
275278 None => Poll :: Ready ( self . result ) ,
276279 }
0 commit comments