diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java index e6d58e46a13..1568971d94a 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java +++ b/lang/java/avro/src/main/java/org/apache/avro/file/DeflateCodec.java @@ -20,12 +20,15 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import java.util.zip.DataFormatException; import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.Inflater; -import java.util.zip.InflaterOutputStream; +import org.apache.avro.AvroRuntimeException; import org.apache.avro.util.NonCopyingByteArrayOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implements DEFLATE (RFC1951) compression and decompression. @@ -37,7 +40,33 @@ */ public class DeflateCodec extends Codec { + private static final Logger LOG = LoggerFactory.getLogger(DeflateCodec.class); + private static final int DEFAULT_BUFFER_SIZE = 8192; + private static final String MAX_DECOMPRESS_LENGTH_PROPERTY = "org.apache.avro.limits.decompress.maxLength"; + private static final long DEFAULT_MAX_DECOMPRESS_LENGTH = 200L * 1024 * 1024; // 200MB default limit + + private static final long MAX_DECOMPRESS_LENGTH; + + static { + String prop = System.getProperty(MAX_DECOMPRESS_LENGTH_PROPERTY); + long limit = DEFAULT_MAX_DECOMPRESS_LENGTH; + if (prop != null) { + try { + long parsed = Long.parseLong(prop); + if (parsed <= 0) { + LOG.warn("Invalid value '{}' for property '{}': must be positive. Using default: {}", prop, + MAX_DECOMPRESS_LENGTH_PROPERTY, DEFAULT_MAX_DECOMPRESS_LENGTH); + } else { + limit = parsed; + } + } catch (NumberFormatException e) { + LOG.warn("Could not parse property '{}' value '{}'. Using default: {}", MAX_DECOMPRESS_LENGTH_PROPERTY, prop, + DEFAULT_MAX_DECOMPRESS_LENGTH); + } + } + MAX_DECOMPRESS_LENGTH = limit; + } static class Option extends CodecFactory { private final int compressionLevel; @@ -79,9 +108,30 @@ public ByteBuffer compress(ByteBuffer data) throws IOException { @Override public ByteBuffer decompress(ByteBuffer data) throws IOException { NonCopyingByteArrayOutputStream baos = new NonCopyingByteArrayOutputStream(DEFAULT_BUFFER_SIZE); - try (OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) { - outputStream.write(data.array(), computeOffset(data), data.remaining()); + byte[] buffer = new byte[DEFAULT_BUFFER_SIZE]; + long totalBytes = 0; + + Inflater inflater = getInflater(); + inflater.setInput(data.array(), computeOffset(data), data.remaining()); + + try { + while (!inflater.finished()) { + int len = inflater.inflate(buffer); + if (len == 0 && inflater.needsInput()) { + break; + } + totalBytes += len; + if (totalBytes > MAX_DECOMPRESS_LENGTH) { + throw new AvroRuntimeException( + "Decompressed size " + totalBytes + " (bytes) exceeds maximum allowed size " + MAX_DECOMPRESS_LENGTH + + ". This can be configured by setting the system property '" + MAX_DECOMPRESS_LENGTH_PROPERTY + "'"); + } + baos.write(buffer, 0, len); + } + } catch (DataFormatException e) { + throw new IOException("Invalid deflate data", e); } + return baos.asByteBuffer(); }