Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 29 additions & 5 deletions src/iceberg/table_scan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include "iceberg/table_scan.h"

#include <cstring>
#include <iterator>

#include "iceberg/expression/binder.h"
#include "iceberg/expression/expression.h"
#include "iceberg/file_reader.h"
#include "iceberg/manifest/manifest_entry.h"
Expand All @@ -32,6 +34,7 @@
#include "iceberg/util/macros.h"
#include "iceberg/util/snapshot_util_internal.h"
#include "iceberg/util/timepoint.h"
#include "iceberg/util/type_util.h"

namespace iceberg {

Expand Down Expand Up @@ -414,11 +417,32 @@ TableScan::ResolveProjectedSchema() const {
}

if (!context_.selected_columns.empty()) {
/// TODO(gangwu): port Java BaseScan.lazyColumnProjection to collect field ids
/// from selected column names and bound references in the filter, and then create
/// projected schema based on the collected field ids.
return NotImplemented(
"Selecting columns by name to create projected schema is not yet implemented");
std::unordered_set<int32_t> required_field_ids;

// Include columns referenced by filter
if (context_.filter != nullptr) {
ICEBERG_ASSIGN_OR_RAISE(auto is_bound, IsBoundVisitor::IsBound(context_.filter));
if (is_bound) {
ICEBERG_ASSIGN_OR_RAISE(required_field_ids,
ReferenceVisitor::GetReferencedFieldIds(context_.filter));
} else {
ICEBERG_ASSIGN_OR_RAISE(auto filter, Binder::Bind(*schema_, context_.filter,
context_.case_sensitive));
ICEBERG_ASSIGN_OR_RAISE(required_field_ids,
ReferenceVisitor::GetReferencedFieldIds(filter));
}
}

// Include columns selected by option
ICEBERG_ASSIGN_OR_RAISE(auto selected, schema_->Select(context_.selected_columns,
context_.case_sensitive));
ICEBERG_ASSIGN_OR_RAISE(
auto selected_field_ids,
GetProjectedIdsVisitor::GetProjectedIds(*selected, /*include_struct_ids=*/true));
required_field_ids.insert(std::make_move_iterator(selected_field_ids.begin()),
std::make_move_iterator(selected_field_ids.end()));

ICEBERG_ASSIGN_OR_RAISE(projected_schema_, schema_->Project(required_field_ids));
} else if (context_.projected_schema != nullptr) {
projected_schema_ = context_.projected_schema;
} else {
Expand Down
103 changes: 103 additions & 0 deletions src/iceberg/test/table_scan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,109 @@ TEST_P(TableScanTest, PlanFilesWithDeleteFiles) {
}
}

TEST_P(TableScanTest, SchemaWithSelectedColumnsAndFilter) {
auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
SchemaField::MakeRequired(/*field_id=*/2, "data", string()),
SchemaField::MakeRequired(/*field_id=*/3, "value", int64())});
auto timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
auto metadata = std::make_shared<TableMetadata>(TableMetadata{
.format_version = 2,
.table_uuid = "test-table-uuid",
.location = "/tmp/table",
.last_sequence_number = 1L,
.last_updated_ms = timestamp_ms,
.last_column_id = 3,
.schemas = {schema},
.current_schema_id = schema->schema_id(),
.partition_specs = {unpartitioned_spec_},
.default_spec_id = unpartitioned_spec_->spec_id(),
.last_partition_id = 1000,
.current_snapshot_id = 1000L,
.snapshots = {std::make_shared<Snapshot>(Snapshot{
.snapshot_id = 1000L,
.parent_snapshot_id = std::nullopt,
.sequence_number = 1L,
.timestamp_ms = timestamp_ms,
.manifest_list = "/tmp/metadata/snap-1000-1-manifest-list.avro",
.schema_id = schema->schema_id(),
})},
.snapshot_log = {SnapshotLogEntry{.timestamp_ms = timestamp_ms,
.snapshot_id = 1000L}},
.default_sort_order_id = 0,
.refs = {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
.snapshot_id = 1000L,
.retention = SnapshotRef::Branch{},
})}},
});

// Select "data" column, filter on "id" column
{
ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_));
builder->Select({"data"}).Filter(Expressions::Equal("id", Literal::Int(42)));
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());

ASSERT_EQ(projected_schema->fields().size(), 2);

ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
EXPECT_TRUE(id_field.has_value());
EXPECT_EQ(id_field->get().field_id(), 1);

ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
EXPECT_TRUE(data_field.has_value());
EXPECT_EQ(data_field->get().field_id(), 2);
}

// Select "id" and "value", filter on "data"
{
ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_));
builder->Select({"id", "value"})
.Filter(Expressions::Equal("data", Literal::String("test")));
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());

ASSERT_EQ(projected_schema->fields().size(), 3);

ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
EXPECT_TRUE(id_field.has_value());

ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
EXPECT_TRUE(data_field.has_value());

ICEBERG_UNWRAP_OR_FAIL(auto value_field, projected_schema->FindFieldByName("value"));
EXPECT_TRUE(value_field.has_value());
}

// Select "id", filter on "id" - should only have "id" once
{
ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_));
builder->Select({"id"}).Filter(Expressions::Equal("id", Literal::Int(42)));
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());

ASSERT_EQ(projected_schema->fields().size(), 1);

ICEBERG_UNWRAP_OR_FAIL(auto id_field, projected_schema->FindFieldByName("id"));
EXPECT_TRUE(id_field.has_value());
EXPECT_EQ(id_field->get().field_id(), 1);
}

// Select columns without filter
{
ICEBERG_UNWRAP_OR_FAIL(auto builder, TableScanBuilder::Make(metadata, file_io_));
builder->Select({"data"});
ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
ICEBERG_UNWRAP_OR_FAIL(auto projected_schema, scan->schema());

ASSERT_EQ(projected_schema->fields().size(), 1);

ICEBERG_UNWRAP_OR_FAIL(auto data_field, projected_schema->FindFieldByName("data"));
EXPECT_TRUE(data_field.has_value());
EXPECT_EQ(data_field->get().field_id(), 2);
}
}

INSTANTIATE_TEST_SUITE_P(TableScanVersions, TableScanTest, testing::Values(1, 2, 3));

} // namespace iceberg
8 changes: 8 additions & 0 deletions src/iceberg/util/type_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "iceberg/schema.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/formatter_internal.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"
#include "iceberg/util/visit_type.h"

Expand Down Expand Up @@ -300,6 +301,13 @@ Status GetProjectedIdsVisitor::VisitPrimitive(const PrimitiveType& type) { retur

std::unordered_set<int32_t> GetProjectedIdsVisitor::Finish() const { return ids_; }

Result<std::unordered_set<int32_t>> GetProjectedIdsVisitor::GetProjectedIds(
const Type& type, bool include_struct_ids) {
GetProjectedIdsVisitor visitor(include_struct_ids);
ICEBERG_RETURN_UNEXPECTED(visitor.Visit(type));
return visitor.Finish();
}

std::unordered_map<int32_t, int32_t> IndexParents(const StructType& root_struct) {
std::unordered_map<int32_t, int32_t> id_to_parent;
std::stack<int32_t> parent_id_stack;
Expand Down
3 changes: 3 additions & 0 deletions src/iceberg/util/type_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ class GetProjectedIdsVisitor {
Status VisitPrimitive(const PrimitiveType& type);
std::unordered_set<int32_t> Finish() const;

static Result<std::unordered_set<int32_t>> GetProjectedIds(
const Type& type, bool include_struct_ids = false);

private:
const bool include_struct_ids_;
std::unordered_set<int32_t> ids_;
Expand Down
Loading