From e9322ad0319a8a981696299e01347c751fa07b2b Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Thu, 19 Mar 2026 12:18:22 -0400 Subject: [PATCH 1/4] impl(bigtable): update DataConnectionImpl to use StubManager --- .../bigtable/internal/data_connection_impl.cc | 132 ++++++++++++------ .../bigtable/internal/data_connection_impl.h | 15 +- 2 files changed, 104 insertions(+), 43 deletions(-) diff --git a/google/cloud/bigtable/internal/data_connection_impl.cc b/google/cloud/bigtable/internal/data_connection_impl.cc index 87aeeadd0f7c4..067e310bb1783 100644 --- a/google/cloud/bigtable/internal/data_connection_impl.cc +++ b/google/cloud/bigtable/internal/data_connection_impl.cc @@ -213,10 +213,10 @@ bigtable::Row TransformReadModifyWriteRowResponse( DataConnectionImpl::DataConnectionImpl( std::unique_ptr background, - std::shared_ptr stub, + std::unique_ptr stub_manager, std::shared_ptr limiter, Options options) : background_(std::move(background)), - stub_(std::move(stub)), + stub_manager_(std::move(stub_manager)), limiter_(std::move(limiter)), options_(internal::MergeOptions(std::move(options), DataConnection::options())) { @@ -244,15 +244,37 @@ DataConnectionImpl::DataConnectionImpl( DataConnectionImpl::DataConnectionImpl( std::unique_ptr background, std::shared_ptr stub, + std::shared_ptr limiter, Options options) + : DataConnectionImpl(std::move(background), + std::make_unique(std::move(stub)), + std::move(limiter), std::move(options)) {} + +DataConnectionImpl::DataConnectionImpl( + std::unique_ptr background, + std::unique_ptr stub_manager, std::unique_ptr operation_context_factory, std::shared_ptr limiter, Options options) : background_(std::move(background)), - stub_(std::move(stub)), + stub_manager_(std::move(stub_manager)), operation_context_factory_(std::move(operation_context_factory)), limiter_(std::move(limiter)), options_(internal::MergeOptions(std::move(options), DataConnection::options())) {} +DataConnectionImpl::DataConnectionImpl( + std::unique_ptr background, + std::shared_ptr stub, + std::unique_ptr operation_context_factory, + std::shared_ptr limiter, Options options) + : DataConnectionImpl(std::move(background), + std::make_unique(std::move(stub)), + std::move(operation_context_factory), + std::move(limiter), std::move(options)) {} + +absl::string_view InstanceNameFromTableName(absl::string_view table_name) { + return table_name.substr(0, table_name.rfind("/tables")); +} + Status DataConnectionImpl::Apply(std::string const& table_name, bigtable::SingleRowMutation mut) { auto current = google::cloud::internal::SaveCurrentOptions(); @@ -270,14 +292,16 @@ Status DataConnectionImpl::Apply(std::string const& table_name, auto operation_context = operation_context_factory_->MutateRow( table_name, app_profile_id(*current)); + + auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name)); auto sor = google::cloud::internal::RetryLoop( retry_policy(*current), backoff_policy(*current), is_idempotent ? Idempotency::kIdempotent : Idempotency::kNonIdempotent, - [this, &operation_context]( + [stub, &operation_context]( grpc::ClientContext& context, Options const& options, google::bigtable::v2::MutateRowRequest const& request) { operation_context->PreCall(context); - auto s = stub_->MutateRow(context, options, request); + auto s = stub->MutateRow(context, options, request); operation_context->PostCall(context, s.status()); return s; }, @@ -311,7 +335,9 @@ future DataConnectionImpl::AsyncApply(std::string const& table_name, is_idempotent ? Idempotency::kIdempotent : Idempotency::kNonIdempotent, background_->cq(), - [stub = stub_, operation_context]( + [stub = + stub_manager_->GetStub(InstanceNameFromTableName(table_name)), + operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, @@ -350,8 +376,9 @@ std::vector DataConnectionImpl::BulkApply( std::unique_ptr retry; std::unique_ptr backoff; Status status; + auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name)); while (true) { - status = mutator.MakeOneRequest(*stub_, *limiter_, *current); + status = mutator.MakeOneRequest(*stub, *limiter_, *current); if (!mutator.HasPendingMutations()) break; if (!retry) retry = retry_policy(*current); if (!backoff) backoff = backoff_policy(*current); @@ -372,10 +399,12 @@ DataConnectionImpl::AsyncBulkApply(std::string const& table_name, auto operation_context = operation_context_factory_->MutateRows( table_name, app_profile_id(*current)); return AsyncBulkApplier::Create( - background_->cq(), stub_, limiter_, retry_policy(*current), - backoff_policy(*current), enable_server_retries(*current), - *idempotency_policy(*current), app_profile_id(*current), table_name, - std::move(mut), std::move(operation_context)); + background_->cq(), + stub_manager_->GetStub(InstanceNameFromTableName(table_name)), limiter_, + retry_policy(*current), backoff_policy(*current), + enable_server_retries(*current), *idempotency_policy(*current), + app_profile_id(*current), table_name, std::move(mut), + std::move(operation_context)); } bigtable::RowReader DataConnectionImpl::ReadRowsFull( @@ -383,7 +412,9 @@ bigtable::RowReader DataConnectionImpl::ReadRowsFull( auto current = google::cloud::internal::SaveCurrentOptions(); auto operation_context = operation_context_factory_->ReadRows( params.table_name, params.app_profile_id); - return ReadRowsHelper(stub_, current, std::move(params), + auto stub = + stub_manager_->GetStub(InstanceNameFromTableName(params.table_name)); + return ReadRowsHelper(stub, current, std::move(params), std::move(operation_context)); } @@ -398,8 +429,10 @@ StatusOr> DataConnectionImpl::ReadRow( // OperationContextFactory::ReadRow to create the operation_context. auto operation_context = operation_context_factory_->ReadRow(table_name, app_profile_id(*current)); + auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name)); + auto reader = - ReadRowsHelper(stub_, current, + ReadRowsHelper(stub, current, bigtable::ReadRowsParams{ table_name, app_profile_id(*current), std::move(row_set), rows_limit, std::move(filter)}, @@ -439,13 +472,14 @@ StatusOr DataConnectionImpl::CheckAndMutateRow( : Idempotency::kNonIdempotent; auto operation_context = operation_context_factory_->CheckAndMutateRow( table_name, app_profile_id(*current)); + auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name)); auto sor = google::cloud::internal::RetryLoop( retry_policy(*current), backoff_policy(*current), idempotency, - [this, &operation_context]( + [stub, &operation_context]( grpc::ClientContext& context, Options const& options, google::bigtable::v2::CheckAndMutateRowRequest const& request) { operation_context->PreCall(context); - auto s = stub_->CheckAndMutateRow(context, options, request); + auto s = stub->CheckAndMutateRow(context, options, request); operation_context->PostCall(context, s.status()); return s; }, @@ -483,10 +517,11 @@ DataConnectionImpl::AsyncCheckAndMutateRow( auto backoff = backoff_policy(*current); auto operation_context = operation_context_factory_->CheckAndMutateRow( table_name, app_profile_id(*current)); + auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name)); return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), idempotency, background_->cq(), - [stub = stub_, operation_context]( + [stub, operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, @@ -529,11 +564,12 @@ StatusOr> DataConnectionImpl::SampleRows( std::unique_ptr backoff; auto operation_context = operation_context_factory_->SampleRowKeys( table_name, app_profile_id(*current)); + auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name)); while (true) { auto context = std::make_shared(); internal::ConfigureContext(*context, internal::CurrentOptions()); operation_context->PreCall(*context); - auto stream = stub_->SampleRowKeys(context, Options{}, request); + auto stream = stub->SampleRowKeys(context, Options{}, request); absl::optional status; while (true) { @@ -574,9 +610,11 @@ DataConnectionImpl::AsyncSampleRows(std::string const& table_name) { auto operation_context = operation_context_factory_->SampleRowKeys( table_name, app_profile_id(*current)); return AsyncRowSampler::Create( - background_->cq(), stub_, retry_policy(*current), - backoff_policy(*current), enable_server_retries(*current), - app_profile_id(*current), table_name, std::move(operation_context)); + background_->cq(), + stub_manager_->GetStub(InstanceNameFromTableName(table_name)), + retry_policy(*current), backoff_policy(*current), + enable_server_retries(*current), app_profile_id(*current), table_name, + std::move(operation_context)); } StatusOr DataConnectionImpl::ReadModifyWriteRow( @@ -584,14 +622,16 @@ StatusOr DataConnectionImpl::ReadModifyWriteRow( auto current = google::cloud::internal::SaveCurrentOptions(); auto operation_context = operation_context_factory_->ReadModifyWriteRow( request.table_name(), app_profile_id(*current)); + auto stub = + stub_manager_->GetStub(InstanceNameFromTableName(request.table_name())); auto sor = google::cloud::internal::RetryLoop( retry_policy(*current), backoff_policy(*current), Idempotency::kNonIdempotent, - [this, operation_context]( + [stub, operation_context]( grpc::ClientContext& context, Options const& options, google::bigtable::v2::ReadModifyWriteRowRequest const& request) { operation_context->PreCall(context); - auto result = stub_->ReadModifyWriteRow(context, options, request); + auto result = stub->ReadModifyWriteRow(context, options, request); operation_context->PostCall(context, result.status()); return result; }, @@ -608,10 +648,12 @@ future> DataConnectionImpl::AsyncReadModifyWriteRow( request.table_name(), app_profile_id(*current)); auto retry = retry_policy(*current); auto backoff = backoff_policy(*current); + auto stub = + stub_manager_->GetStub(InstanceNameFromTableName(request.table_name())); return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), Idempotency::kNonIdempotent, background_->cq(), - [stub = stub_, operation_context]( + [stub, operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, @@ -648,11 +690,12 @@ void DataConnectionImpl::AsyncReadRowsHelper( std::shared_ptr operation_context) { auto reverse = internal::CurrentOptions().get(); bigtable_internal::AsyncRowReader::Create( - background_->cq(), stub_, app_profile_id(*current), table_name, - std::move(on_row), std::move(on_finish), std::move(row_set), rows_limit, - std::move(filter), reverse, retry_policy(*current), - backoff_policy(*current), enable_server_retries(*current), - std::move(operation_context)); + background_->cq(), + stub_manager_->GetStub(InstanceNameFromTableName(table_name)), + app_profile_id(*current), table_name, std::move(on_row), + std::move(on_finish), std::move(row_set), rows_limit, std::move(filter), + reverse, retry_policy(*current), backoff_policy(*current), + enable_server_retries(*current), std::move(operation_context)); } void DataConnectionImpl::AsyncReadRows( @@ -743,14 +786,15 @@ StatusOr DataConnectionImpl::PrepareQuery( } auto operation_context = operation_context_factory_->PrepareQuery( instance_full_name, app_profile_id(*current)); + auto stub = stub_manager_->GetStub(params.instance.FullName()); auto response = google::cloud::internal::RetryLoop( retry_policy(*current), backoff_policy(*current), Idempotency::kIdempotent, - [this, operation_context]( + [stub, operation_context]( grpc::ClientContext& context, Options const& options, google::bigtable::v2::PrepareQueryRequest const& request) { operation_context->PreCall(context); - auto const& result = stub_->PrepareQuery(context, options, request); + auto const& result = stub->PrepareQuery(context, options, request); operation_context->PostCall(context, result.status()); return result; }, @@ -779,17 +823,18 @@ StatusOr DataConnectionImpl::PrepareQuery( auto backoff = backoff_policy(*current); auto operation_context = operation_context_factory_->PrepareQuery( request.instance_name(), app_profile_id(*current)); + auto stub = stub_manager_->GetStub(request.instance_name()); return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), Idempotency::kIdempotent, background_->cq(), - [this, operation_context]( + [stub, operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, google::bigtable::v2::PrepareQueryRequest const& request) { operation_context->PreCall(*context); - auto f = stub_->AsyncPrepareQuery(cq, context, - std::move(options), request); + auto f = stub->AsyncPrepareQuery(cq, context, + std::move(options), request); return f.then( [operation_context, context = std::move(context)](auto f) { auto s = f.get(); @@ -827,17 +872,19 @@ future> DataConnectionImpl::AsyncPrepareQuery( auto operation_context = operation_context_factory_->PrepareQuery( instance_full_name, app_profile_id(*current)); auto const* func = __func__; + auto instance_name = params.instance.FullName(); return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), Idempotency::kIdempotent, background_->cq(), - [this, operation_context]( + [this, instance_name, operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, google::bigtable::v2::PrepareQueryRequest const& request) { operation_context->PreCall(*context); - auto f = stub_->AsyncPrepareQuery(cq, context, - std::move(options), request); + auto stub = stub_manager_->GetStub(instance_name); + auto f = stub->AsyncPrepareQuery(cq, context, std::move(options), + request); return f.then( [operation_context, context = std::move(context)](auto f) { auto s = f.get(); @@ -846,7 +893,7 @@ future> DataConnectionImpl::AsyncPrepareQuery( }); }, current, request, func) - .then([this, request, operation_context, current, + .then([this, instance_name, request, operation_context, current, params = std::move(params), func](future> future) -> StatusOr { @@ -869,7 +916,7 @@ future> DataConnectionImpl::AsyncPrepareQuery( "Column type cannot be empty", GCP_ERROR_INFO())); } } - auto refresh_fn = [this, request, func]() mutable { + auto refresh_fn = [this, instance_name, request, func]() mutable { auto current = google::cloud::internal::SaveCurrentOptions(); auto retry = query_plan_refresh_function_retry_policy(*current); auto backoff = backoff_policy(*current); @@ -878,14 +925,15 @@ future> DataConnectionImpl::AsyncPrepareQuery( return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), Idempotency::kIdempotent, background_->cq(), - [this, operation_context]( + [this, instance_name, operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, google::bigtable::v2::PrepareQueryRequest const& request) { operation_context->PreCall(*context); - auto f = stub_->AsyncPrepareQuery( + auto stub = stub_manager_->GetStub(instance_name); + auto f = stub->AsyncPrepareQuery( cq, context, std::move(options), request); return f.then([operation_context, context = std::move(context)](auto f) { @@ -1052,9 +1100,9 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery( auto const tracing_enabled = RpcStreamTracingEnabled(); auto const& tracing_options = RpcTracingOptions(); - + auto stub = stub_manager_->GetStub(params.bound_query.instance().FullName()); auto source_fn = - [stub = stub_, tracing_enabled, tracing_options]( + [stub, tracing_enabled, tracing_options]( google::bigtable::v2::ExecuteQueryRequest& request, google::bigtable::v2::ResultSetMetadata metadata, std::unique_ptr retry_policy_prototype, diff --git a/google/cloud/bigtable/internal/data_connection_impl.h b/google/cloud/bigtable/internal/data_connection_impl.h index c5f23fc979a37..dffaeb365c993 100644 --- a/google/cloud/bigtable/internal/data_connection_impl.h +++ b/google/cloud/bigtable/internal/data_connection_impl.h @@ -20,6 +20,7 @@ #include "google/cloud/bigtable/internal/mutate_rows_limiter.h" #include "google/cloud/bigtable/internal/operation_context_factory.h" #include "google/cloud/bigtable/internal/partial_result_set_reader.h" +#include "google/cloud/bigtable/internal/stub_manager.h" #include "google/cloud/bigtable/prepared_query.h" #include "google/cloud/bigtable/result_source_interface.h" #include "google/cloud/background_threads.h" @@ -40,6 +41,18 @@ class DataConnectionImpl : public bigtable::DataConnection { public: ~DataConnectionImpl() override = default; + DataConnectionImpl(std::unique_ptr background, + std::unique_ptr stub_manager, + std::shared_ptr limiter, + Options options); + + // This constructor is used for testing. + DataConnectionImpl( + std::unique_ptr background, + std::unique_ptr stub_manager, + std::unique_ptr operation_context_factory, + std::shared_ptr limiter, Options options); + DataConnectionImpl(std::unique_ptr background, std::shared_ptr stub, std::shared_ptr limiter, @@ -120,7 +133,7 @@ class DataConnectionImpl : public bigtable::DataConnection { std::shared_ptr operation_context); std::unique_ptr background_; - std::shared_ptr stub_; + std::unique_ptr stub_manager_; std::unique_ptr operation_context_factory_; std::shared_ptr limiter_; Options options_; From 2623aed07e40de2f455b4f920b33f01618e77f3c Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Thu, 19 Mar 2026 13:08:11 -0400 Subject: [PATCH 2/4] address review comments --- .../bigtable/internal/data_connection_impl.cc | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/google/cloud/bigtable/internal/data_connection_impl.cc b/google/cloud/bigtable/internal/data_connection_impl.cc index 067e310bb1783..36b012216ca25 100644 --- a/google/cloud/bigtable/internal/data_connection_impl.cc +++ b/google/cloud/bigtable/internal/data_connection_impl.cc @@ -189,6 +189,12 @@ ResultType MakeStatusOnlyResult(Status status) { std::make_unique(std::move(status))); } +std::string_view InstanceNameFromTableName(std::string_view table_name) { + auto pos = table_name.rfind("/tables"); + if (pos == std::string_view::npos) return {}; + return table_name.substr(0, pos); +} + } // namespace bigtable::Row TransformReadModifyWriteRowResponse( @@ -271,10 +277,6 @@ DataConnectionImpl::DataConnectionImpl( std::move(operation_context_factory), std::move(limiter), std::move(options)) {} -absl::string_view InstanceNameFromTableName(absl::string_view table_name) { - return table_name.substr(0, table_name.rfind("/tables")); -} - Status DataConnectionImpl::Apply(std::string const& table_name, bigtable::SingleRowMutation mut) { auto current = google::cloud::internal::SaveCurrentOptions(); @@ -873,16 +875,16 @@ future> DataConnectionImpl::AsyncPrepareQuery( instance_full_name, app_profile_id(*current)); auto const* func = __func__; auto instance_name = params.instance.FullName(); + auto stub = stub_manager_->GetStub(instance_name); return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), Idempotency::kIdempotent, background_->cq(), - [this, instance_name, operation_context]( + [this, stub, instance_name, operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, google::bigtable::v2::PrepareQueryRequest const& request) { operation_context->PreCall(*context); - auto stub = stub_manager_->GetStub(instance_name); auto f = stub->AsyncPrepareQuery(cq, context, std::move(options), request); return f.then( @@ -932,6 +934,7 @@ future> DataConnectionImpl::AsyncPrepareQuery( google::bigtable::v2::PrepareQueryRequest const& request) { operation_context->PreCall(*context); + // Get a new stub here to take advantage of the pool. auto stub = stub_manager_->GetStub(instance_name); auto f = stub->AsyncPrepareQuery( cq, context, std::move(options), request); From 2ceb8697984b7dd73db6be5cd80629eb9861f6e9 Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Thu, 19 Mar 2026 13:22:11 -0400 Subject: [PATCH 3/4] remove unused this capture --- google/cloud/bigtable/internal/data_connection_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/bigtable/internal/data_connection_impl.cc b/google/cloud/bigtable/internal/data_connection_impl.cc index 36b012216ca25..0fe943065d462 100644 --- a/google/cloud/bigtable/internal/data_connection_impl.cc +++ b/google/cloud/bigtable/internal/data_connection_impl.cc @@ -879,7 +879,7 @@ future> DataConnectionImpl::AsyncPrepareQuery( return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), Idempotency::kIdempotent, background_->cq(), - [this, stub, instance_name, operation_context]( + [stub, instance_name, operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, From a3b679f21eeb9b21aa18d8cb844a2043695cc2fb Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Thu, 19 Mar 2026 14:41:23 -0400 Subject: [PATCH 4/4] refresh stub before retry loop --- google/cloud/bigtable/internal/data_connection_impl.cc | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/bigtable/internal/data_connection_impl.cc b/google/cloud/bigtable/internal/data_connection_impl.cc index 0fe943065d462..9dd334aac532f 100644 --- a/google/cloud/bigtable/internal/data_connection_impl.cc +++ b/google/cloud/bigtable/internal/data_connection_impl.cc @@ -924,18 +924,18 @@ future> DataConnectionImpl::AsyncPrepareQuery( auto backoff = backoff_policy(*current); auto operation_context = operation_context_factory_->PrepareQuery( request.instance_name(), app_profile_id(*current)); + // Get a new stub here to take advantage of the pool. + auto stub = stub_manager_->GetStub(instance_name); return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), Idempotency::kIdempotent, background_->cq(), - [this, instance_name, operation_context]( + [stub, instance_name, operation_context]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, google::bigtable::v2::PrepareQueryRequest const& request) { operation_context->PreCall(*context); - // Get a new stub here to take advantage of the pool. - auto stub = stub_manager_->GetStub(instance_name); auto f = stub->AsyncPrepareQuery( cq, context, std::move(options), request); return f.then([operation_context,