Skip to content

Commit

Permalink
Merge pull request #192 from haskell-works/topic/warnings
Browse files Browse the repository at this point in the history
Fixup warnings, improve error messages
  • Loading branch information
AlexeyRaga authored May 9, 2023
2 parents 16c7c89 + 3cc8136 commit d363d5e
Show file tree
Hide file tree
Showing 16 changed files with 261 additions and 300 deletions.
2 changes: 2 additions & 0 deletions avro.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ common gauge { if arch(x86_64) || arch(i386) { build-depends:

common config
default-language: Haskell2010
ghc-options: -Wall

if flag(dev)
ghc-options: -Wall -Werror

Expand Down
34 changes: 17 additions & 17 deletions src/Data/Avro.hs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ import Data.Avro.Schema.Schema (Schema)
import qualified Data.Avro.Schema.Schema as Schema
import Data.Binary.Get (runGetOrFail)
import Data.ByteString.Builder (toLazyByteString)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy as Lazy
import Data.Tagged (untag)

{- HLINT ignore "Use section" -}
Expand All @@ -80,29 +80,29 @@ readSchemaFromSchema = fromSchema
{-# INLINE readSchemaFromSchema #-}

-- | Serialises an individual value into Avro with the schema provided.
encodeValueWithSchema :: ToAvro a => Schema -> a -> BL.ByteString
encodeValueWithSchema :: ToAvro a => Schema -> a -> Lazy.ByteString
encodeValueWithSchema s = toLazyByteString . toAvro s
{-# INLINE encodeValueWithSchema #-}

-- | Serialises an individual value into Avro using the schema
-- from its corresponding 'HasAvroSchema' instance.
encodeValue :: (HasAvroSchema a, ToAvro a) => a -> BL.ByteString
encodeValue :: (HasAvroSchema a, ToAvro a) => a -> Lazy.ByteString
encodeValue a = encodeValueWithSchema (schemaOf a) a
{-# INLINE encodeValue #-}

-- | Deserialises an individual value from Avro.
decodeValueWithSchema :: FromAvro a => ReadSchema -> BL.ByteString -> Either String a
decodeValueWithSchema schema payload =
case runGetOrFail (getValue schema) payload of
Right (bs, _, v) -> fromAvro v
decodeValueWithSchema :: FromAvro a => ReadSchema -> Lazy.ByteString -> Either String a
decodeValueWithSchema readSchema payload =
case runGetOrFail (getValue readSchema) payload of
Right (_, _, v) -> fromAvro v
Left (_, _, e) -> Left e

-- | Deserialises an individual value from Avro using the schema from its corresponding 'HasAvroSchema'.
--
-- __NOTE__: __This function is only to be used when reader and writer schemas are known to be the same.__
-- Because only one schema is known at this point, and it is the reader schema,
-- /no deconflicting/ can be performed.
decodeValue :: forall a. (HasAvroSchema a, FromAvro a) => BL.ByteString -> Either String a
decodeValue :: forall a. (HasAvroSchema a, FromAvro a) => Lazy.ByteString -> Either String a
decodeValue = decodeValueWithSchema (fromSchema (untag @a schema))
{-# INLINE decodeValue #-}

Expand All @@ -112,7 +112,7 @@ decodeValue = decodeValueWithSchema (fromSchema (untag @a schema))
-- error. This means that the consumer will get all the "good" content from
-- the container until the error is detected, then this error and then the list
-- is finished.
decodeContainer :: forall a. (HasAvroSchema a, FromAvro a) => BL.ByteString -> [Either String a]
decodeContainer :: forall a. (HasAvroSchema a, FromAvro a) => Lazy.ByteString -> [Either String a]
decodeContainer = decodeContainerWithReaderSchema (untag @a schema)
{-# INLINE decodeContainer #-}

Expand All @@ -122,7 +122,7 @@ decodeContainer = decodeContainerWithReaderSchema (untag @a schema)
-- error. This means that the consumer will get all the "good" content from
-- the container until the error is detected, then this error and then the list
-- is finished.
decodeContainerWithEmbeddedSchema :: forall a. FromAvro a => BL.ByteString -> [Either String a]
decodeContainerWithEmbeddedSchema :: forall a. FromAvro a => Lazy.ByteString -> [Either String a]
decodeContainerWithEmbeddedSchema payload =
case Container.extractContainerValues (pure . fromSchema) (getValue >=> (either fail pure . fromAvro)) payload of
Left err -> [Left err]
Expand All @@ -137,7 +137,7 @@ decodeContainerWithEmbeddedSchema payload =
-- error. This means that the consumer will get all the "good" content from
-- the container until the error is detected, then this error and then the list
-- is finished.
decodeContainerWithReaderSchema :: forall a. FromAvro a => Schema -> BL.ByteString -> [Either String a]
decodeContainerWithReaderSchema :: forall a. FromAvro a => Schema -> Lazy.ByteString -> [Either String a]
decodeContainerWithReaderSchema readerSchema payload =
case Container.extractContainerValues (flip deconflict readerSchema) (getValue >=> (either fail pure . fromAvro)) payload of
Left err -> [Left err]
Expand All @@ -148,7 +148,7 @@ decodeContainerWithReaderSchema readerSchema payload =
-- This is particularly useful when slicing up containers into one or more
-- smaller files. By extracting the original bytestring it is possible to
-- avoid re-encoding data.
extractContainerValuesBytes :: BL.ByteString -> Either String (Schema, [Either String BL.ByteString])
extractContainerValuesBytes :: Lazy.ByteString -> Either String (Schema, [Either String Lazy.ByteString])
extractContainerValuesBytes =
(fmap . fmap . fmap . fmap) snd . Container.extractContainerValuesBytes (pure . fromSchema) getValue
{-# INLINE extractContainerValuesBytes #-}
Expand All @@ -161,28 +161,28 @@ extractContainerValuesBytes =
-- avoid re-encoding data.
decodeContainerValuesBytes :: forall a. FromAvro a
=> Schema
-> BL.ByteString
-> Either String (Schema, [Either String (a, BL.ByteString)])
-> Lazy.ByteString
-> Either String (Schema, [Either String (a, Lazy.ByteString)])
decodeContainerValuesBytes readerSchema =
Container.extractContainerValuesBytes (flip deconflict readerSchema) (getValue >=> (either fail pure . fromAvro))
{-# INLINE decodeContainerValuesBytes #-}

-- | Encode chunks of values into a container, using 16 random bytes for
-- the synchronization markers and a corresponding 'HasAvroSchema' schema.
-- Blocks are compressed (or not) according to the given 'Codec' ('nullCodec' or 'deflateCodec').
encodeContainer :: forall a. (HasAvroSchema a, ToAvro a) => Codec -> [[a]] -> IO BL.ByteString
encodeContainer :: forall a. (HasAvroSchema a, ToAvro a) => Codec -> [[a]] -> IO Lazy.ByteString
encodeContainer codec = encodeContainerWithSchema codec (untag @a schema)

-- | Encode chunks of values into a container, using 16 random bytes for
-- the synchronization markers. Blocks are compressed (or not) according
-- to the given 'Codec' ('nullCodec' or 'deflateCodec').
encodeContainerWithSchema :: ToAvro a => Codec -> Schema -> [[a]] -> IO BL.ByteString
encodeContainerWithSchema :: ToAvro a => Codec -> Schema -> [[a]] -> IO Lazy.ByteString
encodeContainerWithSchema codec sch xss =
do sync <- Container.newSyncBytes
return $ encodeContainerWithSync codec sch sync xss

-- | Encode chunks of objects into a container, using the provided
-- ByteString as the synchronization markers.
encodeContainerWithSync :: ToAvro a => Codec -> Schema -> BL.ByteString -> [[a]] -> BL.ByteString
encodeContainerWithSync :: ToAvro a => Codec -> Schema -> Lazy.ByteString -> [[a]] -> Lazy.ByteString
encodeContainerWithSync = Container.packContainerValuesWithSync' toAvro
{-# INLINE encodeContainerWithSync #-}
13 changes: 6 additions & 7 deletions src/Data/Avro/Codec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ module Data.Avro.Codec (
import Codec.Compression.Zlib.Internal as Zlib
import qualified Data.Binary.Get as G
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy as Lazy

-- | Block decompression function for blocks of Avro.
type Decompress a = LBS.ByteString -> G.Get a -> Either String a
type Decompress a = Lazy.ByteString -> G.Get a -> Either String a

-- | A `Codec` allows for compression/decompression of a block in an
-- Avro container according to the Avro spec.
Expand All @@ -33,7 +32,7 @@ data Codec = Codec
, codecDecompress :: forall a. Decompress a

-- | Compresses a lazy stream of bytes.
, codecCompress :: LBS.ByteString -> LBS.ByteString
, codecCompress :: Lazy.ByteString -> Lazy.ByteString
}

-- | `nullCodec` specifies @null@ required by Avro spec.
Expand Down Expand Up @@ -61,20 +60,20 @@ deflateCodec =
, codecCompress = deflateCompress
}

deflateCompress :: LBS.ByteString -> LBS.ByteString
deflateCompress :: Lazy.ByteString -> Lazy.ByteString
deflateCompress =
Zlib.compress Zlib.rawFormat Zlib.defaultCompressParams


-- | Internal type to help construct a lazy list of
-- decompressed bytes interleaved with errors if any.
data Chunk
= ChunkRest LBS.ByteString
= ChunkRest Lazy.ByteString
| ChunkBytes ByteString
| ChunkError Zlib.DecompressError


deflateDecompress :: forall a. LBS.ByteString -> G.Get a -> Either String a
deflateDecompress :: forall a. Lazy.ByteString -> G.Get a -> Either String a
deflateDecompress bytes parser = do
let
-- N.B. this list is lazily created which allows us to
Expand Down
Loading

0 comments on commit d363d5e

Please sign in to comment.