From 4acd7385284409cbf6649fd0ebeac001b59ea7dd Mon Sep 17 00:00:00 2001 From: yantian Date: Fri, 10 Apr 2026 18:12:46 +0800 Subject: [PATCH 1/3] [index] system table index add field --- .../paimon/table/system/TableIndexesTable.java | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java index 990a3e8f2d3c..1a87649b5088 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java @@ -27,6 +27,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMetaSerializer; import org.apache.paimon.manifest.IndexManifestEntry; @@ -84,9 +85,10 @@ public class TableIndexesTable implements ReadonlyTable { new DataField(4, "file_size", new BigIntType(false)), new DataField(5, "row_count", new BigIntType(false)), new DataField( - 6, - "dv_ranges", - new ArrayType(true, DeletionVectorMeta.SCHEMA)))); + 6, "dv_ranges", new ArrayType(true, DeletionVectorMeta.SCHEMA)), + new DataField(7, "row_range_start", new BigIntType(true)), + new DataField(8, "row_range_end", new BigIntType(true)), + new DataField(9, "index_field_id", new IntType(true)))); private final FileStoreTable dataTable; @@ -221,6 +223,7 @@ private InternalRow toRow( CastExecutor partitionCastExecutor) { LinkedHashMap dvMetas = indexManifestEntry.indexFile().dvRanges(); + GlobalIndexMeta globalMeta = indexManifestEntry.indexFile().globalIndexMeta(); return GenericRow.of( partitionCastExecutor.cast(indexManifestEntry.partition()), indexManifestEntry.bucket(), @@ -230,7 +233,10 @@ private InternalRow toRow( indexManifestEntry.indexFile().rowCount(), dvMetas == null ? null - : IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values())); + : IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()), + globalMeta != null ? globalMeta.rowRangeStart() : null, + globalMeta != null ? globalMeta.rowRangeEnd() : null, + globalMeta != null ? globalMeta.indexFieldId() : null); } } From e2893b0d3afd15e6e32644cd98b1d0feed32ba9f Mon Sep 17 00:00:00 2001 From: yantian Date: Fri, 10 Apr 2026 19:59:39 +0800 Subject: [PATCH 2/3] fix --- .../table/system/TableIndexesTable.java | 24 +++++++-- .../spark/sql/LuminaVectorIndexTest.scala | 50 +++++++++++++++++++ 2 files changed, 70 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java index 1a87649b5088..320257ce1057 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java @@ -88,7 +88,8 @@ public class TableIndexesTable implements ReadonlyTable { 6, "dv_ranges", new ArrayType(true, DeletionVectorMeta.SCHEMA)), new DataField(7, "row_range_start", new BigIntType(true)), new DataField(8, "row_range_end", new BigIntType(true)), - new DataField(9, "index_field_id", new IntType(true)))); + new DataField(9, "index_field_id", new IntType(true)), + new DataField(10, "index_field_name", newStringType(true)))); private final FileStoreTable dataTable; @@ -203,10 +204,16 @@ public RecordReader createReader(Split split) { CastExecutors.resolveToString( dataTable.schema().logicalPartitionType()); + RowType logicalRowType = dataTable.schema().logicalRowType(); + Iterator rows = Iterators.transform( manifestFileMetas.iterator(), - indexManifestEntry -> toRow(indexManifestEntry, partitionCastExecutor)); + indexManifestEntry -> + toRow( + indexManifestEntry, + partitionCastExecutor, + logicalRowType)); if (readType != null) { rows = Iterators.transform( @@ -220,10 +227,18 @@ public RecordReader createReader(Split split) { private InternalRow toRow( IndexManifestEntry indexManifestEntry, - CastExecutor partitionCastExecutor) { + CastExecutor partitionCastExecutor, + RowType logicalRowType) { LinkedHashMap dvMetas = indexManifestEntry.indexFile().dvRanges(); GlobalIndexMeta globalMeta = indexManifestEntry.indexFile().globalIndexMeta(); + String indexFieldName = null; + if (globalMeta != null) { + try { + indexFieldName = logicalRowType.getField(globalMeta.indexFieldId()).name(); + } catch (RuntimeException ignored) { + } + } return GenericRow.of( partitionCastExecutor.cast(indexManifestEntry.partition()), indexManifestEntry.bucket(), @@ -236,7 +251,8 @@ private InternalRow toRow( : IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()), globalMeta != null ? globalMeta.rowRangeStart() : null, globalMeta != null ? globalMeta.rowRangeEnd() : null, - globalMeta != null ? globalMeta.indexFieldId() : null); + globalMeta != null ? globalMeta.indexFieldId() : null, + indexFieldName != null ? BinaryString.fromString(indexFieldName) : null); } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala index 211ef23a5eef..ccb2c9d32b5d 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala @@ -68,6 +68,56 @@ class LuminaVectorIndexTest extends PaimonSparkTestBase { } } + test("table_indexes system table - global index metadata") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, v ARRAY) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + + val values = (0 until 100) + .map( + i => s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))") + .mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + spark + .sql( + s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')") + .collect() + + // Query table_indexes system table + val indexRows = spark + .sql(""" + |SELECT index_type, row_count, row_range_start, row_range_end, + | index_field_id, index_field_name + |FROM T$table_indexes + |WHERE index_type = 'lumina-vector-ann' + |""".stripMargin) + .collect() + + assert(indexRows.nonEmpty) + val row = indexRows.head + assert(row.getAs[String]("index_type") == "lumina-vector-ann") + assert(row.getAs[Long]("row_count") == 100L) + assert(row.getAs[Long]("row_range_start") == 0L) + assert(row.getAs[Long]("row_range_end") == 99L) + assert(row.getAs[String]("index_field_name") == "v") + + // Verify max row id matches snapshot next_row_id - 1 + val nextRowId = spark + .sql("SELECT next_row_id FROM T$snapshots ORDER BY snapshot_id DESC LIMIT 1") + .collect() + .head + .getAs[Long]("next_row_id") + assert(row.getAs[Long]("row_range_end") == nextRowId - 1) + } + } + test("create lumina vector index - with partitioned table") { withTable("T") { spark.sql(""" From 9badff53bf09a8012b866f5e2461e05bcef7ea20 Mon Sep 17 00:00:00 2001 From: yantian Date: Fri, 10 Apr 2026 20:54:22 +0800 Subject: [PATCH 3/3] fix --- .../org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala index ccb2c9d32b5d..e428a28b1e71 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala @@ -95,7 +95,7 @@ class LuminaVectorIndexTest extends PaimonSparkTestBase { .sql(""" |SELECT index_type, row_count, row_range_start, row_range_end, | index_field_id, index_field_name - |FROM T$table_indexes + |FROM `T$table_indexes` |WHERE index_type = 'lumina-vector-ann' |""".stripMargin) .collect() @@ -110,7 +110,7 @@ class LuminaVectorIndexTest extends PaimonSparkTestBase { // Verify max row id matches snapshot next_row_id - 1 val nextRowId = spark - .sql("SELECT next_row_id FROM T$snapshots ORDER BY snapshot_id DESC LIMIT 1") + .sql("SELECT next_row_id FROM `T$snapshots` ORDER BY snapshot_id DESC LIMIT 1") .collect() .head .getAs[Long]("next_row_id")