diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java index 990a3e8f2d3c..320257ce1057 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TableIndexesTable.java @@ -27,6 +27,7 @@ import org.apache.paimon.disk.IOManager; import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.DeletionVectorMeta; +import org.apache.paimon.index.GlobalIndexMeta; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMetaSerializer; import org.apache.paimon.manifest.IndexManifestEntry; @@ -84,9 +85,11 @@ public class TableIndexesTable implements ReadonlyTable { new DataField(4, "file_size", new BigIntType(false)), new DataField(5, "row_count", new BigIntType(false)), new DataField( - 6, - "dv_ranges", - new ArrayType(true, DeletionVectorMeta.SCHEMA)))); + 6, "dv_ranges", new ArrayType(true, DeletionVectorMeta.SCHEMA)), + new DataField(7, "row_range_start", new BigIntType(true)), + new DataField(8, "row_range_end", new BigIntType(true)), + new DataField(9, "index_field_id", new IntType(true)), + new DataField(10, "index_field_name", newStringType(true)))); private final FileStoreTable dataTable; @@ -201,10 +204,16 @@ public RecordReader createReader(Split split) { CastExecutors.resolveToString( dataTable.schema().logicalPartitionType()); + RowType logicalRowType = dataTable.schema().logicalRowType(); + Iterator rows = Iterators.transform( manifestFileMetas.iterator(), - indexManifestEntry -> toRow(indexManifestEntry, partitionCastExecutor)); + indexManifestEntry -> + toRow( + indexManifestEntry, + partitionCastExecutor, + logicalRowType)); if (readType != null) { rows = Iterators.transform( @@ -218,9 +227,18 @@ public RecordReader createReader(Split split) { private InternalRow toRow( IndexManifestEntry 
indexManifestEntry, - CastExecutor partitionCastExecutor) { + CastExecutor partitionCastExecutor, + RowType logicalRowType) { LinkedHashMap dvMetas = indexManifestEntry.indexFile().dvRanges(); + GlobalIndexMeta globalMeta = indexManifestEntry.indexFile().globalIndexMeta(); + String indexFieldName = null; /* stays null without global meta or if lookup throws */ + if (globalMeta != null) { + try { + indexFieldName = logicalRowType.getField(globalMeta.indexFieldId()).name(); + } catch (RuntimeException ignored) { /* best-effort lookup: keep name null */ + } + } return GenericRow.of( partitionCastExecutor.cast(indexManifestEntry.partition()), indexManifestEntry.bucket(), @@ -230,7 +248,11 @@ private InternalRow toRow( indexManifestEntry.indexFile().rowCount(), dvMetas == null ? null - : IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values())); + : IndexFileMetaSerializer.dvMetasToRowArrayData(dvMetas.values()), + globalMeta != null ? globalMeta.rowRangeStart() : null, + globalMeta != null ? globalMeta.rowRangeEnd() : null, + globalMeta != null ? globalMeta.indexFieldId() : null, + indexFieldName != null ? 
BinaryString.fromString(indexFieldName) : null); } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala index 211ef23a5eef..e428a28b1e71 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/LuminaVectorIndexTest.scala @@ -68,6 +68,56 @@ class LuminaVectorIndexTest extends PaimonSparkTestBase { } } + test("table_indexes system table - global index metadata") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, v ARRAY) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + |""".stripMargin) + + val values = (0 until 100) + .map( + i => s"($i, array(cast($i as float), cast(${i + 1} as float), cast(${i + 2} as float)))") + .mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + spark + .sql( + s"CALL sys.create_global_index(table => 'test.T', index_column => 'v', index_type => '$indexType', options => '$defaultOptions')") + .collect() + + // Query table_indexes system table + val indexRows = spark + .sql(""" + |SELECT index_type, row_count, row_range_start, row_range_end, + | index_field_id, index_field_name + |FROM `T$table_indexes` + |WHERE index_type = 'lumina-vector-ann' + |""".stripMargin) + .collect() + + assert(indexRows.nonEmpty) + val row = indexRows.head + assert(row.getAs[String]("index_type") == "lumina-vector-ann") + assert(row.getAs[Long]("row_count") == 100L) + assert(row.getAs[Long]("row_range_start") == 0L) + assert(row.getAs[Long]("row_range_end") == 99L) + assert(row.getAs[String]("index_field_name") == "v") + + // Verify max row id matches snapshot next_row_id - 1 + val nextRowId = spark + .sql("SELECT next_row_id FROM 
`T$snapshots` ORDER BY snapshot_id DESC LIMIT 1") + .collect() + .head + .getAs[Long]("next_row_id") + assert(row.getAs[Long]("row_range_end") == nextRowId - 1) + } + } + test("create lumina vector index - with partitioned table") { withTable("T") { spark.sql("""