diff --git a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
index c4e031b75d9..466fa28135c 100644
--- a/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
+++ b/lang/java/avro/src/main/java/org/apache/avro/file/DataFileWriter.java
@@ -246,13 +246,16 @@ public DataFileWriter appendTo(SeekableInput in, OutputStream out) throws IOE
     return this;
   }
 
-  private void init(OutputStream outs) throws IOException {
+  private void init(OutputStream outs) {
     this.underlyingStream = outs;
-    this.out = new BufferedFileOutputStream(outs);
+    // Size the output buffer to fit an entire block frame in a single flush:
+    // maxBlockSize() for compressed data + 20 bytes for two varint-encoded longs
+    // (up to 10 bytes each) + sync.length for the sync marker
+    this.out = new BufferedFileOutputStream(outs, maxBlockSize() + 20 + sync.length);
     EncoderFactory efactory = new EncoderFactory();
     this.vout = efactory.directBinaryEncoder(out, null);
     dout.setSchema(schema);
-    buffer = new NonCopyingByteArrayOutputStream(Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1));
+    buffer = new NonCopyingByteArrayOutputStream(maxBlockSize());
     this.bufOut = this.initEncoder.apply(buffer);
     if (this.codec == null) {
       this.codec = CodecFactory.nullCodec().createInstance();
@@ -260,6 +263,17 @@ private void init(OutputStream outs) throws IOException {
     this.isOpen = true;
   }
 
+  /**
+   * Returns the estimated maximum compressed block size. Blocks are flushed when
+   * uncompressed data reaches {@link #syncInterval}, but compression may increase
+   * size (e.g. uncompressible data with codec framing overhead), so we allow 25%
+   * headroom. The result is clamped to avoid integer overflow when used for
+   * buffer allocation.
+   */
+  private int maxBlockSize() {
+    return Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1);
+  }
+
   private static final SecureRandom RNG = new SecureRandom();
 
   private static byte[] generateSync() {
@@ -494,6 +508,11 @@ public BufferedFileOutputStream(OutputStream out) {
       this.out = new PositionFilter(out);
     }
 
+    public BufferedFileOutputStream(OutputStream out, int bufferSize) {
+      super(null, bufferSize);
+      this.out = new PositionFilter(out);
+    }
+
     public long tell() {
       return position + count;
     }