From 3205d7fea9bd673ced7b2ab50b1a43b568ece919 Mon Sep 17 00:00:00 2001 From: Aditya Kishore Date: Tue, 7 Jan 2014 13:25:39 -0800 Subject: [PATCH 1/4] Add support for multiple column families in Get, Put and Delete requests. --- src/AtomicIncrementRequest.java | 5 + src/BatchableRpc.java | 35 +++- src/CompareAndSetRequest.java | 5 + src/DeleteRequest.java | 338 ++++++++++++++++++++++++-------- src/GetRequest.java | 157 +++++++++++---- src/HBaseRpc.java | 90 ++++++++- src/MultiAction.java | 44 +++-- src/PutRequest.java | 253 ++++++++++++++++++------ 8 files changed, 714 insertions(+), 213 deletions(-) diff --git a/src/AtomicIncrementRequest.java b/src/AtomicIncrementRequest.java index e556a950..96ff425d 100644 --- a/src/AtomicIncrementRequest.java +++ b/src/AtomicIncrementRequest.java @@ -173,6 +173,11 @@ public byte[] family() { return family; } + @Override + public byte[][] getFamilies() { + return new byte[][] { family }; + } + @Override public byte[] qualifier() { return qualifier; diff --git a/src/BatchableRpc.java b/src/BatchableRpc.java index 64157a91..6bafe8fb 100644 --- a/src/BatchableRpc.java +++ b/src/BatchableRpc.java @@ -44,8 +44,8 @@ abstract class BatchableRpc extends HBaseRpc // So instead we make them package-private so that subclasses can still // access them directly. - /** Family affected by this RPC. */ - /*protected*/ final byte[] family; + /** Families affected by this RPC. */ + /*protected*/ final byte[][] families; /** The timestamp to use for {@link KeyValue}s of this RPC. */ /*protected*/ final long timestamp; @@ -72,16 +72,16 @@ abstract class BatchableRpc extends HBaseRpc * Package private constructor. * @param table The name of the table this RPC is for. * @param row The name of the row this RPC is for. - * @param family The column family to edit in that table. Subclass must - * validate, this class doesn't perform any validation on the family. + * @param families The column families to edit in that table. Subclass must + * validate, this class doesn't perform any validation on the families. * @param timestamp The timestamp to use for {@link KeyValue}s of this RPC. * @param lockid Explicit row lock to use, or {@link RowLock#NO_LOCK}. */ BatchableRpc(final byte[] table, - final byte[] key, final byte[] family, + final byte[] key, final byte[][] families, final long timestamp, final long lockid) { super(table, key); - this.family = family; + this.families = families; this.timestamp = timestamp; this.lockid = lockid; } @@ -116,7 +116,12 @@ public final void setDurable(final boolean durable) { @Override public final byte[] family() { - return family; + return families == null ? null : families[0]; + } + + @Override + public final byte[][] getFamilies() { + return families; } @Override @@ -156,18 +161,34 @@ final boolean canBuffer() { /** * How many {@link KeyValue}s will be serialized by {@link #serializePayload}. + * Used with RPCs with single column families. */ abstract int numKeyValues(); /** * An estimate of the number of bytes needed for {@link #serializePayload}. * The estimate is conservative. + * Used with RPCs with single column families. */ abstract int payloadSize(); /** * Serialize the part of this RPC for a {@link MultiAction}. + * Used with RPCs with single column families. */ abstract void serializePayload(final ChannelBuffer buf); + /** + * An estimate of the number of bytes needed for {@link #serializePayloads}. + * The estimate is conservative. + * Used with RPCs with multiple column families. + */ + abstract int payloadsSize(); + + /** + * Serialize the part of this RPC for a {@link MultiAction}. + * Used with RPCs with multiple column families. + */ + abstract void serializePayloads(final ChannelBuffer buf); + } diff --git a/src/CompareAndSetRequest.java b/src/CompareAndSetRequest.java index def883a3..3e599f60 100644 --- a/src/CompareAndSetRequest.java +++ b/src/CompareAndSetRequest.java @@ -102,6 +102,11 @@ public byte[] family() { return put.family(); } + @Override + public byte[][] getFamilies() { + return put.getFamilies(); + } + @Override public byte[] qualifier() { return put.qualifier(); diff --git a/src/DeleteRequest.java b/src/DeleteRequest.java index 5db050f8..fd8f85b2 100644 --- a/src/DeleteRequest.java +++ b/src/DeleteRequest.java @@ -63,10 +63,11 @@ public final class DeleteRequest extends BatchableRpc private static final byte[][] DELETE_FAMILY_MARKER = new byte[][] { HBaseClient.EMPTY_ARRAY }; - /** Special value for {@link #family} when deleting a whole row. */ - static final byte[] WHOLE_ROW = new byte[0]; + /** Special value for {@link #families} when deleting a whole row. */ + static final byte[][] WHOLE_ROW = + new byte[][] { HBaseClient.EMPTY_ARRAY }; - private final byte[][] qualifiers; + private final byte[][][] qualifiers; /** Whether to delete the value only at the specified timestamp. */ private boolean at_timestamp_only = false; @@ -108,7 +109,23 @@ public DeleteRequest(final byte[] table, final byte[] key, public DeleteRequest(final byte[] table, final byte[] key, final byte[] family) { - this(table, key, family, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific set of families. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families) { + this(table, key, families, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -125,7 +142,8 @@ public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, final long timestamp) { - this(table, key, family, null, timestamp, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, null, + timestamp, RowLock.NO_LOCK); } /** @@ -142,8 +160,8 @@ public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, final byte[] qualifier) { - this(table, key, family, - qualifier == null ? null : new byte[][] { qualifier }, + this(table, key, new byte[][] { family }, + qualifier == null ? null : new byte[][][] { { qualifier } }, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } @@ -164,8 +182,8 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[] qualifier, final long timestamp) { - this(table, key, family, - qualifier == null ? null : new byte[][] { qualifier }, + this(table, key, new byte[][] { family }, + qualifier == null ? null : new byte[][][] { { qualifier } }, timestamp, RowLock.NO_LOCK); } @@ -183,8 +201,26 @@ public DeleteRequest(final byte[] table, final byte[] key, final byte[] family, final byte[][] qualifiers) { - this(table, key, family, qualifiers, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells of a set of column + * families in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers) { + this(table, key, families, qualifiers, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -203,7 +239,27 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[][] qualifiers, final long timestamp) { - this(table, key, family, qualifiers, timestamp, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, timestamp, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @param timestamp The timestamp to set on this edit. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.2 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final long timestamp) { + this(table, key, families, qualifiers, timestamp, RowLock.NO_LOCK); } /** @@ -221,8 +277,8 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[] qualifier, final RowLock lock) { - this(table, key, family, - qualifier == null ? null : new byte[][] { qualifier }, + this(table, key, new byte[][] { family }, + qualifier == null ? null : new byte[][][] { { qualifier } }, KeyValue.TIMESTAMP_NOW, lock.id()); } @@ -244,8 +300,8 @@ public DeleteRequest(final byte[] table, final byte[] qualifier, final long timestamp, final RowLock lock) { - this(table, key, family, - qualifier == null ? null : new byte[][] { qualifier }, + this(table, key, new byte[][] { family }, + qualifier == null ? null : new byte[][][] { { qualifier } }, timestamp, lock.id()); } @@ -266,7 +322,28 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[][] qualifiers, final RowLock lock) { - this(table, key, family, qualifiers, KeyValue.TIMESTAMP_NOW, lock.id()); + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, KeyValue.TIMESTAMP_NOW, lock.id()); + } + + /** + * Constructor to delete a specific number of cells in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * Can be {@code null}. + * @param lock An explicit row lock to use with this request. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final RowLock lock) { + this(table, key, families, qualifiers, KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -288,7 +365,30 @@ public DeleteRequest(final byte[] table, final byte[][] qualifiers, final long timestamp, final RowLock lock) { - this(table, key, family, qualifiers, timestamp, lock.id()); + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, timestamp, lock.id()); + } + + /** + * Constructor to delete a specific number of cells in a row with a row lock. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * Can be {@code null}. + * @param timestamp The timestamp to set on this edit. + * @param lock An explicit row lock to use with this request. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.2 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final long timestamp, + final RowLock lock) { + this(table, key, families, qualifiers, timestamp, lock.id()); } /** @@ -313,8 +413,8 @@ public DeleteRequest(final String table, final String key) { public DeleteRequest(final String table, final String key, final String family) { - this(table.getBytes(), key.getBytes(), family.getBytes(), null, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -330,8 +430,8 @@ public DeleteRequest(final String table, final String key, final String family, final String qualifier) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier == null ? null : new byte[][] { qualifier.getBytes() }, + this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, + qualifier == null ? null : new byte[][][] { { qualifier.getBytes() } }, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } @@ -350,8 +450,8 @@ public DeleteRequest(final String table, final String family, final String qualifier, final RowLock lock) { - this(table.getBytes(), key.getBytes(), family.getBytes(), - qualifier == null ? null : new byte[][] { qualifier.getBytes() }, + this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, + qualifier == null ? null : new byte[][][] { { qualifier.getBytes() } }, KeyValue.TIMESTAMP_NOW, lock.id()); } @@ -364,8 +464,8 @@ public DeleteRequest(final String table, * @since 1.2 */ public DeleteRequest(final byte[] table, final KeyValue kv) { - this(table, kv.key(), kv.family(), new byte[][] { kv.qualifier() }, - kv.timestamp(), RowLock.NO_LOCK); + this(table, kv.key(), new byte[][] { kv.family() }, + new byte[][][] { { kv.qualifier() } }, kv.timestamp(), RowLock.NO_LOCK); } /** @@ -380,38 +480,47 @@ public DeleteRequest(final byte[] table, final KeyValue kv) { public DeleteRequest(final byte[] table, final KeyValue kv, final RowLock lock) { - this(table, kv.key(), kv.family(), new byte[][] { kv.qualifier() }, - kv.timestamp(), lock.id()); + this(table, kv.key(), new byte[][] { kv.family() }, + new byte[][][] { { kv.qualifier() } }, kv.timestamp(), lock.id()); } /** Private constructor. */ private DeleteRequest(final byte[] table, final byte[] key, - final byte[] family, - final byte[][] qualifiers, + final byte[][] families, + final byte[][][] qualifiers, final long timestamp, final long lockid) { - super(table, key, family == null ? WHOLE_ROW : family, timestamp, lockid); - if (family != null) { - KeyValue.checkFamily(family); + super(table, key, families == null ? WHOLE_ROW : families, + timestamp, lockid); + if (families != null) { + for (byte[] family : families) { + KeyValue.checkFamily(family); + } } if (qualifiers != null) { - if (family == null) { + if (families == null) { throw new IllegalArgumentException("You can't delete specific qualifiers" + " without specifying which family they belong to." - + " table=" + Bytes.pretty(table) + + " table=" + Bytes.pretty(table) + ", key=" + Bytes.pretty(key)); + } else if (families.length != qualifiers.length) { + throw new IllegalArgumentException("Length of the qualifier array does" + + " not match that of the family." + + " table=" + Bytes.pretty(table) + + ", key=" + Bytes.pretty(key)); } - for (final byte[] qualifier : qualifiers) { - KeyValue.checkQualifier(qualifier); + for (int i = 0; i < families.length; i++) { + if (qualifiers[i] == null) { + continue; + } + for (final byte[] qualifier : qualifiers[i]) { + KeyValue.checkQualifier(qualifier); + } } - this.qualifiers = qualifiers; - } else { - // No specific qualifier to delete: delete the entire family. Not that - // if `family == null', we'll delete the whole row anyway. - this.qualifiers = DELETE_FAMILY_MARKER; } + this.qualifiers = qualifiers; } /** @@ -452,11 +561,19 @@ public byte[] key() { @Override public byte[][] qualifiers() { + return qualifiers == null ? null : qualifiers[0]; + } + + /** + * {@inheritDoc} + */ + @Override + public byte[][][] getQualifiers() { return qualifiers; } public String toString() { - return super.toStringWithQualifiers("DeleteRequest", family, qualifiers); + return super.toStringWithQualifiers("DeleteRequest", families, qualifiers); } // ---------------------- // @@ -479,25 +596,35 @@ byte code() { @Override int numKeyValues() { - return qualifiers.length; + return (qualifiers != null && qualifiers[0] != null) + ? qualifiers[0].length + : 1; } @Override void serializePayload(final ChannelBuffer buf) { - if (family == null) { + serializePayload(buf, 0); + } + + private void serializePayload(final ChannelBuffer buf, int family_idx) { + if (families == WHOLE_ROW) { return; // No payload when deleting whole rows. } // Are we deleting a whole family at once or just a bunch of columns? - final byte type = (qualifiers == DELETE_FAMILY_MARKER + final boolean has_qualifiers = + (qualifiers != null && qualifiers[family_idx] != null); + final byte[][] family_qualifiers = !has_qualifiers + ? DELETE_FAMILY_MARKER + : qualifiers[family_idx]; + final byte type = (!has_qualifiers ? KeyValue.DELETE_FAMILY : (at_timestamp_only ? KeyValue.DELETE : KeyValue.DELETE_COLUMN)); - // Write the KeyValues - for (final byte[] qualifier : qualifiers) { + for (final byte[] qualifier : family_qualifiers) { KeyValue.serialize(buf, type, timestamp, - key, family, qualifier, null); + key, families[family_idx], qualifier, null); } } @@ -523,19 +650,34 @@ private int predictSerializedSize() { size += 8; // long: Timestamp. size += 8; // long: Lock ID. size += 4; // int: Number of families. - size += 1; // vint: Family length (guaranteed on 1 byte). - if (family == null) { - return size; + + size += payloadsSize(); + + return size; + } + + @Override + int payloadsSize() { + int size = 0; + if (families != WHOLE_ROW) { + for (int i = 0; i < families.length; i++) { + size += 1; // vint: Family length (guaranteed on 1 byte). + size += families[i].length; // The column family. + size += 4; // int: Number of KeyValues for this family. + size += payloadSize(i); + } } - size += family.length; // The column family. - size += 4; // int: Number of KeyValues for this family. - return size + payloadSize(); + return size; } /** Returns the serialized size of all the {@link KeyValue}s in this RPC. */ @Override int payloadSize() { - if (family == WHOLE_ROW) { + return payloadSize(0); + } + + private int payloadSize(int family_idx) { + if (families == WHOLE_ROW) { return 0; // No payload when deleting whole rows. } int size = 0; @@ -545,11 +687,15 @@ int payloadSize() { size += 2; // short:Length of the key. size += key.length; // The row key (again!). size += 1; // byte: Family length (again!). - size += family.length; // The column family (again!). + size += families[family_idx].length; // The column family (again!). size += 8; // long: The timestamp (again!). size += 1; // byte: The type of KeyValue. - size *= qualifiers.length; - for (final byte[] qualifier : qualifiers) { + final byte[][] family_qualifiers = + (qualifiers == null || qualifiers[family_idx] == null) + ? DELETE_FAMILY_MARKER + : qualifiers[family_idx]; + size *= family_qualifiers.length; + for (final byte[] qualifier : family_qualifiers) { size += qualifier.length; // The column qualifier. } return size; @@ -561,29 +707,37 @@ MutationProto toMutationProto() { .setRow(Bytes.wrap(key)) .setMutateType(MutationProto.MutationType.DELETE); - if (family != WHOLE_ROW) { + if (families != WHOLE_ROW) { final MutationProto.ColumnValue.Builder columns = // All columns ... - MutationProto.ColumnValue.newBuilder() - .setFamily(Bytes.wrap(family)); // ... for this family. - - final MutationProto.DeleteType type = - (qualifiers == DELETE_FAMILY_MARKER - ? MutationProto.DeleteType.DELETE_FAMILY - : (at_timestamp_only - ? MutationProto.DeleteType.DELETE_ONE_VERSION - : MutationProto.DeleteType.DELETE_MULTIPLE_VERSIONS)); - - // Now add all the qualifiers to delete. - for (int i = 0; i < qualifiers.length; i++) { - final MutationProto.ColumnValue.QualifierValue column = - MutationProto.ColumnValue.QualifierValue.newBuilder() - .setQualifier(Bytes.wrap(qualifiers[i])) - .setTimestamp(timestamp) - .setDeleteType(type) - .build(); - columns.addQualifierValue(column); + MutationProto.ColumnValue.newBuilder(); + for (int i = 0; i < families.length; i++) { + byte[] family = families[i]; + columns.clear(); + columns.setFamily(Bytes.wrap(family)); // ... for this family. + + if (qualifiers != null) { + final MutationProto.DeleteType type = + (qualifiers[i] == null + ? MutationProto.DeleteType.DELETE_FAMILY + : (at_timestamp_only + ? MutationProto.DeleteType.DELETE_ONE_VERSION + : MutationProto.DeleteType.DELETE_MULTIPLE_VERSIONS)); + + // Now add all the qualifiers to delete. + if (qualifiers[i] != null) { + for (int j = 0; j < qualifiers[i].length; j++) { + final MutationProto.ColumnValue.QualifierValue column = + MutationProto.ColumnValue.QualifierValue.newBuilder() + .setQualifier(Bytes.wrap(qualifiers[i][j])) + .setTimestamp(timestamp) + .setDeleteType(type) + .build(); + columns.addQualifierValue(column); + } + } + } + del.addColumnValue(columns); } - del.addColumnValue(columns); } if (!durable) { @@ -623,19 +777,31 @@ private ChannelBuffer serializeOld(final byte server_version) { buf.writeLong(lockid); // Lock ID. // Families. - if (family == WHOLE_ROW) { + if (families == WHOLE_ROW) { buf.writeInt(0); // Number of families that follow. return buf; } - buf.writeInt(1); // Number of families that follow. - // Each family is then written like so: - writeByteArray(buf, family); // Column family name. - buf.writeInt(qualifiers.length); // How many KeyValues for this family? - serializePayload(buf); + buf.writeInt(families.length); // Number of families that follow. + serializePayloads(buf); // All families + return buf; } + @Override + void serializePayloads(ChannelBuffer buf) { + for (int i = 0; i < families.length; i++) { + // Each family is then written like so: + writeByteArray(buf, families[i]); // Column family name. + final byte[][] family_qualifiers = + (qualifiers != null && qualifiers[i] != null) + ? qualifiers[i] + : DELETE_FAMILY_MARKER; + buf.writeInt(family_qualifiers.length); // How many KeyValues for this family? + serializePayload(buf, i); + } + } + @Override Object deserialize(final ChannelBuffer buf, int cell_size) { HBaseRpc.ensureNoCell(cell_size); diff --git a/src/GetRequest.java b/src/GetRequest.java index 240ebbc6..aeb0d88b 100644 --- a/src/GetRequest.java +++ b/src/GetRequest.java @@ -50,8 +50,8 @@ public final class GetRequest extends HBaseRpc private static final byte[] EXISTS = new byte[] { 'e', 'x', 'i', 's', 't', 's' }; - private byte[] family; // TODO(tsuna): Handle multiple families? - private byte[][] qualifiers; + private byte[][] families; + private byte[][][] qualifiers; private long lockid = RowLock.NO_LOCK; /** @@ -142,6 +142,24 @@ public GetRequest(final byte[] table, this.qualifier(qualifier); } + /** + * Constructor. + * These byte arrays will NOT be copied. + * @param table The non-empty name of the table to use. + * @param key The row key to get in that table. + * @param families The column families. + * @param qualifiers The column qualifiers. + * @since 1.5 + */ + public GetRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers) { + super(table, key); + this.families(families); + this.qualifiers(qualifiers); + } + /** * Constructor. * @param table The non-empty name of the table to use. @@ -211,7 +229,7 @@ private boolean isGetRequest() { */ public GetRequest family(final byte[] family) { KeyValue.checkFamily(family); - this.family = family; + this.families = new byte[][] { family }; return this; } @@ -220,6 +238,33 @@ public GetRequest family(final String family) { return family(family.getBytes()); } + /** + * Specifies the set of column families to get. + * @param families The column families. + * This byte array will NOT be copied. + * @return {@code this}, always. + */ + public GetRequest families(final byte[][] families) { + for (byte[] family : families) { + KeyValue.checkFamily(family); + } + this.families = families; + return this; + } + + /** Specifies the set of column families to get. */ + public GetRequest families(final String[] families) { + for (String family : families) { + KeyValue.checkFamily(family.getBytes()); + } + this.families = new byte[families.length][]; + int i = 0; + for (String family : families) { + this.families[i++] = family.getBytes(); + } + return this; + } + /** * Specifies a particular column qualifier to get. * @param qualifier The column qualifier. @@ -231,7 +276,7 @@ public GetRequest qualifier(final byte[] qualifier) { throw new NullPointerException("qualifier"); } KeyValue.checkQualifier(qualifier); - this.qualifiers = new byte[][] { qualifier }; + this.qualifiers = new byte[][][] { { qualifier } }; return this; } @@ -249,6 +294,27 @@ public GetRequest qualifiers(final byte[][] qualifiers) { for (final byte[] qualifier : qualifiers) { KeyValue.checkQualifier(qualifier); } + this.qualifiers = new byte[][][] { qualifiers }; + return this; + } + + /** + * Specifies a particular set of column qualifiers to get. + * @param qualifiers The column qualifiers. + * This byte array will NOT be copied. + * @return {@code this}, always. + * @since 1.1 + */ + public GetRequest qualifiers(final byte[][][] qualifiers) { + if (qualifiers == null) { + throw new NullPointerException("qualifiers"); + } + for (final byte[][] family_qualifiers : qualifiers) { + if (family_qualifiers == null) continue; + for (final byte[] qualifier : family_qualifiers) { + KeyValue.checkQualifier(qualifier); + } + } this.qualifiers = qualifiers; return this; } @@ -313,17 +379,27 @@ public byte[] key() { @Override public byte[] family() { - return family; + return families[0]; + } + + @Override + public byte[][] getFamilies() { + return families; } @Override public byte[][] qualifiers() { + return qualifiers[0]; + } + + @Override + public byte[][][] getQualifiers() { return qualifiers; } public String toString() { final String klass = isGetRequest() ? "GetRequest" : "Exists"; - return super.toStringWithQualifiers(klass, family, qualifiers); + return super.toStringWithQualifiers(klass, families, qualifiers); } // ---------------------- // @@ -359,16 +435,20 @@ private int predictSerializedSize(final byte server_version) { size += 8; // long: Maximum timestamp. size += 1; // byte: Boolean: "all time". size += 4; // int: Number of families. - if (family != null) { - size += 1; // vint: Family length (guaranteed on 1 byte). - size += family.length; // The family. - size += 1; // byte: Boolean: do we want specific qualifiers? - if (qualifiers != null) { - size += 4; // int: How many qualifiers follow? - for (final byte[] qualifier : qualifiers) { - size += 3; // vint: Qualifier length. - size += qualifier.length; // The qualifier. + if (families != null) { + int family_idx = 0; + for (byte family[] : families) { + size += 1; // vint: Family length (guaranteed on 1 byte). + size += family.length; // The family. + size += 1; // byte: Boolean: do we want specific qualifiers? + if (qualifiers != null && qualifiers[family_idx] != null) { + size += 4; // int: How many qualifiers follow? + for (final byte[] qualifier : qualifiers[family_idx]) { + size += 3; // vint: Qualifier length. + size += qualifier.length; // The qualifier. + } } + ++family_idx; } } if (server_version >= RegionClient.SERVER_VERSION_092_OR_ABOVE) { @@ -385,15 +465,18 @@ ChannelBuffer serialize(final byte server_version) { final ClientPB.Get.Builder getpb = ClientPB.Get.newBuilder() .setRow(Bytes.wrap(key)); - if (family != null) { - final ClientPB.Column.Builder column = ClientPB.Column.newBuilder(); - column.setFamily(Bytes.wrap(family)); - if (qualifiers != null) { - for (final byte[] qualifier : qualifiers) { - column.addQualifier(Bytes.wrap(qualifier)); - } + if (families != null) { + for (int family_idx = 0; family_idx < families.length; ++family_idx) { + byte[] family = families[family_idx]; + final ClientPB.Column.Builder column = ClientPB.Column.newBuilder(); + column.setFamily(Bytes.wrap(family)); + if (qualifiers != null && qualifiers[family_idx] != null) { + for (final byte[] qualifier : qualifiers[family_idx]) { + column.addQualifier(Bytes.wrap(qualifier)); + } + } + getpb.addColumn(column.build()); } - getpb.addColumn(column.build()); } // TODO: Filters. @@ -446,19 +529,23 @@ private ChannelBuffer serializeOld(final byte server_version) { // all possible times. Not sure why it's part of the serialized RPC... // Families. - buf.writeInt(family != null ? 1 : 0); // Number of families that follow. - - if (family != null) { - // Each family is then written like so: - writeByteArray(buf, family); // Column family name. - if (qualifiers != null) { - buf.writeByte(0x01); // Boolean: We want specific qualifiers. - buf.writeInt(qualifiers.length); // How many qualifiers do we want? - for (final byte[] qualifier : qualifiers) { - writeByteArray(buf, qualifier); // Column qualifier name. + buf.writeInt(families != null ? families.length : 0); // Number of families that follow. + + if (families != null) { + for (int family_idx = 0; family_idx < families.length; ++family_idx) { + byte[] family = families[family_idx]; + + // Each family is then written like so: + writeByteArray(buf, family); // Column family name. + if (qualifiers != null && qualifiers[family_idx] != null) { + buf.writeByte(0x01); // Boolean: We want specific qualifiers. + buf.writeInt(qualifiers[family_idx].length); // How many qualifiers do we want? + for (final byte[] qualifier : qualifiers[family_idx]) { + writeByteArray(buf, qualifier); // Column qualifier name. + } + } else { + buf.writeByte(0x00); // Boolean: we don't want specific qualifiers. } - } else { - buf.writeByte(0x00); // Boolean: we don't want specific qualifiers. } } if (server_version >= RegionClient.SERVER_VERSION_092_OR_ABOVE) { diff --git a/src/HBaseRpc.java b/src/HBaseRpc.java index 5d8ab93b..e1d5ed46 100644 --- a/src/HBaseRpc.java +++ b/src/HBaseRpc.java @@ -27,6 +27,7 @@ package org.hbase.async; import java.io.IOException; +import java.util.Arrays; import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.CodedOutputStream; @@ -88,11 +89,18 @@ public interface HasKey { */ public interface HasFamily { /** - * Returns the family this RPC is for. + * Returns the first family of this RPC. *

* DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. */ public byte[] family(); + /** + * Returns all families of this RPC. + *

+ * DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. + * @since 1.5 + */ + public byte[][] getFamilies(); } /** @@ -114,11 +122,18 @@ public interface HasQualifier { */ public interface HasQualifiers { /** - * Returns the column qualifiers this RPC is for. + * Returns the column qualifiers for the first column family of this RPC. *

* DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. */ public byte[][] qualifiers(); + /** + * Returns the column qualifiers all column families of this RPC. + *

+ * DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. + * @since 1.5 + */ + public byte[][][] getQualifiers(); } /** @@ -145,6 +160,12 @@ public interface HasValues { * DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. */ public byte[][] values(); + /** + * Returns the values all column families of this RPC. + *

+ * DO NOT MODIFY THE CONTENTS OF THE ARRAY RETURNED. + */ + public byte[][][] getValues(); } /** @@ -568,19 +589,70 @@ final String toStringWithQualifiers(final String classname, final byte[][] qualifiers, final byte[][] values, final String fields) { + return toStringWithQualifiers(classname, new byte[][] { family }, + new byte[][][] { qualifiers }, new byte[][][] { values }, fields); + } + + /** + * Helper for subclass's {@link #toString} implementations. + *

+ * This is used by subclasses such as {@link DeleteRequest} + * or {@link GetRequest}, to avoid code duplication. + * @param classname The name of the class of the caller. + * @param families A non-empty list of families or null. + * @param qualifiers A non-empty list of qualifiers or null. + */ + final String toStringWithQualifiers(final String classname, + final byte[][] families, + final byte[][][] qualifiers) { + return toStringWithQualifiers(classname, families, qualifiers, null, ""); + } + + /** + * Helper for subclass's {@link #toString} implementations. + *

+ * This is used by subclasses such as {@link DeleteRequest} + * or {@link GetRequest}, to avoid code duplication. + * @param classname The name of the class of the caller. + * @param families A non-empty list of families or null. + * @param qualifiers A non-empty list of qualifiers or null. + * @param values A non-empty list of values or null. + * @param fields Additional fields to include in the output. + */ + final String toStringWithQualifiers(final String classname, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final String fields) { final StringBuilder buf = new StringBuilder(256 // min=182 + fields.length()); buf.append(classname).append("(table="); Bytes.pretty(buf, table); buf.append(", key="); Bytes.pretty(buf, key); - buf.append(", family="); - Bytes.pretty(buf, family); - buf.append(", qualifiers="); - Bytes.pretty(buf, qualifiers); - if (values != null) { - buf.append(", values="); - Bytes.pretty(buf, values); + buf.append(", families="); + if (families == null + || families.length == 0 + || families == DeleteRequest.WHOLE_ROW) { + buf.append("null"); + } else { + buf.append("["); + for (int family_idx = 0; family_idx < families.length; ++family_idx) { + buf.append("{name="); + byte[] family = families[family_idx]; + Bytes.pretty(buf, family); + if (qualifiers != null && qualifiers[family_idx] != null) { + buf.append(", qualifiers="); + Bytes.pretty(buf, qualifiers[family_idx]); + } + if (values != null && values[family_idx] != null) { + buf.append(", values="); + Bytes.pretty(buf, values[family_idx]); + } + buf.append("}, "); + } + buf.setLength(buf.length()-2); + buf.append("]"); } buf.append(fields); buf.append(", attempt=").append(attempt) diff --git a/src/MultiAction.java b/src/MultiAction.java index 6dae3d0d..6b4c46c4 100644 --- a/src/MultiAction.java +++ b/src/MultiAction.java @@ -151,13 +151,15 @@ private int predictSerializedSize(final byte server_version) { final byte[] region_name = rpc.getRegion().name(); final boolean new_region = !Bytes.equals(prev.getRegion().name(), region_name); - final byte[] family = rpc.family(); + final byte[][] families = rpc.getFamilies(); final boolean new_key = (new_region || prev.code() != rpc.code() || !Bytes.equals(prev.key, rpc.key) - || family == DeleteRequest.WHOLE_ROW); + // do not coalesce RPCs with multi CFs + || families.length > 1 || prev.getFamilies().length > 1 + || families == DeleteRequest.WHOLE_ROW); final boolean new_family = new_key || !Bytes.equals(prev.family(), - family); + families[0]); if (new_region) { size += 3; // vint: region name length (3 bytes => max length = 32768). @@ -190,8 +192,13 @@ private int predictSerializedSize(final byte server_version) { } if (new_family) { + if (families.length > 1) { + size += rpc.payloadsSize(); + prev = rpc; + continue; + } size += 1; // vint: Family length (guaranteed on 1 byte). - size += family.length; // The family. + size += families[0].length; // The family. size += 4; // int: Number of KeyValues that follow. if (rpc.code() == PutRequest.CODE) { size += 4; // int: Total number of bytes for all those KeyValues. @@ -281,13 +288,15 @@ private ChannelBuffer serializeOld(final byte server_version) { final byte[] region_name = rpc.getRegion().name(); final boolean new_region = !Bytes.equals(prev.getRegion().name(), region_name); - final byte[] family = rpc.family(); + final byte[][] families = rpc.getFamilies(); final boolean new_key = (new_region || prev.code() != rpc.code() || !Bytes.equals(prev.key, rpc.key) - || family == DeleteRequest.WHOLE_ROW); + // do not coalesce RPCs with multi CFs + || families.length > 1 || prev.getFamilies().length > 1 + || families == DeleteRequest.WHOLE_ROW); final boolean new_family = new_key || !Bytes.equals(prev.family(), - family); + families[0]); if (new_key && use_multi && nkeys_index > 0) { buf.writeInt(0); // Number of "attributes" for the last key (none). @@ -374,13 +383,20 @@ private ChannelBuffer serializeOld(final byte server_version) { nbytes_per_family = 0; } - if (family == DeleteRequest.WHOLE_ROW) { + if (families == DeleteRequest.WHOLE_ROW) { prev = rpc; // Short circuit. We have no KeyValue to write. continue; // So loop again directly. + } else if (families.length > 1) { + // do not coalesces RPCs with multiple families + nfamilies = families.length; + rpc.serializePayloads(buf); + nrpcs_per_key++; + prev = rpc; + continue; } nfamilies++; - writeByteArray(buf, family); // The column family. + writeByteArray(buf, families[0]); // The column family. nkeys_per_family_index = buf.writerIndex(); // Number of "KeyValues" that follow. @@ -408,7 +424,7 @@ private ChannelBuffer serializeOld(final byte server_version) { // Note: the only case where nkeys_per_family_index remained -1 throughout // this whole ordeal is where we didn't have any KV to serialize because - // every RPC was a `DeleteRequest.WHOLE_ROW'. + // every RPC was a `DeleteRequest.WHOLE_ROW' or a multi-CF RPC. if (nkeys_per_family_index > 0) { // Monkey-patch everything for the last set of edits. buf.setInt(nkeys_per_family_index, nkeys_per_family); @@ -495,10 +511,14 @@ public int compare(final BatchableRpc a, final BatchableRpc b) { return d; } else if ((d = Bytes.memcmp(a.key, b.key)) != 0) { return d; + } else if (a.getFamilies().length == 1 && b.getFamilies().length == 1) { + // within a row, group all actions with single CF + // in the front so that they can possibly coalesce + return Bytes.memcmp(a.family(), b.family()); } - return Bytes.memcmp(a.family(), b.family()); - } + return (a.getFamilies().length - b.getFamilies().length); + } } /** diff --git a/src/PutRequest.java b/src/PutRequest.java index 186340a8..e451b338 100644 --- a/src/PutRequest.java +++ b/src/PutRequest.java @@ -89,8 +89,8 @@ public final class PutRequest extends BatchableRpc * - qualifiers.length == values.length * - qualifiers.length > 0 */ - private final byte[][] qualifiers; - private final byte[][] values; + private final byte[][][] qualifiers; + private final byte[][][] values; /** * Constructor using current time. @@ -139,8 +139,35 @@ public PutRequest(final byte[] table, final byte[] family, final byte[][] qualifiers, final byte[][] values) { - this(table, key, family, qualifiers, values, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + new byte[][][] { values }, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor for multiple columns in multiple families using current time. + * These byte arrays will NOT be copied. + *

+ * Note: If you want to set your own timestamp, use + * {@link #PutRequest(byte[], byte[], byte[], byte[][], byte[][], long)} + * instead. This constructor will let the RegionServer assign the timestamp + * to this write at the time using {@link System#currentTimeMillis} right + * before the write is persisted to the WAL. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values) { + this(table, key, families , qualifiers, values, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -182,7 +209,8 @@ public PutRequest(final byte[] table, final byte[][] qualifiers, final byte[][] values, final long timestamp) { - this(table, key, family, qualifiers, values, timestamp, RowLock.NO_LOCK); + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + new byte[][][] { values }, timestamp, RowLock.NO_LOCK); } /** @@ -254,7 +282,32 @@ public PutRequest(final byte[] table, final byte[][] values, final long timestamp, final RowLock lock) { - this(table, key, family, qualifiers, values, timestamp, lock.id()); + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + new byte[][][] { values }, timestamp, lock.id()); + } + + /** + * Constructor for multiple columns with current time and explicit row lock. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column families to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamp The timestamp to set on this edit. + * @param lock An explicit row lock to use with this request. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final RowLock lock) { + this(table, key, families, qualifiers, values, + KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -335,9 +388,8 @@ public PutRequest(final byte[] table, private PutRequest(final byte[] table, final KeyValue kv, final long lockid) { - super(table, kv.key(), kv.family(), kv.timestamp(), lockid); - this.qualifiers = new byte[][] { kv.qualifier() }; - this.values = new byte[][] { kv.value() }; + this(table, kv.key(), kv.family(), + kv.qualifier(), kv.value(), kv.timestamp(), lockid); } /** Private constructor. */ @@ -348,34 +400,60 @@ private PutRequest(final byte[] table, final byte[] value, final long timestamp, final long lockid) { - this(table, key, family, new byte[][] { qualifier }, new byte[][] { value }, - timestamp, lockid); + this(table, key, new byte[][] { family }, new byte[][][] { { qualifier } }, + new byte[][][] { { value } }, timestamp, lockid); } /** Private constructor. */ private PutRequest(final byte[] table, final byte[] key, - final byte[] family, - final byte[][] qualifiers, - final byte[][] values, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, final long timestamp, final long lockid) { - super(table, key, family, timestamp, lockid); - KeyValue.checkFamily(family); - if (qualifiers.length != values.length) { - throw new IllegalArgumentException("Have " + qualifiers.length - + " qualifiers and " + values.length + " values. Should be equal."); - } else if (qualifiers.length == 0) { - throw new IllegalArgumentException("Need at least one qualifier/value."); - } - for (int i = 0; i < qualifiers.length; i++) { - KeyValue.checkQualifier(qualifiers[i]); - KeyValue.checkValue(values[i]); - } + super(table, key, families, timestamp, lockid); + checkParams(families, qualifiers, values); this.qualifiers = qualifiers; this.values = values; } + private void checkParams(final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values) { + if (families.length != qualifiers.length) { + throw new IllegalArgumentException(String.format( + "Mismatch in number of families(%d) and qualifiers(%d) array size.", + families.length, qualifiers.length)); + } else if (families.length != values.length) { + throw new IllegalArgumentException(String.format( + "Mismatch in number of families(%d) and values(%d) array size.", + families.length, values.length)); + } + + for (int idx = 0; idx < families.length; idx++) { + KeyValue.checkFamily(families[idx]); + if (qualifiers[idx] == null || qualifiers[idx].length == 0) { + throw new IllegalArgumentException( + "No qualifiers are specifed for family " + + families[idx] + " at index " + idx); + } else if (values[idx] == null || values[idx].length == 0) { + throw new IllegalArgumentException( + "No values are specifed for family " + + families[idx] + " at index " + idx); + } else if (qualifiers[idx].length != values[idx].length) { + throw new IllegalArgumentException("Found " + + qualifiers[idx].length + " qualifiers and " + + values[idx].length + " values for family " + + families[idx] + " at index " + idx + ". Should be equal."); + } + for (int i = 0; i < qualifiers[idx].length; i++) { + KeyValue.checkQualifier(qualifiers[idx][i]); + KeyValue.checkValue(values[idx][i]); + } + } + } + @Override byte[] method(final byte server_version) { if (server_version >= RegionClient.SERVER_VERSION_095_OR_ABOVE) { @@ -400,15 +478,25 @@ public byte[] key() { */ @Override public byte[] qualifier() { - return qualifiers[0]; + return qualifiers[0][0]; } /** - * {@inheritDoc} + * Returns the qualifiers of first column family in the set of + * edits in this RPC. * @since 1.3 */ @Override public byte[][] qualifiers() { + return qualifiers[0]; + } + + /** + * {@inheritDoc} + * @since 1.5 + */ + @Override + public byte[][][] getQualifiers() { return qualifiers; } @@ -418,7 +506,7 @@ public byte[][] qualifiers() { */ @Override public byte[] value() { - return values[0]; + return values[0][0]; } /** @@ -427,12 +515,21 @@ public byte[] value() { */ @Override public byte[][] values() { + return values[0]; + } + + /** + * {@inheritDoc} + * @since 1.5 + */ + @Override + public byte[][][] getValues() { return values; } public String toString() { return super.toStringWithQualifiers("PutRequest", - family, qualifiers, values, + families, qualifiers, values, ", timestamp=" + timestamp + ", lockid=" + lockid + ", durable=" + durable @@ -462,23 +559,32 @@ byte code() { @Override int numKeyValues() { - return qualifiers.length; + return qualifiers[0].length; } @Override int payloadSize() { + return payloadSize(0); + } + + int payloadSize(int idx) { int size = 0; - for (int i = 0; i < qualifiers.length; i++) { - size += KeyValue.predictSerializedSize(key, family, qualifiers[i], values[i]); + for (int i = 0; i < qualifiers[idx].length; i++) { + size += KeyValue.predictSerializedSize( + key, families[idx], qualifiers[idx][i], values[idx][i]); } return size; } @Override void serializePayload(final ChannelBuffer buf) { - for (int i = 0; i < qualifiers.length; i++) { - KeyValue.serialize(buf, KeyValue.PUT, timestamp, key, family, - qualifiers[i], values[i]); + serializePayload(buf, 0); + } + + private void serializePayload(final ChannelBuffer buf, int idx) { + for (int i = 0; i < qualifiers[idx].length; i++) { + KeyValue.serialize(buf, KeyValue.PUT, timestamp, + key, families[idx], qualifiers[idx][i], values[idx][i]); } } @@ -514,40 +620,52 @@ int predictPutSize() { size += 1; // bool: Whether or not to write to the WAL. size += 4; // int: Number of families for which we have edits. - size += 1; // vint: Family length (guaranteed on 1 byte). - size += family.length; // The family. - size += 4; // int: Number of KeyValues that follow. - size += 4; // int: Total number of bytes for all those KeyValues. - - size += payloadSize(); + size += payloadsSize(); return size; } @Override - MutationProto toMutationProto() { - final MutationProto.ColumnValue.Builder columns = // All columns ... - MutationProto.ColumnValue.newBuilder() - .setFamily(Bytes.wrap(family)); // ... for this family. - - // Now add all the qualifier-value pairs. - for (int i = 0; i < qualifiers.length; i++) { - final MutationProto.ColumnValue.QualifierValue column = - MutationProto.ColumnValue.QualifierValue.newBuilder() - .setQualifier(Bytes.wrap(qualifiers[i])) - .setValue(Bytes.wrap(values[i])) - .setTimestamp(timestamp) - .build(); - columns.addQualifierValue(column); + int payloadsSize() { + int size = 0; + for (int i = 0; i < families.length; i++) { + size += 1; // vint: Family length (guaranteed on 1 byte). + size += families[i].length; // The family. + size += 4; // int: Number of KeyValues that follow. + size += 4; // int: Total number of bytes for all those KeyValues. + size += payloadSize(i); } + return size; + } + @Override + MutationProto toMutationProto() { final MutationProto.Builder put = MutationProto.newBuilder() - .setRow(Bytes.wrap(key)) - .setMutateType(MutationProto.MutationType.PUT) - .addColumnValue(columns); + .setRow(Bytes.wrap(key)) + .setMutateType(MutationProto.MutationType.PUT); if (!durable) { put.setDurability(MutationProto.Durability.SKIP_WAL); } + + final MutationProto.ColumnValue.Builder columns = + MutationProto.ColumnValue.newBuilder(); + for (int family_idx = 0; family_idx < families.length; family_idx++) { + columns.clear(); + columns.setFamily(Bytes.wrap(families[family_idx])); // ... for this family. + + // Now add all the qualifier-value pairs. + for (int i = 0; i < qualifiers[family_idx].length; i++) { + final MutationProto.ColumnValue.QualifierValue column = + MutationProto.ColumnValue.QualifierValue.newBuilder() + .setQualifier(Bytes.wrap(qualifiers[family_idx][i])) + .setValue(Bytes.wrap(values[family_idx][i])) + .setTimestamp(timestamp) + .build(); + columns.addQualifierValue(column); + } + put.addColumnValue(columns); + } + return put.build(); } @@ -599,12 +717,19 @@ void serializeInto(final ChannelBuffer buf) { buf.writeLong(lockid); // Lock ID. buf.writeByte(durable ? 0x01 : 0x00); // Whether or not to use the WAL. - buf.writeInt(1); // Number of families that follow. - writeByteArray(buf, family); // The column family. + buf.writeInt(families.length); // Number of families that follow. + serializePayloads(buf); + } - buf.writeInt(qualifiers.length); // Number of "KeyValues" that follow. - buf.writeInt(payloadSize()); // Size of the KV that follows. - serializePayload(buf); + @Override + void serializePayloads(ChannelBuffer buf) { + for (int i = 0; i < families.length; i++) { + writeByteArray(buf, families[i]); // The column family. + + buf.writeInt(qualifiers[i].length); // Number of "KeyValues" that follow. + buf.writeInt(payloadSize(i)); // Size of the KV that follows. + serializePayload(buf, i); + } } } From bc2951924212ee8203861bf0162ff8c368aa548d Mon Sep 17 00:00:00 2001 From: Aditya Kishore Date: Fri, 17 Jan 2014 13:48:58 -0800 Subject: [PATCH 2/4] Add support for independent timestamps for each cell in a Put request. --- src/HBaseRpc.java | 25 ++++++++ src/PutRequest.java | 140 ++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 154 insertions(+), 11 deletions(-) diff --git a/src/HBaseRpc.java b/src/HBaseRpc.java index e1d5ed46..e9bfa1e5 100644 --- a/src/HBaseRpc.java +++ b/src/HBaseRpc.java @@ -619,10 +619,31 @@ final String toStringWithQualifiers(final String classname, * @param values A non-empty list of values or null. * @param fields Additional fields to include in the output. */ + final String toStringWithQualifiers(final String classname, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final String fields) { + return toStringWithQualifiers(classname, families, qualifiers, null, null, ""); + } + + /** + * Helper for subclass's {@link #toString} implementations. + *

+ * This is used by subclasses such as {@link DeleteRequest} + * or {@link GetRequest}, to avoid code duplication. + * @param classname The name of the class of the caller. + * @param families A non-empty list of families or null. + * @param qualifiers A non-empty list of qualifiers or null. + * @param values A non-empty list of values or null. + * @param timestamps A non-empty list of timestamps or null. + * @param fields Additional fields to include in the output. + */ final String toStringWithQualifiers(final String classname, final byte[][] families, final byte[][][] qualifiers, final byte[][][] values, + final long[][] timestamps, final String fields) { final StringBuilder buf = new StringBuilder(256 // min=182 + fields.length()); @@ -649,6 +670,10 @@ final String toStringWithQualifiers(final String classname, buf.append(", values="); Bytes.pretty(buf, values[family_idx]); } + if (timestamps != null && timestamps[family_idx] != null) { + buf.append(", timestamps="); + buf.append(Arrays.toString(timestamps[family_idx])); + } buf.append("}, "); } buf.setLength(buf.length()-2); diff --git a/src/PutRequest.java b/src/PutRequest.java index e451b338..ae0b97e5 100644 --- a/src/PutRequest.java +++ b/src/PutRequest.java @@ -91,6 +91,7 @@ public final class PutRequest extends BatchableRpc */ private final byte[][][] qualifiers; private final byte[][][] values; + private final long[][] timestamps; /** * Constructor using current time. @@ -140,7 +141,37 @@ public PutRequest(final byte[] table, final byte[][] qualifiers, final byte[][] values) { this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, - new byte[][][] { values }, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + new byte[][][] { values }, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor for multiple columns using current time. + * These byte arrays will NOT be copied. + *

+ * Note: If you want to set your own timestamp, use + * {@link #PutRequest(byte[], byte[], byte[], byte[][], byte[][], long)} + * instead. This constructor will let the RegionServer assign the timestamp + * to this write at the time using {@link System#currentTimeMillis} right + * before the write is persisted to the WAL. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param family The column family to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamps The corresponding timestamps to store. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[] family, + final byte[][] qualifiers, + final byte[][] values, + final long[] timestamps) { + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + new byte[][][] { values }, new long[][] { timestamps }, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -166,7 +197,7 @@ public PutRequest(final byte[] table, final byte[][] families, final byte[][][] qualifiers, final byte[][][] values) { - this(table, key, families , qualifiers, values, + this(table, key, families , qualifiers, values, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } @@ -210,7 +241,53 @@ public PutRequest(final byte[] table, final byte[][] values, final long timestamp) { this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, - new byte[][][] { values }, timestamp, RowLock.NO_LOCK); + new byte[][][] { values }, null, timestamp, RowLock.NO_LOCK); + } + + /** + * Constructor for multiple columns with a specific timestamp. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamp The timestamp to set on this edit. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long timestamp) { + this(table, key, families, qualifiers, values, null, + timestamp, RowLock.NO_LOCK); + } + + /** + * Constructor for multiple columns with a specific timestamp. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamps The corresponding timestamps to store. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long[][] timestamps) { + this(table, key, families, qualifiers, values, timestamps, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -283,7 +360,7 @@ public PutRequest(final byte[] table, final long timestamp, final RowLock lock) { this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, - new byte[][][] { values }, timestamp, lock.id()); + new byte[][][] { values }, null, timestamp, lock.id()); } /** @@ -305,8 +382,34 @@ public PutRequest(final byte[] table, final byte[][] families, final byte[][][] qualifiers, final byte[][][] values, + final long timestamp, final RowLock lock) { - this(table, key, families, qualifiers, values, + this(table, key, families, qualifiers, values, null, + timestamp, lock.id()); + } + + /** + * Constructor for multiple columns with current time and explicit row lock. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to edit in that family. + * @param values The corresponding values to store. + * @param timestamps The corresponding timestamps to store. + * @param lock An explicit row lock to use with this request. + * @throws IllegalArgumentException if {@code qualifiers.length == 0} + * or if {@code qualifiers.length != values.length} + * @since 1.3 + */ + public PutRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final byte[][][] values, + final long[][] timestamps, + final RowLock lock) { + this(table, key, families, qualifiers, values, timestamps, KeyValue.TIMESTAMP_NOW, lock.id()); } @@ -401,7 +504,7 @@ private PutRequest(final byte[] table, final long timestamp, final long lockid) { this(table, key, new byte[][] { family }, new byte[][][] { { qualifier } }, - new byte[][][] { { value } }, timestamp, lockid); + new byte[][][] { { value } }, null, timestamp, lockid); } /** Private constructor. */ @@ -410,17 +513,20 @@ private PutRequest(final byte[] table, final byte[][] families, final byte[][][] qualifiers, final byte[][][] values, + final long[][] timestamps, final long timestamp, final long lockid) { super(table, key, families, timestamp, lockid); - checkParams(families, qualifiers, values); + checkParams(families, qualifiers, values, timestamps); this.qualifiers = qualifiers; this.values = values; + this.timestamps = timestamps; } private void checkParams(final byte[][] families, final byte[][][] qualifiers, - final byte[][][] values) { + final byte[][][] values, + final long[][] timestamps) { if (families.length != qualifiers.length) { throw new IllegalArgumentException(String.format( "Mismatch in number of families(%d) and qualifiers(%d) array size.", @@ -429,6 +535,10 @@ private void checkParams(final byte[][] families, throw new IllegalArgumentException(String.format( "Mismatch in number of families(%d) and values(%d) array size.", families.length, values.length)); + } else if (timestamps != null && families.length != timestamps.length) { + throw new IllegalArgumentException(String.format( + "Mismatch in number of families(%d) and timestamps(%d) array size.", + families.length, timestamps.length)); } for (int idx = 0; idx < families.length; idx++) { @@ -446,6 +556,13 @@ private void checkParams(final byte[][] families, + qualifiers[idx].length + " qualifiers and " + values[idx].length + " values for family " + families[idx] + " at index " + idx + ". Should be equal."); + } else if (timestamps != null) { // check timestamps if specified + if (qualifiers[idx].length != timestamps[idx].length) { + throw new IllegalArgumentException("Found " + + qualifiers[idx].length + " qualifiers and " + + timestamps[idx].length + " timestamps for family " + + families[idx] + " at index " + idx + ". Should be equal."); + } } for (int i = 0; i < qualifiers[idx].length; i++) { KeyValue.checkQualifier(qualifiers[idx][i]); @@ -529,7 +646,7 @@ public byte[][][] getValues() { public String toString() { return super.toStringWithQualifiers("PutRequest", - families, qualifiers, values, + families, qualifiers, values, timestamps, ", timestamp=" + timestamp + ", lockid=" + lockid + ", durable=" + durable @@ -583,7 +700,8 @@ void serializePayload(final ChannelBuffer buf) { private void serializePayload(final ChannelBuffer buf, int idx) { for (int i = 0; i < qualifiers[idx].length; i++) { - KeyValue.serialize(buf, KeyValue.PUT, timestamp, + KeyValue.serialize(buf, KeyValue.PUT, + timestamps == null ? timestamp : timestamps[idx][i], key, families[idx], qualifiers[idx][i], values[idx][i]); } } @@ -659,7 +777,7 @@ MutationProto toMutationProto() { MutationProto.ColumnValue.QualifierValue.newBuilder() .setQualifier(Bytes.wrap(qualifiers[family_idx][i])) .setValue(Bytes.wrap(values[family_idx][i])) - .setTimestamp(timestamp) + .setTimestamp(timestamps == null ? timestamp : timestamps[family_idx][i]) .build(); columns.addQualifierValue(column); } From 6389e7f14a133d61fe3b790bf42000ccea363822 Mon Sep 17 00:00:00 2001 From: Aditya Kishore Date: Mon, 3 Feb 2014 11:50:07 -0800 Subject: [PATCH 3/4] Add support for independent timestamps for each cell in a Delete request. --- src/DeleteRequest.java | 186 ++++++++++++++++++++++++++++++++--------- 1 file changed, 145 insertions(+), 41 deletions(-) diff --git a/src/DeleteRequest.java b/src/DeleteRequest.java index fd8f85b2..488f5ef8 100644 --- a/src/DeleteRequest.java +++ b/src/DeleteRequest.java @@ -68,6 +68,7 @@ public final class DeleteRequest extends BatchableRpc new byte[][] { HBaseClient.EMPTY_ARRAY }; private final byte[][][] qualifiers; + private final long[][] timestamps; /** Whether to delete the value only at the specified timestamp. */ private boolean at_timestamp_only = false; @@ -80,7 +81,8 @@ public final class DeleteRequest extends BatchableRpc * @throws IllegalArgumentException if any argument is malformed. */ public DeleteRequest(final byte[] table, final byte[] key) { - this(table, key, null, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, null, null, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -94,7 +96,7 @@ public DeleteRequest(final byte[] table, final byte[] key) { */ public DeleteRequest(final byte[] table, final byte[] key, final long timestamp) { - this(table, key, null, null, timestamp, RowLock.NO_LOCK); + this(table, key, null, null, null, timestamp, RowLock.NO_LOCK); } /** @@ -110,7 +112,7 @@ public DeleteRequest(final byte[] table, final byte[] key, final byte[] family) { this(table, key, new byte[][] { family }, null, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -125,7 +127,8 @@ public DeleteRequest(final byte[] table, public DeleteRequest(final byte[] table, final byte[] key, final byte[][] families) { - this(table, key, families, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, families, null, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -143,7 +146,7 @@ public DeleteRequest(final byte[] table, final byte[] family, final long timestamp) { this(table, key, new byte[][] { family }, null, - timestamp, RowLock.NO_LOCK); + null, timestamp, RowLock.NO_LOCK); } /** @@ -162,7 +165,7 @@ public DeleteRequest(final byte[] table, final byte[] qualifier) { this(table, key, new byte[][] { family }, qualifier == null ? null : new byte[][][] { { qualifier } }, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -184,7 +187,7 @@ public DeleteRequest(final byte[] table, final long timestamp) { this(table, key, new byte[][] { family }, qualifier == null ? null : new byte[][][] { { qualifier } }, - timestamp, RowLock.NO_LOCK); + null, timestamp, RowLock.NO_LOCK); } /** @@ -202,7 +205,28 @@ public DeleteRequest(final byte[] table, final byte[] family, final byte[][] qualifiers) { this(table, key, new byte[][] { family }, - new byte[][][] { qualifiers }, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + new byte[][][] { qualifiers }, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param family The column family to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[] family, + final byte[][] qualifiers, + final long[] timestamps) { + this(table, key, new byte[][] { family }, + new byte[][][] { qualifiers }, new long[][] {timestamps}, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -220,7 +244,52 @@ public DeleteRequest(final byte[] table, final byte[] key, final byte[][] families, final byte[][][] qualifiers) { - this(table, key, families, qualifiers, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + this(table, key, families, qualifiers, null, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells of a set of column + * families in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @param timestamps The corresponding timestamps to use in delete. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final long[][] timestamps) { + this(table, key, families, qualifiers, timestamps, + KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + } + + /** + * Constructor to delete a specific number of cells of a set of column + * families in a row. + * These byte arrays will NOT be copied. + * @param table The table to edit. + * @param key The key of the row to edit in that table. + * @param families The column family to edit in that table. + * @param qualifiers The column qualifiers to delete in that family. + * @param timestamps The corresponding timestamps to use in delete. + * @param timestamp The row timestamp to set on this edit. + * @throws IllegalArgumentException if any argument is malformed. + * @since 1.1 + */ + public DeleteRequest(final byte[] table, + final byte[] key, + final byte[][] families, + final byte[][][] qualifiers, + final long[][] timestamps, + final long timestamp) { + this(table, key, families, qualifiers, timestamps, + timestamp, RowLock.NO_LOCK); } /** @@ -240,7 +309,8 @@ public DeleteRequest(final byte[] table, final byte[][] qualifiers, final long timestamp) { this(table, key, new byte[][] { family }, - new byte[][][] { qualifiers }, timestamp, RowLock.NO_LOCK); + new byte[][][] { qualifiers }, null, + timestamp, RowLock.NO_LOCK); } /** @@ -259,7 +329,8 @@ public DeleteRequest(final byte[] table, final byte[][] families, final byte[][][] qualifiers, final long timestamp) { - this(table, key, families, qualifiers, timestamp, RowLock.NO_LOCK); + this(table, key, families, qualifiers, + null, timestamp, RowLock.NO_LOCK); } /** @@ -279,7 +350,7 @@ public DeleteRequest(final byte[] table, final RowLock lock) { this(table, key, new byte[][] { family }, qualifier == null ? null : new byte[][][] { { qualifier } }, - KeyValue.TIMESTAMP_NOW, lock.id()); + null, KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -302,7 +373,7 @@ public DeleteRequest(final byte[] table, final RowLock lock) { this(table, key, new byte[][] { family }, qualifier == null ? null : new byte[][][] { { qualifier } }, - timestamp, lock.id()); + null, timestamp, lock.id()); } /** @@ -323,7 +394,8 @@ public DeleteRequest(final byte[] table, final byte[][] qualifiers, final RowLock lock) { this(table, key, new byte[][] { family }, - new byte[][][] { qualifiers }, KeyValue.TIMESTAMP_NOW, lock.id()); + new byte[][][] { qualifiers }, null, + KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -343,7 +415,8 @@ public DeleteRequest(final byte[] table, final byte[][] families, final byte[][][] qualifiers, final RowLock lock) { - this(table, key, families, qualifiers, KeyValue.TIMESTAMP_NOW, lock.id()); + this(table, key, families, qualifiers, null, + KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -365,8 +438,8 @@ public DeleteRequest(final byte[] table, final byte[][] qualifiers, final long timestamp, final RowLock lock) { - this(table, key, new byte[][] { family }, - new byte[][][] { qualifiers }, timestamp, lock.id()); + this(table, key, new byte[][] { family }, new byte[][][] { qualifiers }, + null, timestamp, lock.id()); } /** @@ -388,7 +461,7 @@ public DeleteRequest(final byte[] table, final byte[][][] qualifiers, final long timestamp, final RowLock lock) { - this(table, key, families, qualifiers, timestamp, lock.id()); + this(table, key, families, qualifiers, null, timestamp, lock.id()); } /** @@ -399,7 +472,7 @@ public DeleteRequest(final byte[] table, */ public DeleteRequest(final String table, final String key) { this(table.getBytes(), key.getBytes(), null, null, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -414,7 +487,7 @@ public DeleteRequest(final String table, final String key, final String family) { this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, - null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + null, null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -432,7 +505,7 @@ public DeleteRequest(final String table, final String qualifier) { this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, qualifier == null ? null : new byte[][][] { { qualifier.getBytes() } }, - KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); + null, KeyValue.TIMESTAMP_NOW, RowLock.NO_LOCK); } /** @@ -452,7 +525,7 @@ public DeleteRequest(final String table, final RowLock lock) { this(table.getBytes(), key.getBytes(), new byte[][] { family.getBytes() }, qualifier == null ? null : new byte[][][] { { qualifier.getBytes() } }, - KeyValue.TIMESTAMP_NOW, lock.id()); + null, KeyValue.TIMESTAMP_NOW, lock.id()); } /** @@ -465,7 +538,8 @@ public DeleteRequest(final String table, */ public DeleteRequest(final byte[] table, final KeyValue kv) { this(table, kv.key(), new byte[][] { kv.family() }, - new byte[][][] { { kv.qualifier() } }, kv.timestamp(), RowLock.NO_LOCK); + new byte[][][] { { kv.qualifier() } }, + null, kv.timestamp(), RowLock.NO_LOCK); } /** @@ -481,7 +555,8 @@ public DeleteRequest(final byte[] table, final KeyValue kv, final RowLock lock) { this(table, kv.key(), new byte[][] { kv.family() }, - new byte[][][] { { kv.qualifier() } }, kv.timestamp(), lock.id()); + new byte[][][] { { kv.qualifier() } }, + null, kv.timestamp(), lock.id()); } /** Private constructor. */ @@ -489,10 +564,19 @@ private DeleteRequest(final byte[] table, final byte[] key, final byte[][] families, final byte[][][] qualifiers, - final long timestamp, + final long[][] timestamps, + final long row_timestamp, final long lockid) { super(table, key, families == null ? WHOLE_ROW : families, - timestamp, lockid); + row_timestamp, lockid); + checkParams(families, qualifiers, timestamps); + this.qualifiers = qualifiers; + this.timestamps = timestamps; + } + + private void checkParams(final byte[][] families, + final byte[][][] qualifiers, + final long[][] timestamps) { if (families != null) { for (byte[] family : families) { KeyValue.checkFamily(family); @@ -510,17 +594,32 @@ private DeleteRequest(final byte[] table, + " not match that of the family." + " table=" + Bytes.pretty(table) + ", key=" + Bytes.pretty(key)); + } else if (timestamps != null && families.length != timestamps.length) { + throw new IllegalArgumentException(String.format( + "Mismatch in number of families(%d) and timestamps(%d) array size.", + families.length, timestamps.length)); } - for (int i = 0; i < families.length; i++) { - if (qualifiers[i] == null) { + + for (int idx = 0; idx < families.length; idx++) { + if (qualifiers[idx] == null) { continue; } - for (final byte[] qualifier : qualifiers[i]) { + if (timestamps != null) { + if (qualifiers[idx].length != timestamps[idx].length) { + throw new IllegalArgumentException("Found " + + qualifiers[idx].length + " qualifiers and " + + timestamps[idx].length + " timestamps for family " + + families[idx] + " at index " + idx + ". Should be equal."); + } + } + for (final byte[] qualifier : qualifiers[idx]) { KeyValue.checkQualifier(qualifier); } } + } else if (timestamps != null) { + throw new IllegalArgumentException("Timestamps have been specified " + + "without specifying qualifiers."); } - this.qualifiers = qualifiers; } /** @@ -610,21 +709,25 @@ private void serializePayload(final ChannelBuffer buf, int family_idx) { if (families == WHOLE_ROW) { return; // No payload when deleting whole rows. } - // Are we deleting a whole family at once or just a bunch of columns? final boolean has_qualifiers = (qualifiers != null && qualifiers[family_idx] != null); - final byte[][] family_qualifiers = !has_qualifiers - ? DELETE_FAMILY_MARKER - : qualifiers[family_idx]; + final boolean has_timestamps = + (timestamps != null && timestamps[family_idx] != null); + // Are we deleting a whole family at once or just a bunch of columns? final byte type = (!has_qualifiers ? KeyValue.DELETE_FAMILY : (at_timestamp_only - ? KeyValue.DELETE - : KeyValue.DELETE_COLUMN)); + ? KeyValue.DELETE + : KeyValue.DELETE_COLUMN)); + final byte[][] family_qualifiers = !has_qualifiers + ? DELETE_FAMILY_MARKER + : qualifiers[family_idx]; // Write the KeyValues - for (final byte[] qualifier : family_qualifiers) { - KeyValue.serialize(buf, type, timestamp, - key, families[family_idx], qualifier, null); + for (int i = 0; i < family_qualifiers.length; i++) { + final byte[] qualifier = family_qualifiers[i]; + KeyValue.serialize(buf, type, + (has_timestamps ? timestamps[family_idx][i] : timestamp), + key, families[family_idx], qualifier, null); } } @@ -716,20 +819,21 @@ MutationProto toMutationProto() { columns.setFamily(Bytes.wrap(family)); // ... for this family. if (qualifiers != null) { + final boolean has_timestamps = + (timestamps != null && timestamps[i] != null); final MutationProto.DeleteType type = (qualifiers[i] == null ? MutationProto.DeleteType.DELETE_FAMILY : (at_timestamp_only ? MutationProto.DeleteType.DELETE_ONE_VERSION : MutationProto.DeleteType.DELETE_MULTIPLE_VERSIONS)); - // Now add all the qualifiers to delete. if (qualifiers[i] != null) { for (int j = 0; j < qualifiers[i].length; j++) { final MutationProto.ColumnValue.QualifierValue column = MutationProto.ColumnValue.QualifierValue.newBuilder() .setQualifier(Bytes.wrap(qualifiers[i][j])) - .setTimestamp(timestamp) + .setTimestamp((has_timestamps ? timestamps[i][j] : timestamp)) .setDeleteType(type) .build(); columns.addQualifierValue(column); From b9d66c7e426f51729689c3ce86261ae2c327709a Mon Sep 17 00:00:00 2001 From: Aditya Kishore Date: Sun, 2 Mar 2014 13:09:29 -0800 Subject: [PATCH 4/4] MultiAction failure does not clone the HBaseException correctly --- src/MultiAction.java | 6 +++++- src/NoSuchColumnFamilyException.java | 4 ++++ src/NotServingRegionException.java | 4 ++++ src/RemoteException.java | 2 +- src/UnknownRowLockException.java | 4 ++++ src/UnknownScannerException.java | 4 ++++ src/VersionMismatchException.java | 4 ++++ 7 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/MultiAction.java b/src/MultiAction.java index 6b4c46c4..b23afba2 100644 --- a/src/MultiAction.java +++ b/src/MultiAction.java @@ -703,7 +703,7 @@ Response deserializeMultiResponse(final ChannelBuffer buf) { // cloning "e", so instead we just abuse its `make' factory method // slightly to duplicate it. This mangles the message a bit, but // that's mostly harmless. - resps[n + k] = e.make(e.getMessage(), batch.get(n + k)); + resps[n + k] = e.make(e, batch.get(n + k)); } } else { for (int k = 0; k < nrpcs_per_key; k++) { @@ -803,6 +803,10 @@ public HBaseRpc getFailedRpc() { @Override MultiPutFailedException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof MultiPutFailedException) { + final MultiPutFailedException e = (MultiPutFailedException) msg; + return new MultiPutFailedException(e.getMessage(), rpc); + } return new MultiPutFailedException(msg.toString(), rpc); } diff --git a/src/NoSuchColumnFamilyException.java b/src/NoSuchColumnFamilyException.java index 988f5e09..9d1feae8 100644 --- a/src/NoSuchColumnFamilyException.java +++ b/src/NoSuchColumnFamilyException.java @@ -53,6 +53,10 @@ public HBaseRpc getFailedRpc() { @Override NoSuchColumnFamilyException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof NoSuchColumnFamilyException) { + final NoSuchColumnFamilyException e = (NoSuchColumnFamilyException) msg; + return new NoSuchColumnFamilyException(e.getMessage(), rpc); + } return new NoSuchColumnFamilyException(msg.toString(), rpc); } diff --git a/src/NotServingRegionException.java b/src/NotServingRegionException.java index dde348ae..00847a27 100644 --- a/src/NotServingRegionException.java +++ b/src/NotServingRegionException.java @@ -65,6 +65,10 @@ public HBaseRpc getFailedRpc() { @Override NotServingRegionException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof NotServingRegionException) { + final NotServingRegionException e = (NotServingRegionException) msg; + return new NotServingRegionException(e.getMessage(), rpc); + } return new NotServingRegionException(msg.toString(), rpc); } diff --git a/src/RemoteException.java b/src/RemoteException.java index 06d43c47..5160749f 100644 --- a/src/RemoteException.java +++ b/src/RemoteException.java @@ -53,7 +53,7 @@ public String getType() { @Override RemoteException make(final Object msg, final HBaseRpc rpc) { - if (msg instanceof RemoteException) { + if (msg == this || msg instanceof RemoteException) { final RemoteException e = (RemoteException) msg; return new RemoteException(e.getType(), e.getMessage()); } diff --git a/src/UnknownRowLockException.java b/src/UnknownRowLockException.java index a0acd4bc..aa0a569e 100644 --- a/src/UnknownRowLockException.java +++ b/src/UnknownRowLockException.java @@ -53,6 +53,10 @@ public HBaseRpc getFailedRpc() { @Override UnknownRowLockException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof UnknownRowLockException) { + final UnknownRowLockException e = (UnknownRowLockException) msg; + return new UnknownRowLockException(e.getMessage(), rpc); + } return new UnknownRowLockException(msg.toString(), rpc); } diff --git a/src/UnknownScannerException.java b/src/UnknownScannerException.java index e5150492..a983ac98 100644 --- a/src/UnknownScannerException.java +++ b/src/UnknownScannerException.java @@ -53,6 +53,10 @@ public HBaseRpc getFailedRpc() { @Override UnknownScannerException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof UnknownScannerException) { + final UnknownScannerException e = (UnknownScannerException) msg; + return new UnknownScannerException(e.getMessage(), rpc); + } return new UnknownScannerException(msg.toString(), rpc); } diff --git a/src/VersionMismatchException.java b/src/VersionMismatchException.java index eedca0c5..75d94a1d 100644 --- a/src/VersionMismatchException.java +++ b/src/VersionMismatchException.java @@ -55,6 +55,10 @@ public HBaseRpc getFailedRpc() { @Override VersionMismatchException make(final Object msg, final HBaseRpc rpc) { + if (msg == this || msg instanceof VersionMismatchException) { + final VersionMismatchException e = (VersionMismatchException) msg; + return new VersionMismatchException(e.getMessage(), rpc); + } return new VersionMismatchException(msg.toString(), rpc); }