From 50d72b4e10ac7fbb2a50acce73e76d231da062af Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 30 Jan 2026 11:41:28 +0800 Subject: [PATCH] feat: add support to select columns in table scan planning --- src/iceberg/table_scan.cc | 34 +++++++-- src/iceberg/test/table_scan_test.cc | 103 ++++++++++++++++++++++++++++ src/iceberg/util/type_util.cc | 8 +++ src/iceberg/util/type_util.h | 3 + 4 files changed, 143 insertions(+), 5 deletions(-) diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc index 4992b18d5..eeec262e9 100644 --- a/src/iceberg/table_scan.cc +++ b/src/iceberg/table_scan.cc @@ -20,7 +20,9 @@ #include "iceberg/table_scan.h" #include +#include +#include "iceberg/expression/binder.h" #include "iceberg/expression/expression.h" #include "iceberg/file_reader.h" #include "iceberg/manifest/manifest_entry.h" @@ -32,6 +34,7 @@ #include "iceberg/util/macros.h" #include "iceberg/util/snapshot_util_internal.h" #include "iceberg/util/timepoint.h" +#include "iceberg/util/type_util.h" namespace iceberg { @@ -414,11 +417,32 @@ TableScan::ResolveProjectedSchema() const { } if (!context_.selected_columns.empty()) { - /// TODO(gangwu): port Java BaseScan.lazyColumnProjection to collect field ids - /// from selected column names and bound references in the filter, and then create - /// projected schema based on the collected field ids. - return NotImplemented( - "Selecting columns by name to create projected schema is not yet implemented"); + std::unordered_set required_field_ids; + + // Include columns referenced by filter + if (context_.filter != nullptr) { + ICEBERG_ASSIGN_OR_RAISE(auto is_bound, IsBoundVisitor::IsBound(context_.filter)); + if (is_bound) { + ICEBERG_ASSIGN_OR_RAISE(required_field_ids, + ReferenceVisitor::GetReferencedFieldIds(context_.filter)); + } else { + ICEBERG_ASSIGN_OR_RAISE(auto filter, Binder::Bind(*schema_, context_.filter, + context_.case_sensitive)); + ICEBERG_ASSIGN_OR_RAISE(required_field_ids, + ReferenceVisitor::GetReferencedFieldIds(filter)); + } + } + + // Include columns selected by option + ICEBERG_ASSIGN_OR_RAISE(auto selected, schema_->Select(context_.selected_columns, + context_.case_sensitive)); + ICEBERG_ASSIGN_OR_RAISE( + auto selected_field_ids, + GetProjectedIdsVisitor::GetProjectedIds(*selected, /*include_struct_ids=*/true)); + required_field_ids.insert(std::make_move_iterator(selected_field_ids.begin()), + std::make_move_iterator(selected_field_ids.end())); + + ICEBERG_ASSIGN_OR_RAISE(projected_schema_, schema_->Project(required_field_ids)); } else if (context_.projected_schema != nullptr) { projected_schema_ = context_.projected_schema; } else { diff --git a/src/iceberg/test/table_scan_test.cc b/src/iceberg/test/table_scan_test.cc index b496606cf..82e7d73ee 100644 --- a/src/iceberg/test/table_scan_test.cc +++ b/src/iceberg/test/table_scan_test.cc @@ -666,6 +666,109 @@ TEST_P(TableScanTest, PlanFilesWithDeleteFiles) { } } +TEST_P(TableScanTest, SchemaWithSelectedColumnsAndFilter) { + auto schema = std::make_shared(std::vector{ + SchemaField::MakeRequired(/*field_id=*/1, "id", int32()), + SchemaField::MakeRequired(/*field_id=*/2, "data", string()), + SchemaField::MakeRequired(/*field_id=*/3, "value", int64())}); + auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L); + auto metadata = std::make_shared(TableMetadata{ + .format_version = 2, + .table_uuid = "test-table-uuid", + .location = "/tmp/table", + .last_sequence_number = 1L, + .last_updated_ms = timestamp_ms, + .last_column_id = 3, + .schemas = {schema}, + .current_schema_id = schema->schema_id(), + .partition_specs = {unpartitioned_spec_}, + .default_spec_id = unpartitioned_spec_->spec_id(), + .last_partition_id = 1000, + .current_snapshot_id = 1000L, + .snapshots = {std::make_shared(Snapshot{ + .snapshot_id = 1000L, + .parent_snapshot_id = std::nullopt, + .sequence_number = 1L, + .timestamp_ms = timestamp_ms, + .manifest_list = "/tmp/metadata/snap-1000-1-manifest-list.avro", + .schema_id = schema->schema_id(), + })}, + .snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms, + .snapshot_id = 1000L}}, + .default_sort_order_id = 0, + .refs = {{"main", std::make_shared(SnapshotRef{ + .snapshot_id = 1000L, + .retention = SnapshotRef::Branch{}, + })}}, + }); + + // Select "data" column, filter on "id" column + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_)); + builder->Select({"data"}).Filter(Expressions::Equal("id", Literal::Int(42))); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema()); + + ASSERT_EQ(projected_schema->fields().size(), 2); + + ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id")); + EXPECT_TRUE(id_field.has_value()); + EXPECT_EQ(id_field->get().field_id(), 1); + + ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data")); + EXPECT_TRUE(data_field.has_value()); + EXPECT_EQ(data_field->get().field_id(), 2); + } + + // Select "id" and "value", filter on "data" + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_)); + builder->Select({"id", "value"}) + .Filter(Expressions::Equal("data", Literal::String("test"))); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema()); + + ASSERT_EQ(projected_schema->fields().size(), 3); + + ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id")); + EXPECT_TRUE(id_field.has_value()); + + ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data")); + EXPECT_TRUE(data_field.has_value()); + + ICEBERG_UNWRAP_OR_FAIL(auto value_field, projected_schema->FindFieldByName("value")); + EXPECT_TRUE(value_field.has_value()); + } + + // Select "id", filter on "id" - should only have "id" once + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_)); + builder->Select({"id"}).Filter(Expressions::Equal("id", Literal::Int(42))); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema()); + + ASSERT_EQ(projected_schema->fields().size(), 1); + + ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id")); + EXPECT_TRUE(id_field.has_value()); + EXPECT_EQ(id_field->get().field_id(), 1); + } + + // Select columns without filter + { + ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_)); + builder->Select({"data"}); + ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build()); + ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema()); + + ASSERT_EQ(projected_schema->fields().size(), 1); + + ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data")); + EXPECT_TRUE(data_field.has_value()); + EXPECT_EQ(data_field->get().field_id(), 2); + } +} + INSTANTIATE_TEST_SUITE_P(TableScanVersions, TableScanTest, testing::Values(1, 2, 3)); } // namespace iceberg diff --git a/src/iceberg/util/type_util.cc b/src/iceberg/util/type_util.cc index b18d09e45..c6b9bb3ed 100644 --- a/src/iceberg/util/type_util.cc +++ b/src/iceberg/util/type_util.cc @@ -25,6 +25,7 @@ #include "iceberg/schema.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/formatter_internal.h" +#include "iceberg/util/macros.h" #include "iceberg/util/string_util.h" #include "iceberg/util/visit_type.h" @@ -300,6 +301,13 @@ Status GetProjectedIdsVisitor::VisitPrimitive(const PrimitiveType& type) { retur std::unordered_set GetProjectedIdsVisitor::Finish() const { return ids_; } +Result> GetProjectedIdsVisitor::GetProjectedIds( + const Type& type, bool include_struct_ids) { + GetProjectedIdsVisitor visitor(include_struct_ids); + ICEBERG_RETURN_UNEXPECTED(visitor.Visit(type)); + return visitor.Finish(); +} + std::unordered_map IndexParents(const StructType& root_struct) { std::unordered_map id_to_parent; std::stack parent_id_stack; diff --git a/src/iceberg/util/type_util.h b/src/iceberg/util/type_util.h index 0f71494b6..ceb5e62ec 100644 --- a/src/iceberg/util/type_util.h +++ b/src/iceberg/util/type_util.h @@ -132,6 +132,9 @@ class GetProjectedIdsVisitor { Status VisitPrimitive(const PrimitiveType& type); std::unordered_set Finish() const; + static Result> GetProjectedIds( + const Type& type, bool include_struct_ids = false); + private: const bool include_struct_ids_; std::unordered_set ids_;