diff --git a/benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs b/benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs
index c6fe765b95..4c496320d7 100644
--- a/benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs
+++ b/benchmark/Streamly/Benchmark/Prelude/Serial/NestedStream.hs
@@ -461,6 +461,11 @@ o_n_heap_buffering value =
             $ joinWith Internal.intersectBy sqrtVal
         , benchIOSrc1 "intersectBySorted"
             $ joinMapWith (Internal.intersectBySorted compare) halfVal
+        -- XXX The differenceBy benchmark hangs forever, so it is disabled.
+        --, benchIOSrc1 "differenceBy"
+        --    $ joinMapWith (Internal.differenceBy (==)) halfVal
+        , benchIOSrc1 "differenceBySorted"
+            $ joinMapWith (Internal.differenceBySorted compare) sqrtVal
         ]
     ]
 
diff --git a/core/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs b/core/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
index 821ccb97bd..6fecb60011 100644
--- a/core/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
+++ b/core/src/Streamly/Internal/Data/Stream/StreamD/Nesting.hs
@@ -143,6 +143,7 @@ module Streamly.Internal.Data.Stream.StreamD.Nesting
     , splitInnerBy
     , splitInnerBySuffix
     , intersectBySorted
+    , differenceBySorted
     )
 where
 
@@ -487,6 +488,25 @@ mergeBy cmp = mergeByM (\a b -> return $ cmp a b)
 -- Intersection of sorted streams
 -------------------------------------------------------------------------------
 
+data StreamEmptyNess =
+      LeftEmpty
+    | RightEmpty
+    | BothEmpty
+    | NoneEmpty
+    deriving (Eq, Show)
+
+data RunOrder =
+      LeftRun
+    | RightRun
+    | CompareRun
+    | CompareDupRun
+    | FastFarwardRun
+    | RightDupRun
+    | BuffPrepare
+    | BuffPair
+    | BuffReset
+    deriving (Eq, Show)
+
 -- Assuming the streams are sorted in ascending order
 {-# INLINE_NORMAL intersectBySorted #-}
 intersectBySorted :: Monad m
@@ -2466,3 +2486,59 @@ splitInnerBySuffix splitter joiner (Stream step1 state1) =
     step _ (SplitYielding x next) = return $ Yield x next
 
     step _ SplitFinishing = return Stop
+
+-------------------------------------------------------------------------------
+-- Difference of sorted streams -----------------------------------------------
+-------------------------------------------------------------------------------
+
+-- | Remove from the first stream one occurrence of an element for every equal
+-- element found in the second stream, i.e. a multiset difference.
+--
+-- Both streams must be sorted in ascending order with respect to the supplied
+-- comparison function; the elements are emitted in the order of the first
+-- stream.
+--
+-- The second stream is not evaluated any further once the first stream ends.
+{-# INLINE_NORMAL differenceBySorted #-}
+differenceBySorted :: (Monad m) =>
+    (a -> a -> Ordering) -> Stream m a -> Stream m a -> Stream m a
+differenceBySorted cmp (Stream stepa ta) (Stream stepb tb) =
+    Stream step (ta, Just tb, Nothing, Nothing)
+
+    where
+
+    -- State: (left stream state, right stream state or Nothing once the right
+    -- stream has ended, pending left element, pending right element).
+
+    {-# INLINE_LATE step #-}
+
+    -- No pending left element, pull one from the left stream. When the left
+    -- stream ends there is nothing more to emit.
+    step gst (sa, sb, Nothing, b) = do
+        r <- stepa gst sa
+        return $ case r of
+            Yield a sa' -> Skip (sa', sb, Just a, b)
+            Skip sa' -> Skip (sa', sb, Nothing, b)
+            Stop -> Stop
+
+    -- A left element is pending, pull an element from the running right
+    -- stream to compare it with.
+    step gst (sa, Just sb, a@(Just _), Nothing) = do
+        r <- stepb gst sb
+        return $ case r of
+            Yield b sb' -> Skip (sa, Just sb', a, Just b)
+            Skip sb' -> Skip (sa, Just sb', a, Nothing)
+            Stop -> Skip (sa, Nothing, a, Nothing)
+
+    -- Both the elements are available, compare them.
+    step _ (sa, sb, Just a, Just b) =
+        return $ case cmp a b of
+            -- b is smaller than a and everything after a, drop b.
+            GT -> Skip (sa, sb, Just a, Nothing)
+            -- Nothing remaining in the right stream can match a, emit it.
+            LT -> Yield a (sa, sb, Nothing, Just b)
+            -- a is cancelled by b, each b cancels only one element.
+            EQ -> Skip (sa, sb, Nothing, Nothing)
+
+    -- The right stream has ended, emit the rest of the left stream as it is.
+    step _ (sa, Nothing, Just a, Nothing) =
+        return $ Yield a (sa, Nothing, Nothing, Nothing)
diff --git a/src/Streamly/Internal/Data/Stream/IsStream/Top.hs b/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
index b6992e5c5c..1021d215e8 100644
--- a/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
+++ b/src/Streamly/Internal/Data/Stream/IsStream/Top.hs
@@ -30,7 +30,7 @@ module Streamly.Internal.Data.Stream.IsStream.Top
     , intersectBy
     , intersectBySorted
     , differenceBy
-    , mergeDifferenceBy
+    , differenceBySorted
     , unionBy
     , mergeUnionBy
 
@@ -632,11 +632,14 @@ differenceBy eq s1 s2 =
 --
 -- Space: O(1)
 --
--- /Unimplemented/
-{-# INLINE mergeDifferenceBy #-}
-mergeDifferenceBy :: -- (IsStream t, Monad m) =>
+-- /Pre-release/
+{-# INLINE differenceBySorted #-}
+differenceBySorted :: (IsStream t, MonadIO m) =>
     (a -> a -> Ordering) -> t m a -> t m a -> t m a
-mergeDifferenceBy _eq _s1 _s2 = undefined
+differenceBySorted cmp s1 =
+    IsStream.fromStreamD
+        . StreamD.differenceBySorted cmp (IsStream.toStreamD s1)
+        . IsStream.toStreamD
 
 -- | This is essentially an append operation that appends all the extra
 -- occurrences of elements from the second stream that are not already present
diff --git a/test/Streamly/Test/Prelude/Top.hs b/test/Streamly/Test/Prelude/Top.hs
index 84f814d81d..a2b0866144 100644
--- a/test/Streamly/Test/Prelude/Top.hs
+++ b/test/Streamly/Test/Prelude/Top.hs
@@ -1,6 +1,6 @@
 module Main (main) where
 
-import Data.List (elem, intersect, nub, sort)
+import Data.List (elem, intersect, nub, sort, (\\))
 import Data.Maybe (isNothing)
 import Streamly.Prelude (SerialT)
 import Test.QuickCheck
@@ -196,6 +196,25 @@ intersectBy srt intersectFunc cmp =
             let v2 = ls0 `intersect` ls1
             assert (sort v1 == sort v2)
 
+differenceBySorted :: Property
+differenceBySorted =
+    forAll (listOf (chooseInt (min_value, max_value))) $ \ls0 ->
+        forAll (listOf (chooseInt (min_value, max_value))) $ \ls1 ->
+            monadicIO $ action (sort ls0) (sort ls1)
+
+    where
+
+    action ls0 ls1 = do
+        v1 <-
+            run
+                $ S.toList
+                $ Top.differenceBySorted
+                    compare
+                    (S.fromList ls0)
+                    (S.fromList ls1)
+        let v2 = ls0 \\ ls1
+        assert (v1 == sort v2)
+
 -------------------------------------------------------------------------------
 -- Main
 -------------------------------------------------------------------------------
@@ -219,3 +238,4 @@ main = hspec $ do
             (intersectBy id Top.intersectBy (==))
         prop "intersectBySorted"
             (intersectBy sort Top.intersectBySorted compare)
+        prop "differenceBySorted" Main.differenceBySorted