diff --git a/columnar_format_qat_wrapper/apply_hive_jars.sh b/columnar_format_qat_wrapper/apply_hive_jars.sh
old mode 100644
new mode 100755
index c28ad58..f3f1231
--- a/columnar_format_qat_wrapper/apply_hive_jars.sh
+++ b/columnar_format_qat_wrapper/apply_hive_jars.sh
@@ -17,11 +17,11 @@
# */
#!/bin/bash
-declare -a supported_CDH_versions=("5.14.2" "6.2.1" "7.0.0")
-declare -A cdh_parquet_format_version_m=( ["5.14.2"]="2.1.0" ["7.0.0"]="2.4.0")
-declare -A cdh_parquet_mr_version_m=( ["5.14.2"]="1.5.0" ["7.0.0"]="1.10.0")
-declare -A cdp_orc_version_m=(["7.0.0"]="1.5.1")
-declare -A cdh_hive_version_m=( ["5.14.2"]="1.1.0" ["7.0.0"]="3.1.0")
+declare -a supported_CDH_versions=("5.14.2" "6.2.1" "7.0.0" "spark3.0.0")
+declare -A cdh_parquet_format_version_m=( ["5.14.2"]="2.1.0" ["7.0.0"]="2.4.0" ["spark3.0.0"]="2.4.0")
+declare -A cdh_parquet_mr_version_m=( ["5.14.2"]="1.5.0" ["7.0.0"]="1.10.0" ["spark3.0.0"]="1.10.1")
+declare -A cdp_orc_version_m=(["7.0.0"]="1.5.1" ["spark3.0.0"]="1.5.10")
+declare -A cdh_hive_version_m=( ["5.14.2"]="1.1.0" ["7.0.0"]="3.1.0" ["spark3.0.0"]="2.3.7")
# Repo Address
PARQUET_MR_REPO=https://github.com/cloudera/parquet-mr
PARQUET_FORMAT_REPO=https://github.com/cloudera/parquet-format
@@ -69,14 +69,16 @@ function check_CDH_version(){
}
 apply_patch_to_cdp_orc(){
-  pushd $TARGET_DIR
   CDH_major_version=$(echo $CDH_release_version | cut -d '.' -f 1)
-  if [ "$CDH_major_version" = "7" ]; then
-    ORC_BRANCH="rel/release-${cdp_orc_version_m[$CDH_release_version]}"
+  if [ "$CDH_major_version" != "spark3" ]; then
+    pushd $TARGET_DIR
+    if [ "$CDH_major_version" = "7" ]; then
+      ORC_BRANCH="rel/release-${cdp_orc_version_m[$CDH_release_version]}"
+    fi
+    clone_repo $ORC_BRANCH $UPSTREAM_ORC_REPO
+    popd
+    apply_diff_to_orc
   fi
-  clone_repo $ORC_BRANCH $UPSTREAM_ORC_REPO
-  popd
-  apply_diff_to_orc
 }
apply_diff_to_orc(){
@@ -89,22 +91,24 @@ apply_diff_to_orc(){
}
 apply_patch_to_cdh_hive(){
-  pushd $TARGET_DIR
   CDH_major_version=$(echo $CDH_release_version | cut -d '.' -f 1)
-  if [ "$CDH_major_version" = "6" ]; then
-    HIVE_BRANCH="cdh$CDH_release_version"
-  elif [ "$CDH_major_version" = "7" ]; then
-    HIVE_BRANCH="rel/release-${cdh_hive_version_m[$CDH_release_version]}"
-    clone_repo $HIVE_BRANCH $UPSTREAM_HIVE_REPO
-    popd
-    apply_diff_to_hive
-    return
-  else
-    HIVE_BRANCH="cdh$CDH_major_version-${cdh_hive_version_m[$CDH_release_version]}_$CDH_release_version"
+  if [ "$CDH_major_version" != "spark3" ]; then
+    pushd $TARGET_DIR
+    if [ "$CDH_major_version" = "6" ]; then
+      HIVE_BRANCH="cdh$CDH_release_version"
+    elif [ "$CDH_major_version" = "7" ]; then
+      HIVE_BRANCH="rel/release-${cdh_hive_version_m[$CDH_release_version]}"
+      clone_repo $HIVE_BRANCH $UPSTREAM_HIVE_REPO
+      popd
+      apply_diff_to_hive
+      return
+    else
+      HIVE_BRANCH="cdh$CDH_major_version-${cdh_hive_version_m[$CDH_release_version]}_$CDH_release_version"
+    fi
+    clone_repo $HIVE_BRANCH $HIVE_REPO
+    echo yes | cp -rf $HIVE_QAT_DIR/$CDH_release_version/hive $TARGET_DIR/
+    popd
   fi
-  clone_repo $HIVE_BRANCH $HIVE_REPO
-  echo yes | cp -rf $HIVE_QAT_DIR/$CDH_release_version/hive $TARGET_DIR/
-  popd
 }
apply_diff_to_hive(){
@@ -121,7 +125,7 @@ apply_patch_to_cdh_parquet_format(){
   CDH_major_version=$(echo $CDH_release_version | cut -d '.' -f 1)
   if [ "$CDH_major_version" = "6" ]; then
     PARQUET_FORMAT_BRANCH="cdh$CDH_release_version"
-  elif [ "$CDH_major_version" = "7" ]; then
+  elif [ "$CDH_major_version" = "7" ] || [ "$CDH_major_version" = "spark3" ]; then
     PARQUET_FORMAT_BRANCH="apache-parquet-format-${cdh_parquet_format_version_m[$CDH_release_version]}"
     clone_repo $PARQUET_FORMAT_BRANCH $UPSTREAM_PARQUET_FORMAT_REPO
     popd
@@ -137,7 +141,7 @@ apply_patch_to_cdh_parquet_format(){
 apply_diff_to_parquet_format(){
   CDH_major_version=$(echo $CDH_release_version | cut -d '.' -f 1)
-  if [ "$CDH_major_version" = "7" ]; then
+  if [ "$CDH_major_version" = "7" ] || [ "$CDH_major_version" = "spark3" ]; then
     pushd $PARQUET_FORMAT_SRC_DIR
     git apply --reject --whitespace=fix $HIVE_QAT_DIR/$CDH_release_version/parquet-format/*.diff
     popd
@@ -149,7 +153,7 @@ apply_patch_to_cdh_parquet_mr(){
   CDH_major_version=$(echo $CDH_release_version | cut -d '.' -f 1)
   if [ "$CDH_major_version" = "6" ]; then
     PARQUET_MR_BRANCH="cdh$CDH_release_version"
-  elif [ "$CDH_major_version" = "7" ]; then
+  elif [ "$CDH_major_version" = "7" ] || [ "$CDH_major_version" = "spark3" ]; then
     PARQUET_MR_BRANCH="apache-parquet-${cdh_parquet_mr_version_m[$CDH_release_version]}"
     clone_repo $PARQUET_MR_BRANCH $UPSTREAM_PARQUET_MR_REPO
     popd
@@ -165,7 +169,7 @@ apply_patch_to_cdh_parquet_mr(){
 apply_diff_to_parquet_mr(){
   CDH_major_version=$(echo $CDH_release_version | cut -d '.' -f 1)
-  if [ "$CDH_major_version" = "7" ]; then
+  if [ "$CDH_major_version" = "7" ] || [ "$CDH_major_version" = "spark3" ]; then
     pushd $PARQUET_MR_SRC_DIR
     git apply --reject --whitespace=fix $HIVE_QAT_DIR/$CDH_release_version/parquet-mr/*.diff
     popd
diff --git a/columnar_format_qat_wrapper/spark3.0.0/orc/orc1.5.10_qat.diff b/columnar_format_qat_wrapper/spark3.0.0/orc/orc1.5.10_qat.diff
new file mode 100644
index 0000000..8d07af4
--- /dev/null
+++ b/columnar_format_qat_wrapper/spark3.0.0/orc/orc1.5.10_qat.diff
@@ -0,0 +1,426 @@
+diff --git a/java/core/pom.xml b/java/core/pom.xml
+index 42e039c..d5a721f 100644
+--- a/java/core/pom.xml
++++ b/java/core/pom.xml
+@@ -31,6 +31,10 @@
+    for the in memory representation.
+  </description>
+
++  <properties>
++    <qat.compression.version>2.3.0</qat.compression.version>
++  </properties>
++
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+@@ -97,6 +101,11 @@
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop_qat_wrapper</artifactId>
++      <version>${qat.compression.version}</version>
++    </dependency>
+  </dependencies>
+
+  <build>
+diff --git a/java/core/src/java/org/apache/orc/CompressionKind.java b/java/core/src/java/org/apache/orc/CompressionKind.java
+index 3cffe57..8ae895a 100644
+--- a/java/core/src/java/org/apache/orc/CompressionKind.java
++++ b/java/core/src/java/org/apache/orc/CompressionKind.java
+@@ -23,5 +23,5 @@ package org.apache.orc;
+ * can be applied to ORC files.
+ */
+ public enum CompressionKind {
+- NONE, ZLIB, SNAPPY, LZO, LZ4
++ NONE, ZLIB, SNAPPY, LZO, LZ4, QAT
+ }
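The enum addition above is the hook that lets configuration strings (for example the orc.compress table property) resolve to the new codec by name through the standard enum machinery. A minimal sketch, assuming only the patched orc-core is on the classpath:

    import org.apache.orc.CompressionKind;

    public class QatKindLookup {
      public static void main(String[] args) {
        // "QAT" resolves once the enum constant exists; before this patch it throws
        CompressionKind kind = CompressionKind.valueOf("QAT");
        System.out.println(kind);  // prints QAT
      }
    }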
+diff --git a/java/core/src/java/org/apache/orc/impl/QATCodec.java b/java/core/src/java/org/apache/orc/impl/QATCodec.java
+new file mode 100644
+index 0000000..33c1ffe
+--- /dev/null
++++ b/java/core/src/java/org/apache/orc/impl/QATCodec.java
+@@ -0,0 +1,135 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.orc.impl;
++
++import org.apache.orc.CompressionCodec;
++import org.apache.hadoop.io.compress.qat.QatCompressor;
++import org.apache.hadoop.io.compress.qat.QatDecompressor;
++
++import java.io.IOException;
++import java.nio.ByteBuffer;
++import java.util.EnumSet;
++
++public class QATCodec implements CompressionCodec, DirectDecompressionCodec {
++  private static final HadoopShims SHIMS = HadoopShimsFactory.get();
++  Boolean direct = null;
++  HadoopShims.DirectDecompressor decompressShim = null;
++
++  private int bufferSize;
++
++  public QATCodec(int bufferSize) {
++    this.bufferSize = bufferSize;
++  }
++
++  @Override
++  public boolean compress(ByteBuffer in, ByteBuffer out,
++                          ByteBuffer overflow) throws IOException {
++    QatCompressor compressor = new QatCompressor(bufferSize);
++    int length = in.remaining();
++    compressor.setInput(in.array(), in.arrayOffset() + in.position(), length);
++    compressor.finish();
++    int outSize = 0;
++    int offset = out.arrayOffset() + out.position();
++    while (!compressor.finished() && (length > outSize)) {
++      int size = compressor.compress(out.array(), offset, out.remaining());
++      out.position(size + out.position());
++      outSize += size;
++      offset += size;
++      // if we run out of space in the out buffer, use the overflow
++      if (out.remaining() == 0) {
++        if (overflow == null) {
++          compressor.end();
++          return false;
++        }
++        out = overflow;
++        offset = out.arrayOffset() + out.position();
++      }
++    }
++    compressor.end();
++    return length > outSize;
++  }
++
++  @Override
++  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
++    if (in.isDirect() && out.isDirect()) {
++      directDecompress(in, out);
++      return;
++    }
++
++    QatDecompressor decompressor = new QatDecompressor(bufferSize);
++    decompressor.setInput(in.array(), in.arrayOffset() + in.position(), in.remaining());
++    while (!(decompressor.finished() || decompressor.needsDictionary() ||
++        decompressor.needsInput())) {
++      int count =
++          decompressor.decompress(out.array(), out.arrayOffset() + out.position(), out.remaining());
++      out.position(count + out.position());
++    }
++    out.flip();
++    decompressor.end();
++    in.position(in.limit());
++  }
++
++  @Override
++  public boolean isAvailable() {
++    if (direct == null) {
++      try {
++        ensureShim();
++        direct = (decompressShim != null);
++      } catch (UnsatisfiedLinkError ule) {
++        direct = Boolean.valueOf(false);
++      }
++    }
++    return direct.booleanValue();
++  }
++
++  @Override
++  public void directDecompress(ByteBuffer in, ByteBuffer out)
++      throws IOException {
++    ensureShim();
++    decompressShim.decompress(in, out);
++    out.flip(); // flip for read
++  }
++
++  private void ensureShim() {
++    if (decompressShim == null) {
++      decompressShim = SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.QAT);
++    }
++  }
++
++  @Override
++  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
++    // QAT allows no modifications
++    return this;
++  }
++
++  @Override
++  public void reset() {
++    if (decompressShim != null) {
++      decompressShim.reset();
++    }
++  }
++
++  @Override
++  public void close() {
++    if (decompressShim != null) {
++      decompressShim.end();
++    }
++  }
++
++}
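For orientation, a minimal round-trip sketch of the codec above, driven directly through ORC's CompressionCodec interface. It assumes hadoop_qat_wrapper (and the native QAT library it wraps) is present and loadable; the buffer size is illustrative:

    import java.nio.ByteBuffer;

    import org.apache.orc.CompressionCodec;
    import org.apache.orc.impl.QATCodec;

    public class QatRoundTrip {
      public static void main(String[] args) throws Exception {
        CompressionCodec codec = new QATCodec(256 * 1024);
        ByteBuffer in = ByteBuffer.wrap(new byte[4096]);   // zeros compress well
        ByteBuffer compressed = ByteBuffer.allocate(4096);
        // compress() reports false when the result is not smaller than the input
        if (codec.compress(in, compressed, null)) {
          compressed.flip();                               // ready the compressed bytes for reading
          ByteBuffer out = ByteBuffer.allocate(4096);
          codec.decompress(compressed, out);               // decompress() flips `out` itself
        }
        codec.close();
      }
    }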
+diff --git a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+index 34da133..84da988 100644
+--- a/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
++++ b/java/core/src/java/org/apache/orc/impl/ReaderImpl.java
+@@ -462,6 +462,7 @@ public class ReaderImpl implements Reader {
+       case SNAPPY:
+       case LZO:
+       case LZ4:
++      case QAT:
+         break;
+       default:
+         throw new IllegalArgumentException("Unknown compression");
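The reader-side change is pure validation: without the extra case label, opening a file whose postscript records QAT would fail with "Unknown compression". Reading therefore needs no QAT-specific API; a hedged sketch (the path is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;

    public class QatOrcRead {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(new Path("/tmp/qat_example.orc"),
            OrcFile.readerOptions(conf));
        System.out.println(reader.getCompressionKind());   // QAT for a QAT-compressed file
      }
    }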
+diff --git a/java/core/src/java/org/apache/orc/impl/WriterImpl.java b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+index 6859c87..99ae2dd 100644
+--- a/java/core/src/java/org/apache/orc/impl/WriterImpl.java
++++ b/java/core/src/java/org/apache/orc/impl/WriterImpl.java
+@@ -78,8 +78,9 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
+   private static final Logger LOG = LoggerFactory.getLogger(WriterImpl.class);
+
+   private static final int MIN_ROW_INDEX_STRIDE = 1000;
+-
+-  private final Path path;
++  private static final int DIRECT_BUFFER_SIZE = 300 * 1024;
++
++  private final Path path;
+   private final long stripeSize;
+   private final int rowIndexStride;
+   private final CompressionKind compress;
+@@ -247,6 +248,29 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
+   }
+
+   public static CompressionCodec createCodec(CompressionKind kind) {
++    int bufferSize = DIRECT_BUFFER_SIZE;
++    switch (kind) {
++      case NONE:
++        return null;
++      case ZLIB:
++        return new ZlibCodec();
++      case SNAPPY:
++        return new SnappyCodec();
++      case LZO:
++        return new AircompressorCodec(new LzoCompressor(),
++            new LzoDecompressor());
++      case LZ4:
++        return new AircompressorCodec(new Lz4Compressor(),
++            new Lz4Decompressor());
++      case QAT:
++        return new QATCodec(bufferSize);
++      default:
++        throw new IllegalArgumentException("Unknown compression codec: " +
++            kind);
++    }
++  }
++
++  public static CompressionCodec createCodec(CompressionKind kind, int bufferSize) {
+     switch (kind) {
+       case NONE:
+         return null;
+@@ -260,6 +284,8 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
+       case LZ4:
+         return new AircompressorCodec(new Lz4Compressor(),
+             new Lz4Decompressor());
++      case QAT:
++        return new QATCodec(bufferSize);
+       default:
+         throw new IllegalArgumentException("Unknown compression codec: " +
+             kind);
+@@ -507,6 +533,7 @@ public class WriterImpl implements WriterInternal, MemoryManager.Callback {
+       case SNAPPY: return OrcProto.CompressionKind.SNAPPY;
+       case LZO: return OrcProto.CompressionKind.LZO;
+       case LZ4: return OrcProto.CompressionKind.LZ4;
++      case QAT: return OrcProto.CompressionKind.QAT;
+       default:
+         throw new IllegalArgumentException("Unknown compression " + kind);
+     }
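Net effect of the WriterImpl hunks: the existing createCodec(kind) keeps its signature but now routes QAT through the fixed 300 KiB DIRECT_BUFFER_SIZE default, while the new two-argument overload lets callers size the compressor buffer themselves. A usage sketch (the 64 KiB figure is illustrative):

    import org.apache.orc.CompressionCodec;
    import org.apache.orc.CompressionKind;
    import org.apache.orc.impl.WriterImpl;

    public class QatCodecFactory {
      public static void main(String[] args) {
        // Default path: QAT backed by the 300 * 1024 DIRECT_BUFFER_SIZE constant.
        CompressionCodec byDefault = WriterImpl.createCodec(CompressionKind.QAT);
        // Explicit path: caller-chosen buffer size.
        CompressionCodec sized = WriterImpl.createCodec(CompressionKind.QAT, 64 * 1024);
      }
    }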
+diff --git a/java/core/src/test/org/apache/orc/impl/TestOrcQATCodec.java b/java/core/src/test/org/apache/orc/impl/TestOrcQATCodec.java
+new file mode 100644
+index 0000000..f691590
+--- /dev/null
++++ b/java/core/src/test/org/apache/orc/impl/TestOrcQATCodec.java
+@@ -0,0 +1,58 @@
++/**
++ * Licensed to the Apache Software Foundation (ASF) under one
++ * or more contributor license agreements. See the NOTICE file
++ * distributed with this work for additional information
++ * regarding copyright ownership. The ASF licenses this file
++ * to you under the Apache License, Version 2.0 (the
++ * "License"); you may not use this file except in compliance
++ * with the License. You may obtain a copy of the License at
++ *
++ * http://www.apache.org/licenses/LICENSE-2.0
++ *
++ * Unless required by applicable law or agreed to in writing, software
++ * distributed under the License is distributed on an "AS IS" BASIS,
++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ * See the License for the specific language governing permissions and
++ * limitations under the License.
++ */
++
++package org.apache.orc.impl;
++
++import org.junit.Test;
++
++import java.io.IOException;
++import java.nio.ByteBuffer;
++
++import static junit.framework.Assert.assertEquals;
++import static junit.framework.Assert.fail;
++
++import org.apache.orc.CompressionCodec;
++
++public class TestOrcQATCodec {
++  final static int DIRECT_BUFFER_SIZE = 256 * 1024;
++
++  @Test
++  public void testNoOverflow() throws Exception {
++    ByteBuffer in = ByteBuffer.allocate(10);
++    ByteBuffer out = ByteBuffer.allocate(10);
++    in.put(new byte[]{1, 2, 3, 4, 5, 6, 7, 10});
++    in.flip();
++    CompressionCodec codec = new QATCodec(DIRECT_BUFFER_SIZE);
++    assertEquals(false, codec.compress(in, out, null));
++  }
++
++  @Test
++  public void testCorrupt() throws Exception {
++    ByteBuffer buf = ByteBuffer.allocate(1000);
++    buf.put(new byte[]{31, -117, 8, 4, 0, 0, 0, 0, 0, -1, 12, 0, 81, 90, 8, 0, 0, 0, 0, 0, 0, 0, 0, 0, -85, 111, 96, 72, 78, -6, 15, 0, -12, 61, 8, -54, 6, 0, 0, 0});
++    buf.flip();
++    CompressionCodec codec = new QATCodec(DIRECT_BUFFER_SIZE);
++    ByteBuffer out = ByteBuffer.allocate(1000);
++    try {
++      codec.decompress(buf, out);
++      fail();
++    } catch (IOException ioe) {
++      // EXPECTED
++    }
++  }
++}
+diff --git a/java/shims/pom.xml b/java/shims/pom.xml
+index eb093f6..446c1e2 100644
+--- a/java/shims/pom.xml
++++ b/java/shims/pom.xml
+@@ -34,6 +34,10 @@
+    on the latest version.
+  </description>
+
++  <properties>
++    <qat.compression.version>2.3.0</qat.compression.version>
++  </properties>
++
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+@@ -59,6 +63,11 @@
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop_qat_wrapper</artifactId>
++      <version>${qat.compression.version}</version>
++    </dependency>
+  </dependencies>
+
+  <build>
+diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
+index a2093c6..3260aaa 100644
+--- a/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
++++ b/java/shims/src/java/org/apache/orc/impl/HadoopShims.java
+@@ -37,6 +37,7 @@ public interface HadoopShims {
+     ZLIB_NOHEADER,
+     ZLIB,
+     SNAPPY,
++    QAT,
+   }
+
+   interface DirectDecompressor {
+diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
+index 618e4c8..05f3c9b 100644
+--- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
++++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java
+@@ -22,6 +22,7 @@ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.FSDataInputStream;
+ import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
+ import org.apache.hadoop.io.compress.zlib.ZlibDecompressor;
++import org.apache.hadoop.io.compress.qat.QatDecompressor;
+
+ import java.io.IOException;
+ import java.io.OutputStream;
+@@ -95,6 +96,34 @@ public class HadoopShimsPre2_6 implements HadoopShims {
+     }
+   }
+
++  static class QatDirectDecompressWrapper implements DirectDecompressor {
++    private final QatDecompressor.QatDirectDecompressor root;
++    private boolean isFirstCall = true;
++
++    QatDirectDecompressWrapper(QatDecompressor.QatDirectDecompressor root) {
++      this.root = root;
++    }
++
++    public void decompress(ByteBuffer input, ByteBuffer output) throws IOException {
++      if (!isFirstCall) {
++        root.reset();
++      } else {
++        isFirstCall = false;
++      }
++      root.decompress(input, output);
++    }
++
++    @Override
++    public void reset() {
++      root.reset();
++    }
++
++    @Override
++    public void end() {
++      root.end();
++    }
++  }
++
+   static DirectDecompressor getDecompressor( DirectCompressionType codec) {
+     switch (codec) {
+       case ZLIB:
+@@ -107,6 +136,9 @@ public class HadoopShimsPre2_6 implements HadoopShims {
+       case SNAPPY:
+         return new SnappyDirectDecompressWrapper
+             (new SnappyDirectDecompressor());
++      case QAT:
++        return new QatDirectDecompressWrapper
++            (new QatDecompressor.QatDirectDecompressor());
+       default:
+         return null;
+     }
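The wrapper above is reached through the same shims lookup that QATCodec.isAvailable() and directDecompress() use. A sketch of that direct-buffer path, assuming the compressed bytes are already sitting in a direct ByteBuffer:

    import java.nio.ByteBuffer;

    import org.apache.orc.impl.HadoopShims;
    import org.apache.orc.impl.HadoopShimsFactory;

    public class QatDirectPath {
      public static void main(String[] args) throws Exception {
        HadoopShims.DirectDecompressor qat = HadoopShimsFactory.get()
            .getDirectDecompressor(HadoopShims.DirectCompressionType.QAT);
        if (qat == null) {
          return;                                          // no QAT support in this shim/runtime
        }
        ByteBuffer in = ByteBuffer.allocateDirect(64 * 1024);
        ByteBuffer out = ByteBuffer.allocateDirect(256 * 1024);
        // ... fill `in` with QAT-compressed bytes, then in.flip() ...
        qat.decompress(in, out);                           // the wrapper resets `root` on reuse
        qat.end();
      }
    }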
+diff --git a/proto/orc_proto.proto b/proto/orc_proto.proto
+index 24a62a4..2e6aaa1 100644
+--- a/proto/orc_proto.proto
++++ b/proto/orc_proto.proto
+@@ -230,6 +230,7 @@ enum CompressionKind {
+   LZO = 3;
+   LZ4 = 4;
+   ZSTD = 5;
++  QAT = 6;
+ }
+
+ // Serialized length must be less that 255 bytes
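Taken together, the ORC diff threads QAT through the public enum, the codec factory, the shims, and the file-format protobuf, so a writer can select it like any stock codec. A minimal end-to-end sketch against the patched orc-core (hadoop_qat_wrapper and QAT support must be available at runtime; the path and schema are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    import org.apache.orc.CompressionKind;
    import org.apache.orc.OrcFile;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.Writer;

    public class QatOrcWrite {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        TypeDescription schema = TypeDescription.fromString("struct<x:int>");
        Writer writer = OrcFile.createWriter(new Path("/tmp/qat_example.orc"),
            OrcFile.writerOptions(conf)
                .setSchema(schema)
                .compress(CompressionKind.QAT));           // constant added by this patch
        writer.close();                                    // footer records the QAT kind
      }
    }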
diff --git a/columnar_format_qat_wrapper/spark3.0.0/parquet-format/parquet_format2.4.0_qat.diff b/columnar_format_qat_wrapper/spark3.0.0/parquet-format/parquet_format2.4.0_qat.diff
new file mode 100644
index 0000000..f1907e4
--- /dev/null
+++ b/columnar_format_qat_wrapper/spark3.0.0/parquet-format/parquet_format2.4.0_qat.diff
@@ -0,0 +1,12 @@
+diff --git a/src/main/thrift/parquet.thrift b/src/main/thrift/parquet.thrift
+index fbca9b2..0d23f80 100644
+--- a/src/main/thrift/parquet.thrift
++++ b/src/main/thrift/parquet.thrift
+@@ -465,6 +465,7 @@ enum CompressionCodec {
+   BROTLI = 4; // Added in 2.3.2
+   LZ4 = 5; // Added in 2.3.2
+   ZSTD = 6; // Added in 2.3.2
++  QAT = 7;
+ }
+
+ enum PageType {
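Once parquet.thrift is regenerated, the new id surfaces in the generated Java bindings. A small sanity check, assuming org.apache.parquet.format.CompressionCodec keeps its usual Thrift TEnum shape:

    import org.apache.parquet.format.CompressionCodec;

    public class QatParquetId {
      public static void main(String[] args) {
        // QAT = 7, matching the amended enum above
        System.out.println(CompressionCodec.QAT.getValue());
      }
    }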
diff --git a/columnar_format_qat_wrapper/7.0.0/parquet-mr/parquet-mr1.10.1_qat.diff b/columnar_format_qat_wrapper/spark3.0.0/parquet-mr/parquet-mr1.10.1_qat.diff
similarity index 100%
rename from columnar_format_qat_wrapper/7.0.0/parquet-mr/parquet-mr1.10.1_qat.diff
rename to columnar_format_qat_wrapper/spark3.0.0/parquet-mr/parquet-mr1.10.1_qat.diff