Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMetaSerializer;
import org.apache.paimon.manifest.IndexManifestEntry;
Expand Down Expand Up @@ -84,9 +85,11 @@ public class TableIndexesTable implements ReadonlyTable {
new DataField(4, "file_size", new BigIntType(false)),
new DataField(5, "row_count", new BigIntType(false)),
new DataField(
6,
"dv_ranges",
new ArrayType(true, DeletionVectorMeta.SCHEMA))));
6, "dv_ranges", new ArrayType(true, DeletionVectorMeta.SCHEMA)),
new DataField(7, "row_range_start", new BigIntType(true)),
new DataField(8, "row_range_end", new BigIntType(true)),
new DataField(9, "index_field_id", new IntType(true)),
new DataField(10, "index_field_name", new StringType(true))));

private final FileStoreTable dataTable;

Expand Down Expand Up @@ -201,10 +204,16 @@ public RecordReader<InternalRow> createReader(Split split) {
CastExecutors.resolveToString(
dataTable.schema().logicalPartitionType());

RowType logicalRowType = dataTable.schema().logicalRowType();

Iterator<InternalRow> rows =
Iterators.transform(
manifestFileMetas.iterator(),
indexManifestEntry -> toRow(indexManifestEntry, partitionCastExecutor));
indexManifestEntry ->
toRow(
indexManifestEntry,
partitionCastExecutor,
logicalRowType));
if (readType != null) {
rows =
Iterators.transform(
Expand All @@ -218,9 +227,18 @@ public RecordReader<InternalRow> createReader(Split split) {

private InternalRow toRow(
IndexManifestEntry indexManifestEntry,
CastExecutor<InternalRow, BinaryString> partitionCastExecutor) {
CastExecutor<InternalRow, BinaryString> partitionCastExecutor,
RowType logicalRowType) {
LinkedHashMap<String, DeletionVectorMeta> dvMetas =
indexManifestEntry.indexFile().dvRanges();
GlobalIndexMeta globalMeta = indexManifestEntry.indexFile().globalIndexMeta();
String indexFieldName = null;
if (globalMeta != null) {
try {
indexFieldName = logicalRowType.getField(globalMeta.indexFieldId()).name();
} catch (RuntimeException ignored) {
}
}
return GenericRow.of(
partitionCastExecutor.cast(indexManifestEntry.partition()),
indexManifestEntry.bucket(),
Expand All @@ -230,7 +248,11 @@ private InternalRow toRow(
indexManifestEntry.indexFile().rowCount(),
dvMetas == null
? null
: IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()));
: IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()),
globalMeta != null ? globalMeta.rowRangeStart() : null,
globalMeta != null ? globalMeta.rowRangeEnd() : null,
globalMeta != null ? globalMeta.indexFieldId() : null,
indexFieldName != null ? BinaryString.fromString(indexFieldName) : null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,56 @@ class LuminaVectorIndexTest extends PaimonSparkTestBase {
}
}

// Verifies that the `table_indexes` system table surfaces global-index metadata:
// the row-id range covered by the index file and the indexed field's id/name.
test("table_indexes system table - global index metadata") {
  withTable("T") {
    // Row tracking + data evolution are required so the global index can record
    // a stable row-id range per index file.
    spark.sql("""
      |CREATE TABLE T (id INT, v ARRAY<FLOAT>)
      |TBLPROPERTIES (
      | 'bucket' = '-1',
      | 'global-index.row-count-per-shard' = '10000',
      | 'row-tracking.enabled' = 'true',
      | 'data-evolution.enabled' = 'true')
      |""".stripMargin)

    // 100 rows, each with a 3-element float vector derived from its id.
    val insertRows = Range(0, 100)
      .map { i =>
        s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))"
      }
      .mkString(",")
    spark.sql(s"INSERT INTO T VALUES $insertRows")

    // Build the vector index over column `v`.
    spark
      .sql(
        s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')")
      .collect()

    // Read the new metadata columns back from the system table.
    val metaRows = spark
      .sql("""
        |SELECT index_type, row_count, row_range_start, row_range_end,
        | index_field_id, index_field_name
        |FROM `T$table_indexes`
        |WHERE index_type = 'lumina-vector-ann'
        |""".stripMargin)
      .collect()

    assert(metaRows.nonEmpty)
    val indexRow = metaRows.head
    assert(indexRow.getAs[String]("index_type") == "lumina-vector-ann")
    assert(indexRow.getAs[Long]("row_count") == 100L)
    assert(indexRow.getAs[Long]("row_range_start") == 0L)
    assert(indexRow.getAs[Long]("row_range_end") == 99L)
    assert(indexRow.getAs[String]("index_field_name") == "v")

    // Cross-check: the index must cover up to the latest assigned row id,
    // i.e. snapshot next_row_id - 1.
    val latestNextRowId = spark
      .sql("SELECT next_row_id FROM `T$snapshots` ORDER BY snapshot_id DESC LIMIT 1")
      .collect()
      .head
      .getAs[Long]("next_row_id")
    assert(indexRow.getAs[Long]("row_range_end") == latestNextRowId - 1)
  }
}

test("create lumina vector index - with partitioned table") {
withTable("T") {
spark.sql("""
Expand Down