Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -246,20 +246,34 @@ public DataFileWriter<D> appendTo(SeekableInput in, OutputStream out) throws IOE
return this;
}

private void init(OutputStream outs) throws IOException {
private void init(OutputStream outs) {
this.underlyingStream = outs;
this.out = new BufferedFileOutputStream(outs);
// Size the output buffer to fit an entire block frame in a single flush:
// maxBlockSize() for compressed data + 20 bytes for two varint-encoded longs
// (up to 10 bytes each) + sync.length for the sync marker
this.out = new BufferedFileOutputStream(outs, maxBlockSize() + 20 + sync.length);
EncoderFactory efactory = new EncoderFactory();
this.vout = efactory.directBinaryEncoder(out, null);
dout.setSchema(schema);
buffer = new NonCopyingByteArrayOutputStream(Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1));
buffer = new NonCopyingByteArrayOutputStream(maxBlockSize());
this.bufOut = this.initEncoder.apply(buffer);
if (this.codec == null) {
this.codec = CodecFactory.nullCodec().createInstance();
}
this.isOpen = true;
}

/**
* Returns the estimated maximum compressed block size. Blocks are flushed when
* uncompressed data reaches {@link #syncInterval}, but compression may increase
* size (e.g. uncompressible data with codec framing overhead), so we allow 25%
* headroom. The result is clamped to avoid integer overflow when used for
* buffer allocation.
*/
private int maxBlockSize() {
return Math.min((int) (syncInterval * 1.25), Integer.MAX_VALUE / 2 - 1);
}

private static final SecureRandom RNG = new SecureRandom();

private static byte[] generateSync() {
Expand Down Expand Up @@ -494,6 +508,11 @@ public BufferedFileOutputStream(OutputStream out) {
this.out = new PositionFilter(out);
}

public BufferedFileOutputStream(OutputStream out, int bufferSize) {
super(null, bufferSize);
this.out = new PositionFilter(out);
}

public long tell() {
return position + count;
}
Expand Down
Loading