diff --git a/src/Streamly/Internal/Data/Stream/IsStream/Top.hs b/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
index a0dd09be0e..f03306c4bd 100644
--- a/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
+++ b/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
@@ -28,16 +28,16 @@ module Streamly.Internal.Data.Stream.IsStream.Top
     -- | These are not exactly set operations because streams are not
     -- necessarily sets, they may have duplicated elements.
     , intersectBy
-    , mergeIntersectBy
+    , intersectBySorted
     , differenceBy
-    , mergeDifferenceBy
+    , differenceBySorted
     , unionBy
-    , mergeUnionBy
+    , unionBySorted
 
     -- ** Join operations
     , crossJoin
     , innerJoin
-    , mergeInnerJoin
+    , joinInnerMerge
     , hashInnerJoin
     , leftJoin
     , mergeLeftJoin
@@ -78,6 +78,7 @@ import qualified Streamly.Internal.Data.Stream.IsStream.Expand as Stream
 import qualified Streamly.Internal.Data.Stream.IsStream.Reduce as Stream
 import qualified Streamly.Internal.Data.Stream.IsStream.Transform as Stream
 import qualified Streamly.Internal.Data.Stream.IsStream.Type as IsStream
+import qualified Streamly.Internal.Data.Stream.StreamD as StreamD
 
 import Prelude hiding (filter, zipWith, concatMap, concat)
 
@@ -306,10 +307,14 @@ hashInnerJoin = undefined
 --
 -- Time: O(m + n)
 --
--- /Unimplemented/
-{-# INLINE mergeInnerJoin #-}
-mergeInnerJoin :: (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
-mergeInnerJoin = undefined
+-- /Pre-release/
+{-# INLINE joinInnerMerge #-}
+joinInnerMerge :: (IsStream t, MonadIO m, Eq a, Eq b) =>
+    (a -> b -> Ordering) -> t m a -> t m b -> t m (a, b)
+joinInnerMerge cmp s1 =
+    IsStream.fromStreamD
+        . StreamD.joinInnerMerge cmp (IsStream.toStreamD s1)
+        . IsStream.toStreamD
 
 -- XXX We can do this concurrently.
 -- XXX If the second stream is sorted and passed as an Array or a seek capable
@@ -514,11 +519,14 @@ intersectBy eq s1 s2 =
 --
 -- Time: O(m+n)
 --
--- /Unimplemented/
-{-# INLINE mergeIntersectBy #-}
-mergeIntersectBy :: -- (IsStream t, Monad m) =>
+-- /Pre-release/
+{-# INLINE intersectBySorted #-}
+intersectBySorted :: (IsStream t, MonadIO m, Eq a) =>
     (a -> a -> Ordering) -> t m a -> t m a -> t m a
-mergeIntersectBy _eq _s1 _s2 = undefined
+intersectBySorted cmp s1 =
+    IsStream.fromStreamD
+        . StreamD.intersectBySorted cmp (IsStream.toStreamD s1)
+        . IsStream.toStreamD
 
 -- Roughly leftJoin s1 s2 = s1 `difference` s2 + s1 `intersection` s2
 
@@ -562,11 +570,14 @@ differenceBy eq s1 s2 =
 --
 -- Space: O(1)
 --
--- /Unimplemented/
-{-# INLINE mergeDifferenceBy #-}
-mergeDifferenceBy :: -- (IsStream t, Monad m) =>
+-- /Pre-release/
+{-# INLINE differenceBySorted #-}
+differenceBySorted :: (IsStream t, Monad m) =>
     (a -> a -> Ordering) -> t m a -> t m a -> t m a
-mergeDifferenceBy _eq _s1 _s2 = undefined
+differenceBySorted cmp s1 =
+    IsStream.fromStreamD
+        . StreamD.differenceBySorted cmp (IsStream.toStreamD s1)
+        . IsStream.toStreamD
 
 -- | This is essentially an append operation that appends all the extra
 -- occurrences of elements from the second stream that are not already present
@@ -610,8 +621,11 @@ unionBy eq s1 s2 =
 --
 -- Space: O(1)
 --
--- /Unimplemented/
-{-# INLINE mergeUnionBy #-}
-mergeUnionBy :: -- (IsStream t, Monad m) =>
+-- /Pre-release/
+{-# INLINE unionBySorted #-}
+unionBySorted :: (IsStream t, MonadAsync m, Ord a) =>
     (a -> a -> Ordering) -> t m a -> t m a -> t m a
-mergeUnionBy _eq _s1 _s2 = undefined
+unionBySorted cmp s1 =
+    IsStream.fromStreamD
+        . StreamD.unionBySorted cmp (IsStream.toStreamD s1)
+        . IsStream.toStreamD
diff --git a/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs b/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
index 45471861a1..a95fc3aead 100644
--- a/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
+++ b/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
@@ -142,6 +142,10 @@ module Streamly.Internal.Data.Stream.StreamD.Nesting
     -- | Opposite to compact in ArrayStream
     , splitInnerBy
     , splitInnerBySuffix
+    , intersectBySorted
+    , unionBySorted
+    , differenceBySorted
+    , joinInnerMerge
     )
 where
 
@@ -151,6 +155,7 @@ import Control.Exception (assert)
 import Control.Monad.Catch (MonadThrow, throwM)
 import Control.Monad.IO.Class (MonadIO(..))
 import Data.Bits (shiftR, shiftL, (.|.), (.&.))
+import Data.IORef (modifyIORef', newIORef, readIORef, writeIORef)
 #if __GLASGOW_HASKELL__ >= 801
 import Data.Functor.Identity ( Identity )
 #endif
@@ -178,6 +183,7 @@ import Streamly.Internal.Data.Stream.StreamD.Type
 
 import Prelude hiding (concatMap, mapM, zipWith)
 
+
 ------------------------------------------------------------------------------
 -- Appending
 ------------------------------------------------------------------------------
@@ -481,6 +487,82 @@ mergeBy
     => (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
 mergeBy cmp = mergeByM (\a b -> return $ cmp a b)
 
+-------------------------------------------------------------------------------
+-- Sorted stream joins --------------------------------------------------------
+-------------------------------------------------------------------------------
+
+data StreamEmptyNess =
+      LeftEmpty -- ^ the left stream is exhausted
+    | RightEmpty -- ^ the right stream is exhausted
+    | BothEmpty -- ^ both streams are exhausted
+    | NoneEmpty -- ^ neither stream is exhausted yet
+    deriving (Eq, Show)
+
+data RunOrder =
+      LeftRun -- ^ pull from the left stream
+    | RightRun -- ^ pull from the right stream
+    | CompareRun -- ^ compare the current left and right elements
+    | CompareDupRun -- ^ NOTE(review): appears unused in this patch
+    | FastFarwardRun -- ^ skip duplicates in the right stream
+    | RightDupRun -- ^ advance the right stream past non-matching duplicates
+    | BuffPrepare -- ^ buffer the matching right elements
+    | BuffPair -- ^ pair the left element with the buffered elements
+    | BuffReset -- ^ pull the next left element, replay or clear the buffer
+    deriving (Eq, Show)
+
+-------------------------------------------------------------------------------
+-- Intersection of sorted streams ---------------------------------------------
+-------------------------------------------------------------------------------
+
+-- | Intersection of two streams sorted in ascending order by @cmp@: emits
+-- every element of the first stream that compares 'EQ' to an element of the
+-- second stream. The result ends as soon as either input stream ends.
+{-# INLINE_NORMAL intersectBySorted #-}
+intersectBySorted :: Monad m =>
+    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
+intersectBySorted cmp (Stream stepa ta) (Stream stepb tb) =
+    Stream step (Just ta, Just tb, Nothing, Nothing, Nothing)
+
+    where
+    {-# INLINE_LATE step #-}
+
+    -- step 1: the left element is missing, pull it from the left stream
+    step gst (Just sa, sb, Nothing, b, Nothing) = do
+        r <- stepa gst sa
+        return $ case r of
+            Yield a sa' -> Skip (Just sa', sb, Just a, b, Nothing)
+            Skip sa' -> Skip (Just sa', sb, Nothing, b, Nothing)
+            Stop -> Stop
+
+    -- step 2: the right element is missing, pull it from the right stream
+    step gst (sa, Just sb, a, Nothing, Nothing) = do
+        r <- stepb gst sb
+        return $ case r of
+            Yield b sb' -> Skip (sa, Just sb', a, Just b, Nothing)
+            Skip sb' -> Skip (sa, Just sb', a, Nothing, Nothing)
+            Stop -> Stop
+
+    -- step 3: both elements are available, compare them and drop the smaller
+    step _ (sa, sb, Just a, Just b, Nothing) =
+        return $ case cmp a b of
+            GT -> Skip (sa, sb, Just a, Nothing, Nothing)
+            LT -> Skip (sa, sb, Nothing, Just b, Nothing)
+            EQ -> Yield a (sa, sb, Nothing, Just a, Just b) -- step 4
+
+    -- step 4: after a match, emit the left elements matching the same b
+    step gst (Just sa, Just sb, Nothing, Just _, Just b) = do
+        r1 <- stepa gst sa
+        return $ case r1 of
+            Yield a' sa' ->
+                if cmp a' b == EQ -- still matching the same right element
+                then Yield a' (Just sa', Just sb, Nothing, Just b, Just b)
+                else Skip (Just sa', Just sb, Just a', Nothing, Nothing)
+            -- A Skip must keep the matched right element
+            Skip sa' -> Skip (Just sa', Just sb, Nothing, Just b, Just b)
+            Stop -> Stop
+
+    step _ (_, _, _, _, _) = return Stop
+
 ------------------------------------------------------------------------------
 -- Combine N Streams - unfoldMany
 ------------------------------------------------------------------------------
@@ -2421,3 +2503,1020 @@ splitInnerBySuffix splitter joiner (Stream step1 state1) =
 
     step _ (SplitYielding x next) = return $ Yield x next
     step _ SplitFinishing = return Stop
+
+-------------------------------------------------------------------------------
+-- Union of two sorted streams
+-------------------------------------------------------------------------------
+
+-- | Union of two streams sorted in ascending order by @cmp@. Every element
+-- of the first stream is emitted. An element of the second stream is emitted
+-- only when no element of the first stream compares 'EQ' to it, and a run of
+-- equal elements in the second stream is emitted once. The output is in
+-- ascending order.
+{-# INLINE_NORMAL unionBySorted #-}
+unionBySorted :: Monad m =>
+    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
+unionBySorted cmp (Stream stepa ta) (Stream stepb tb) =
+    Stream
+        step
+        ( Just ta -- State of left stream
+        , Just tb -- State of right stream
+        , Nothing -- Current element of left stream
+        , Nothing -- Current element of right stream
+        , Nothing -- Previous element of right stream
+        , LeftRun -- Stream runner indicator
+        , NoneEmpty -- Stream emptiness indicator
+        )
+
+    where
+    {-# INLINE_LATE step #-}
+
+    -- Initial step when left stream could be empty
+    step
+        gst
+        ( Just sa
+        , Just sb
+        , Nothing
+        , Nothing
+        , Nothing
+        , LeftRun
+        , NoneEmpty
+        ) =
+        do
+            r <- stepa (adaptState gst) sa
+            return $ case r of
+                Yield a' sa' ->
+                    Skip
+                        ( Just sa'
+                        , Just sb
+                        , Just a'
+                        , Nothing
+                        , Nothing
+                        , RightRun
+                        , NoneEmpty
+                        )
+                -- No element yet, keep pulling from the left stream
+                Skip sa' ->
+                    Skip
+                        ( Just sa'
+                        , Just sb
+                        , Nothing
+                        , Nothing
+                        , Nothing
+                        , LeftRun
+                        , NoneEmpty
+                        )
+                Stop ->
+                    Skip
+                        ( Nothing
+                        , Just sb
+                        , Nothing
+                        , Nothing
+                        , Nothing
+                        , RightRun
+                        , LeftEmpty
+                        )
+
+    -- Take an element from right stream and compare with previously
+    -- picked element from left stream
+    step
+        gst
+        ( Just sa
+        , Just sb
+        , Just a
+        , Nothing
+        , Nothing
+        , RightRun
+        , NoneEmpty
+        ) =
+        do
+            r <- stepb (adaptState gst) sb
+            return $
+                case r of
+                    Yield b' sb' ->
+                        Skip
+                            ( Just sa
+                            , Just sb'
+                            , Just a
+                            , Just b'
+                            , Just b'
+                            , CompareRun
+                            , NoneEmpty
+                            )
+                    Skip sb' ->
+                        Skip
+                            ( Just sa
+                            , Just sb'
+                            , Just a
+                            , Nothing
+                            , Nothing
+                            , RightRun
+                            , NoneEmpty
+                            )
+                    Stop ->
+                        Skip
+                            ( Just sa
+                            , Nothing
+                            , Just a
+                            , Nothing
+                            , Nothing
+                            , LeftRun
+                            , RightEmpty
+                            )
+
+    -- Left stream has finished so take the element from right stream,
+    -- here we don't have any previous element from right stream
+    step
+        gst
+        ( Nothing
+        , Just sb
+        , Nothing
+        , Nothing
+        , Nothing
+        , RightRun
+        , LeftEmpty
+        ) =
+        do
+            r <- stepb (adaptState gst) sb
+            return $
+                case r of
+                    Yield b' sb' ->
+                        Yield
+                            b'
+                            ( Nothing
+                            , Just sb'
+                            , Nothing
+                            , Just b'
+                            , Just b'
+                            , RightRun
+                            , LeftEmpty
+                            )
+                    Skip sb' ->
+                        Skip
+                            ( Nothing
+                            , Just sb'
+                            , Nothing
+                            , Nothing
+                            , Nothing
+                            , RightRun
+                            , LeftEmpty
+                            )
+                    Stop ->
+                        Stop
+
+    -- Left stream has finished so take the element from right stream.
+    -- Here we have a previous element of the right stream; compare it with the
+    -- current element and discard the duplicates from right stream.
+    step
+        gst
+        ( Nothing
+        , Just sb
+        , Nothing
+        , Just _
+        , Just pb
+        , RightRun
+        , LeftEmpty
+        ) =
+        do
+            r <- stepb (adaptState gst) sb
+            return $
+                case r of
+                    Yield b' sb' ->
+                        if cmp pb b' == EQ -- drop the duplicate
+                        then
+                            Skip
+                                ( Nothing
+                                , Just sb'
+                                , Nothing
+                                , Just b'
+                                , Just b'
+                                , RightRun
+                                , LeftEmpty
+                                )
+                        else
+                            Yield
+                                b'
+                                ( Nothing
+                                , Just sb'
+                                , Nothing
+                                , Just b'
+                                , Just b'
+                                , RightRun
+                                , LeftEmpty
+                                )
+                    -- Keep pb so duplicates are still dropped after a Skip
+                    Skip sb' ->
+                        Skip
+                            ( Nothing
+                            , Just sb'
+                            , Nothing
+                            , Just pb
+                            , Just pb
+                            , RightRun
+                            , LeftEmpty
+                            )
+                    Stop ->
+                        Stop
+
+    -- Compare the elements from both streams, if equal then fast forward the
+    -- right stream to remove duplicated elements.
+    step gst (Just sa, Just sb, Just a, Just b, Just _, CompareRun, NoneEmpty) =
+        do
+            let res = cmp a b
+            case res of
+                LT ->
+                    return $
+                        Yield
+                            a
+                            ( Just sa
+                            , Just sb
+                            , Just a
+                            , Just b
+                            , Just b
+                            , LeftRun
+                            , NoneEmpty
+                            )
+                EQ -> do
+                    r <- stepa (adaptState gst) sa
+                    case r of
+                        Yield a' sa' -> return $
+                            Yield
+                                a -- remove duplicated elements from right
+                                ( Just sa'
+                                , Just sb
+                                , Just a'
+                                , Just b
+                                , Just b
+                                , FastFarwardRun
+                                , NoneEmpty
+                                )
+                        Skip sa' -> return $
+                            Skip
+                                -- a is not emitted yet, retry with the new state
+                                ( Just sa'
+                                , Just sb
+                                , Just a
+                                , Just b
+                                , Just b
+                                , CompareRun
+                                , NoneEmpty
+                                )
+                        Stop -> return $
+                            Yield
+                                a -- remove duplicated elements from right
+                                ( Nothing
+                                , Just sb
+                                , Nothing
+                                , Just b
+                                , Just b
+                                , FastFarwardRun
+                                , LeftEmpty
+                                )
+                GT ->
+                    return $
+                        Yield
+                            b
+                            ( Just sa
+                            , Just sb
+                            , Just a
+                            , Just b
+                            , Just b
+                            , RightRun
+                            , NoneEmpty
+                            )
+
+    -- Pull the next element from the right stream and discard it if it is a
+    -- duplicate of the previous right element, otherwise go to compare.
+    step
+        gst
+        ( Just sa
+        , Just sb
+        , Just a
+        , Just _
+        , Just pb
+        , RightRun
+        , NoneEmpty
+        ) =
+        do
+            r <- stepb (adaptState gst) sb
+            return $
+                case r of
+                    Yield b' sb' ->
+                        if cmp pb b' == EQ
+                        then
+                            Skip -- discard the matching elements
+                                ( Just sa
+                                , Just sb'
+                                , Just a
+                                , Just b'
+                                , Just b'
+                                , RightRun
+                                , NoneEmpty
+                                )
+                        else
+                            Skip
+                                ( Just sa
+                                , Just sb'
+                                , Just a
+                                , Just b'
+                                , Just b'
+                                , CompareRun
+                                , NoneEmpty
+                                )
+
+                    -- Keep pb so duplicates are still dropped after a Skip
+                    Skip sb' ->
+                        Skip
+                            ( Just sa
+                            , Just sb'
+                            , Just a
+                            , Just pb
+                            , Just pb
+                            , RightRun
+                            , NoneEmpty
+                            )
+                    Stop ->
+                        Skip
+                            ( Just sa
+                            , Nothing
+                            , Just a
+                            , Nothing
+                            , Nothing
+                            , LeftRun
+                            , RightEmpty
+                            )
+
+    -- Fast forward right to remove dups
+    step
+        gst
+        ( sa
+        , Just sb
+        , a
+        , Just b
+        , Just pb
+        , FastFarwardRun
+        , e
+        ) =
+        do
+            r <- stepb (adaptState gst) sb
+            return $
+                case r of
+                    Yield b' sb' ->
+                        if cmp b' pb == EQ
+                        then
+                            Skip
+                                ( sa
+                                , Just sb'
+                                , a
+                                , Just b'
+                                , Just b'
+                                , FastFarwardRun
+                                , e
+                                )
+                        else
+                            -- b' is consumed, continue from sb' (not sb)
+                            case e of
+                                LeftEmpty ->
+                                    Yield b'
+                                        ( Nothing
+                                        , Just sb'
+                                        , Nothing
+                                        , Just b'
+                                        , Just b'
+                                        , RightRun
+                                        , LeftEmpty
+                                        )
+                                _ ->
+                                    Skip
+                                        ( sa
+                                        , Just sb'
+                                        , a
+                                        , Just b'
+                                        , Just b'
+                                        , CompareRun
+                                        , NoneEmpty
+                                        )
+
+                    -- Preserve the emptiness indicator across a Skip
+                    Skip sb' ->
+                        Skip
+                            ( sa
+                            , Just sb'
+                            , a
+                            , Just b
+                            , Just pb
+                            , FastFarwardRun, e)
+                    Stop ->
+                        Skip
+                            ( sa
+                            , Nothing
+                            , a
+                            , Nothing
+                            , Nothing
+                            , LeftRun
+                            , RightEmpty
+                            )
+
+
+    -- Right stream is empty just iterate thru left stream
+    step
+        gst
+        ( Just sa
+        , Nothing
+        , a
+        , Nothing
+        , Nothing
+        , LeftRun
+        , RightEmpty
+        ) =
+        do
+            r <- stepa (adaptState gst) sa
+            return $
+                case r of
+                    Yield a' sa' ->
+                        case a of
+                            Just v -> Yield
+                                v
+                                ( Just sa'
+                                , Nothing
+                                , Just a'
+                                , Nothing
+                                , Nothing
+                                , LeftRun
+                                , RightEmpty
+                                )
+                            Nothing -> Skip
+                                ( Just sa'
+                                , Nothing
+                                , Just a'
+                                , Nothing
+                                , Nothing
+                                , LeftRun
+                                , RightEmpty
+                                )
+                    -- The right stream is still exhausted after a Skip
+                    Skip sa' ->
+                        Skip
+                            ( Just sa'
+                            , Nothing
+                            , a
+                            , Nothing
+                            , Nothing
+                            , LeftRun
+                            , RightEmpty
+                            )
+                    Stop ->
+                        case a of
+                            Just v -> Yield
+                                v
+                                ( Nothing
+                                , Nothing
+                                , Nothing
+                                , Nothing
+                                , Nothing
+                                , LeftRun
+                                , BothEmpty
+                                )
+                            Nothing -> Skip
+                                ( Nothing
+                                , Nothing
+                                , Nothing
+                                , Nothing
+                                , Nothing
+                                , LeftRun
+                                , BothEmpty
+                                )
+
+    -- Right stream is non-empty just iterate thru left stream.
+    -- If last element of left stream is matching with current right element
+    -- ignore it.
+    step gst (Just sa, Just sb, Just a, Just b, Just _, LeftRun, NoneEmpty) =
+        do
+            r <- stepa (adaptState gst) sa
+            return $
+                case r of
+                    Yield a' sa' ->
+                        Skip
+                            ( Just sa'
+                            , Just sb
+                            , Just a'
+                            , Just b
+                            , Just b
+                            , CompareRun
+                            , NoneEmpty
+                            )
+                    -- Keep the previous right element across a Skip
+                    Skip sa' ->
+                        Skip
+                            ( Just sa'
+                            , Just sb
+                            , Just a
+                            , Just b
+                            , Just b
+                            , LeftRun
+                            , NoneEmpty
+                            )
+                    Stop ->
+                        if cmp a b == EQ
+                        then
+                            Skip
+                                ( Nothing
+                                , Just sb
+                                , Nothing
+                                , Just b
+                                , Just b
+                                , RightRun
+                                , LeftEmpty
+                                )
+                        else
+                            Yield
+                                b
+                                ( Nothing
+                                , Just sb
+                                , Nothing
+                                , Just b
+                                , Just b
+                                , RightRun
+                                , LeftEmpty
+                                )
+
+    step _ (_, _, _, _, _, _, _) = return Stop
+
+-------------------------------------------------------------------------------
+-- Difference of sorted streams -----------------------------------------------
+-------------------------------------------------------------------------------
+
+-- | Difference of two streams sorted in ascending order by @cmp@: for each
+-- element of the second stream one matching element of the first is dropped.
+{-# INLINE_NORMAL differenceBySorted #-}
+differenceBySorted :: (Monad m) =>
+    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
+differenceBySorted cmp (Stream stepa ta) (Stream stepb tb) =
+    Stream step (Just ta, Just tb, Nothing, Nothing, Nothing)
+
+    where
+    {-# INLINE_LATE step #-}
+
+    -- one of the values is missing, and the corresponding stream is running
+    step gst (Just sa, sb, Nothing, b, Nothing) = do
+        r <- stepa gst sa
+        return $ case r of
+            Yield a sa' -> Skip (Just sa', sb, Just a, b, Nothing)
+            Skip sa' -> Skip (Just sa', sb, Nothing, b, Nothing)
+            Stop -> Stop -- nothing is left to emit once the left stream ends
+
+    step gst (sa, Just sb, a, Nothing, Nothing) = do
+        r <- stepb gst sb
+        return $ case r of
+            Yield b sb' -> Skip (sa, Just sb', a, Just b, Nothing)
+            Skip sb' -> Skip (sa, Just sb', a, Nothing, Nothing)
+            Stop -> Skip (sa, Nothing, a, Nothing, Nothing)
+
+    -- A matching pair was dropped, pull the next element from both streams
+    step gst (Just sa, Just sb, Nothing, _, Just _) = do
+        r1 <- stepa gst sa
+        r2 <- stepb gst sb
+        return $ case r1 of
+            Yield a sa' ->
+                case r2 of
+                    Yield c sb' ->
+                        Skip (Just sa', Just sb', Just a, Just c, Nothing)
+                    Skip sb' ->
+                        Skip (Just sa', Just sb', Just a, Nothing, Nothing)
+                    Stop ->
+                        Skip (Just sa', Nothing, Just a, Nothing, Nothing)
+            Skip sa' ->
+                case r2 of
+                    Yield c sb' ->
+                        Skip (Just sa', Just sb', Nothing, Just c, Nothing)
+                    Skip sb' ->
+                        Skip (Just sa', Just sb', Nothing, Nothing, Nothing)
+                    Stop ->
+                        Skip (Just sa', Nothing, Nothing, Nothing, Nothing)
+            Stop -> Stop
+
+    -- both the values are available
+    step _ (sa, sb, Just a, Just b, Nothing) =
+        return $ case cmp a b of
+            GT -> Skip (sa, sb, Just a, Nothing, Nothing)
+            LT -> Yield a (sa, sb, Nothing, Just b, Nothing)
+            EQ -> Skip (sa, sb, Nothing, Just b, Just b)
+
+    -- one of the values is missing, corresponding stream is done
+    step _ (sa, Nothing, Just a, Nothing, Nothing) =
+        return $ Yield a (sa, Nothing, Nothing, Nothing , Nothing)
+    step _ (_, _, _, _, _) = return Stop
+
+-------------------------------------------------------------------------------
+-- Inner Join of sorted streams -----------------------------------------------
+-------------------------------------------------------------------------------
+
+-- Buffer the run of equal elements from the right stream so that it can be
+-- paired with every matching element of the left stream. The index into the
+-- buffer is reset to 0 once a left element is done so that the next matching
+-- left element starts again from the beginning of the buffer.
+
+-- | Inner join of two streams sorted in ascending order by @cmp@: emits a
+-- pair @(a, b)@ for every @a@ from the first stream and @b@ from the second
+-- stream that compare 'EQ'. Runs of duplicates are detected with '==', so
+-- the 'Eq' instances are expected to agree with @cmp@. The run of matching
+-- right elements is buffered in memory.
+{-# INLINE_NORMAL joinInnerMerge #-}
+joinInnerMerge
+    :: (MonadIO m, Eq a, Eq b)
+    => (a -> b -> Ordering) -> Stream m a -> Stream m b -> Stream m (a, b)
+joinInnerMerge cmp (Stream stepa ta) (Stream stepb tb) =
+    Stream
+        step
+        ( Just ta -- state of left stream
+        , Just tb -- state of right stream
+        , Nothing -- current element of left stream
+        , Nothing -- current element of right stream
+        , Nothing -- previous element of left stream
+        , Nothing -- previous element of right stream
+        , Nothing -- list of duplicate elements from right stream
+        , LeftRun -- stream selector left/right
+        , 0 -- index of list of duplicate elements from right stream
+        )
+
+    where
+
+    {-# INLINE_LATE step #-}
+
+    -- step 1 : Initial Step when left stream could be empty
+    step gst (Just sa, sb, Nothing, Nothing, pa, pb, _, LeftRun, idx) =
+        do
+            ref <- liftIO $ newIORef []
+            r <- stepa (adaptState gst) sa
+            return $
+                case r of
+                    Yield a' sa' ->
+                        Skip
+                            ( Just sa'
+                            , sb
+                            , Just a'
+                            , Nothing
+                            , Just a'
+                            , pb
+                            , Just ref
+                            , RightRun
+                            , idx
+                            )
+
+                    Skip sa' ->
+                        Skip
+                            ( Just sa'
+                            , sb
+                            , Nothing
+                            , Nothing
+                            , pa
+                            , pb
+                            , Just ref
+                            , LeftRun
+                            , idx
+                            )
+
+                    Stop ->
+                        Stop
+
+    -- step 2 : pull element from right stream
+    step gst (Just sa, Just sb, a, b, pa, pb, buff, RightRun, idx) =
+        do
+            r <- stepb (adaptState gst) sb
+            return $
+                case r of
+                    Yield b' sb' ->
+                        Skip
+                            ( Just sa
+                            , Just sb'
+                            , a
+                            , Just b'
+                            , pa
+                            , Just b'
+                            , buff
+                            , CompareRun
+                            , idx
+                            ) -- go to compare step
+
+                    -- Keep the left stream and retry from the new state
+                    Skip sb' ->
+                        Skip
+                            ( Just sa
+                            , Just sb'
+                            , a
+                            , b
+                            , pa
+                            , pb
+                            , buff
+                            , RightRun
+                            , idx
+                            )
+
+                    Stop ->
+                        Stop
+
+    -- step 3 : compare step compare left stream data with right stream
+    step _ (Just sa, Just sb, Just a, Just b, pa, pb, Just buff, CompareRun, idx) =
+        do
+            return $
+                case cmp a b of
+                    LT ->
+                        Skip
+                            ( Just sa
+                            , Just sb
+                            , Just a
+                            , Just b
+                            , pa
+                            , pb
+                            , Just buff
+                            , LeftRun
+                            , idx
+                            ) -- skip a step pull left
+                    EQ ->
+                        Skip
+                            ( Just sa
+                            , Just sb
+                            , Just a
+                            , Just b
+                            , Just a
+                            , pb
+                            , Just buff
+                            , BuffPrepare
+                            , idx
+                            ) -- step 6
+                    GT ->
+                        Skip
+                            ( Just sa
+                            , Just sb
+                            , Just a
+                            , Just b
+                            , pa
+                            , Just b
+                            , Just buff
+                            , RightDupRun
+                            , idx
+                            ) -- step right duplicate
+
+    -- step 4 : pull the next element from the left stream and compare it with
+    -- the current right element; on Skip both current elements are kept
+    step gst (Just sa, Just sb, Just a, Just b, pa, pb, buff, LeftRun, idx) =
+        do
+            r <- stepa (adaptState gst) sa
+            return $
+                case r of
+                    Yield a' sa' ->
+                        Skip -- compare step
+                            ( Just sa'
+                            , Just sb
+                            , Just a'
+                            , Just b
+                            , Just a
+                            , pb
+                            , buff
+                            , CompareRun
+                            , idx
+                            )
+                    Skip sa' ->
+                        Skip
+                            ( Just sa'
+                            , Just sb
+                            , Just a
+                            , Just b
+                            , pa
+                            , pb
+                            , buff
+                            , LeftRun
+                            , idx
+                            )
+                    Stop -> Stop
+
+
+    -- step 5 : step right duplicate both stream has data, pull data from right
+    -- stream and in next step compare b with previous b to remove mismatched
+    -- duplicates from right stream
+    step
+        gst
+        ( Just sa
+        , Just sb
+        , Just a
+        , Just b
+        , Just pa
+        , Just pb
+        , buff
+        , RightDupRun
+        , idx
+        ) =
+        do
+            r <- stepb (adaptState gst) sb
+            return $
+                case r of
+                    Yield b' sb' ->
+                        if b' == pb
+                        then
+                            Skip
+                                ( Just sa
+                                , Just sb'
+                                , Just a
+                                , Just b'
+                                , Just a
+                                , Just b'
+                                , buff
+                                , RightDupRun
+                                , idx
+                                ) -- step right duplicate
+                        else
+                            Skip
+                                ( Just sa
+                                , Just sb'
+                                , Just a
+                                , Just b'
+                                , Just a
+                                , Just b'
+                                , buff
+                                , CompareRun
+                                , idx
+                                ) -- compare step
+                    -- Keep the whole state and retry from the new state
+                    Skip sb' ->
+                        Skip
+                            ( Just sa
+                            , Just sb'
+                            , Just a
+                            , Just b
+                            , Just pa
+                            , Just pb
+                            , buff
+                            , RightDupRun
+                            , idx
+                            )
+                    Stop ->
+                        Stop
+
+    -- step 6 : buffer the data from right stream to iterate over
+    step
+        gst
+        ( Just sa
+        , Just sb
+        , Just a
+        , Just b
+        , pa
+        , _
+        , Just buff
+        , BuffPrepare
+        , idx
+        ) =
+        do
+            r <- stepb (adaptState gst) sb
+            case r of
+                Yield b' sb' -> do
+                    liftIO $ modifyIORef' buff (b : )
+                    if b' == b
+                    then return $
+                        Skip
+                            ( Just sa
+                            , Just sb'
+                            , Just a
+                            , Just b'
+                            , pa
+                            , Just b'
+                            , Just buff
+                            , BuffPrepare
+                            , idx
+                            )
+                    else return $
+                        Skip -- step 7
+                            ( Just sa
+                            , Just sb'
+                            , Just a
+                            , Just b'
+                            , pa
+                            , Just b'
+                            , Just buff
+                            , BuffPair
+                            , 0
+                            )
+                Skip sb' ->
+                    -- b is buffered only when we move past it, retry
+                    return $
+                        Skip
+                            ( Just sa
+                            , Just sb'
+                            , Just a
+                            , Just b
+                            , pa
+                            , Just b
+                            , Just buff
+                            , BuffPrepare
+                            , idx
+                            )
+                Stop -> do
+                    liftIO $ modifyIORef' buff (b : )
+                    return $ Skip
+                        ( Just sa
+                        , Just sb
+                        , Just a
+                        , Just b
+                        , pa
+                        , Just b
+                        , Just buff
+                        , BuffPair
+                        , 0
+                        ) -- step 7
+
+    -- step 7 : do pairing with buff (only when repetition is over)
+    step
+        _
+        ( Just sa
+        , Just sb
+        , Just a
+        , Just b
+        , pa
+        , Just pb
+        , Just buff
+        , BuffPair
+        , idx
+        ) =
+        do
+            bl <- liftIO $ readIORef buff
+            if idx < length bl
+            then return $
+                Yield
+                    (a, bl !! idx)
+                    ( Just sa
+                    , Just sb
+                    , Just a
+                    , Just b
+                    , pa
+                    , Just pb
+                    , Just buff
+                    , BuffPair
+                    , idx+1
+                    )
+            else return $
+                Skip -- step 8
+                    ( Just sa
+                    , Just sb
+                    , Just a
+                    , Just b
+                    , Just a
+                    , Just pb
+                    , Just buff
+                    , BuffReset
+                    , 0
+                    )
+
+
+    -- step 8 : pull the data from left stream to compare next data from
+    -- right stream
+    step gst
+        (Just sa, sb, Just _, Just b, Just pa, pb, Just buff, BuffReset, idx) =
+        do
+            r <- stepa (adaptState gst) sa
+            case r of
+                Yield a' sa' -> do
+                    if a' == pa
+                    then return $
+                        Skip -- step 7
+                            ( Just sa'
+                            , sb
+                            , Just a'
+                            , Just b
+                            , Just a'
+                            , pb
+                            , Just buff
+                            , BuffPair
+                            , idx
+                            )
+                    else do
+                        -- clear buff
+                        liftIO $ writeIORef buff []
+                        return $
+                            Skip -- compare step
+                                ( Just sa'
+                                , sb
+                                , Just a'
+                                , Just b
+                                , Just a'
+                                , pb
+                                , Just buff
+                                , CompareRun
+                                , idx
+                                )
+                -- Keep the right element and the buffer, retry
+                Skip sa' ->
+                    return $
+                        Skip
+                            ( Just sa'
+                            , sb
+                            , Just pa
+                            , Just b
+                            , Just pa
+                            , pb
+                            , Just buff
+                            , BuffReset
+                            , idx
+                            )
+                Stop ->
+                    return Stop
+
+    step _ (_, _, _, _, _, _, _, _, _) = return Stop
diff --git a/streamly.cabal b/streamly.cabal
index 21b86411aa..3bcf5746cb 100644
--- a/streamly.cabal
+++ b/streamly.cabal
@@ -94,6 +94,7 @@ extra-source-files:
     test/Streamly/Test/Data/Array/Foreign.hs
     test/Streamly/Test/Data/Array/Stream/Foreign.hs
     test/Streamly/Test/Data/Parser/ParserD.hs
+    test/Streamly/Test/Data/Stream/Top.hs
     test/Streamly/Test/FileSystem/Event.hs
     test/Streamly/Test/FileSystem/Event/Common.hs
     test/Streamly/Test/FileSystem/Event/Darwin.hs
diff --git a/test/Streamly/Test/Data/Stream/Top.hs b/test/Streamly/Test/Data/Stream/Top.hs
new file mode 100644
index 0000000000..5a18ed997b
--- /dev/null
+++ b/test/Streamly/Test/Data/Stream/Top.hs
@@ -0,0 +1,116 @@
+module Main (main)
+    where
+
+import Data.List (intersect, sort, union, (\\))
+import Test.QuickCheck
+    ( Gen
+    , Property
+    , choose
+    , forAll
+    , listOf
+    )
+import Test.QuickCheck.Monadic (monadicIO, assert, run)
+import qualified Streamly.Prelude as S
+import qualified Streamly.Internal.Data.Stream.IsStream.Top as Top
+
+import Prelude hiding
+    (maximum, minimum, elem, notElem, null, product, sum, head, last, take)
+import Test.Hspec as H
+import Test.Hspec.QuickCheck
+
+min_value :: Int
+min_value = 0
+
+max_value :: Int
+max_value = 10000
+
+chooseInt :: (Int, Int) -> Gen Int
+chooseInt = choose
+
+intersectBySorted :: Property
+intersectBySorted =
+    forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
+        forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
+            monadicIO $ action (sort ls0) (sort ls1)
+
+    where
+
+    action ls0 ls1 = do
+        v1 <-
+            run
+                $ S.toList
+                $ Top.intersectBySorted
+                    compare
+                    (S.fromList ls0)
+                    (S.fromList ls1)
+        let v2 = intersect ls0 ls1
+        assert (v1 == sort v2)
+
+unionBySorted :: Property
+unionBySorted =
+    forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
+        forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
+            monadicIO $ action (sort ls0) (sort ls1)
+
+    where
+
+    action ls0 ls1 = do
+        v1 <-
+            run
+                $ S.toList
+                $ Top.unionBySorted
+                    compare
+                    (S.fromList ls0)
+                    (S.fromList ls1)
+        let v2 = sort $ union ls0 ls1
+        assert (v1 == v2)
+
+differenceBySorted :: Property
+differenceBySorted =
+    forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
+        forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
+            monadicIO $ action (sort ls0) (sort ls1)
+
+    where
+
+    action ls0 ls1 = do
+        v1 <-
+            run
+                $ S.toList
+                $ Top.differenceBySorted
+                    compare
+                    (S.fromList ls0)
+                    (S.fromList ls1)
+        let v2 = ls0 \\ ls1
+        assert (v1 == sort v2)
+
+joinInnerMerge :: Property
+joinInnerMerge =
+    forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
+        forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
+            monadicIO $ action (sort ls0) (sort ls1)
+
+    where
+
+    action ls0 ls1 = do
+        v1 <-
+            run
+                $ S.toList
+                $ Top.joinInnerMerge
+                    compare
+                    (S.fromList ls0)
+                    (S.fromList ls1)
+        let v2 = [ (i,j) | i <- ls0, j <- ls1, i == j ]
+        assert (v1 == v2)
+-------------------------------------------------------------------------------
+moduleName :: String
+moduleName = "Data.Stream.Top"
+
+main :: IO ()
+main = hspec $ do
+    describe moduleName $ do
+        -- intersect
+        prop "intersectBySorted" Main.intersectBySorted
+        prop "unionBySorted" Main.unionBySorted
+        prop "differenceBySorted" Main.differenceBySorted
+        prop "joinInnerMerge" Main.joinInnerMerge
diff --git a/test/streamly-tests.cabal b/test/streamly-tests.cabal
index b4975b0c77..6338aa4466 100644
--- a/test/streamly-tests.cabal
+++ b/test/streamly-tests.cabal
@@ -430,3 +430,8 @@ test-suite version-bounds
   import: test-options
   type: exitcode-stdio-1.0
   main-is: version-bounds.hs
+
+test-suite Data.Stream.Top
+  import: test-options
+  type: exitcode-stdio-1.0
+  main-is: Streamly/Test/Data/Stream/Top.hs