Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 93 additions & 42 deletions google/cloud/bigtable/internal/data_connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,12 @@ ResultType MakeStatusOnlyResult(Status status) {
std::make_unique<StatusOnlyResultSetSource>(std::move(status)));
}

std::string_view InstanceNameFromTableName(std::string_view table_name) {
auto pos = table_name.rfind("/tables");
if (pos == std::string_view::npos) return {};
return table_name.substr(0, pos);
}

} // namespace

bigtable::Row TransformReadModifyWriteRowResponse(
Expand All @@ -213,10 +219,10 @@ bigtable::Row TransformReadModifyWriteRowResponse(

DataConnectionImpl::DataConnectionImpl(
std::unique_ptr<BackgroundThreads> background,
std::shared_ptr<BigtableStub> stub,
std::unique_ptr<StubManager> stub_manager,
std::shared_ptr<MutateRowsLimiter> limiter, Options options)
: background_(std::move(background)),
stub_(std::move(stub)),
stub_manager_(std::move(stub_manager)),
limiter_(std::move(limiter)),
options_(internal::MergeOptions(std::move(options),
DataConnection::options())) {
Expand Down Expand Up @@ -244,15 +250,33 @@ DataConnectionImpl::DataConnectionImpl(
DataConnectionImpl::DataConnectionImpl(
std::unique_ptr<BackgroundThreads> background,
std::shared_ptr<BigtableStub> stub,
std::shared_ptr<MutateRowsLimiter> limiter, Options options)
: DataConnectionImpl(std::move(background),
std::make_unique<StubManager>(std::move(stub)),
std::move(limiter), std::move(options)) {}

DataConnectionImpl::DataConnectionImpl(
std::unique_ptr<BackgroundThreads> background,
std::unique_ptr<StubManager> stub_manager,
std::unique_ptr<OperationContextFactory> operation_context_factory,
std::shared_ptr<MutateRowsLimiter> limiter, Options options)
: background_(std::move(background)),
stub_(std::move(stub)),
stub_manager_(std::move(stub_manager)),
operation_context_factory_(std::move(operation_context_factory)),
limiter_(std::move(limiter)),
options_(internal::MergeOptions(std::move(options),
DataConnection::options())) {}

DataConnectionImpl::DataConnectionImpl(
std::unique_ptr<BackgroundThreads> background,
std::shared_ptr<BigtableStub> stub,
std::unique_ptr<OperationContextFactory> operation_context_factory,
std::shared_ptr<MutateRowsLimiter> limiter, Options options)
: DataConnectionImpl(std::move(background),
std::make_unique<StubManager>(std::move(stub)),
std::move(operation_context_factory),
std::move(limiter), std::move(options)) {}

Status DataConnectionImpl::Apply(std::string const& table_name,
bigtable::SingleRowMutation mut) {
auto current = google::cloud::internal::SaveCurrentOptions();
Expand All @@ -270,14 +294,16 @@ Status DataConnectionImpl::Apply(std::string const& table_name,

auto operation_context = operation_context_factory_->MutateRow(
table_name, app_profile_id(*current));

auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
auto sor = google::cloud::internal::RetryLoop(
retry_policy(*current), backoff_policy(*current),
is_idempotent ? Idempotency::kIdempotent : Idempotency::kNonIdempotent,
[this, &operation_context](
[stub, &operation_context](
grpc::ClientContext& context, Options const& options,
google::bigtable::v2::MutateRowRequest const& request) {
operation_context->PreCall(context);
auto s = stub_->MutateRow(context, options, request);
auto s = stub->MutateRow(context, options, request);
operation_context->PostCall(context, s.status());
return s;
},
Expand Down Expand Up @@ -311,7 +337,9 @@ future<Status> DataConnectionImpl::AsyncApply(std::string const& table_name,
is_idempotent ? Idempotency::kIdempotent
: Idempotency::kNonIdempotent,
background_->cq(),
[stub = stub_, operation_context](
[stub =
stub_manager_->GetStub(InstanceNameFromTableName(table_name)),
operation_context](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
Expand Down Expand Up @@ -350,8 +378,9 @@ std::vector<bigtable::FailedMutation> DataConnectionImpl::BulkApply(
std::unique_ptr<bigtable::DataRetryPolicy> retry;
std::unique_ptr<BackoffPolicy> backoff;
Status status;
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
while (true) {
status = mutator.MakeOneRequest(*stub_, *limiter_, *current);
status = mutator.MakeOneRequest(*stub, *limiter_, *current);
if (!mutator.HasPendingMutations()) break;
if (!retry) retry = retry_policy(*current);
if (!backoff) backoff = backoff_policy(*current);
Expand All @@ -372,18 +401,22 @@ DataConnectionImpl::AsyncBulkApply(std::string const& table_name,
auto operation_context = operation_context_factory_->MutateRows(
table_name, app_profile_id(*current));
return AsyncBulkApplier::Create(
background_->cq(), stub_, limiter_, retry_policy(*current),
backoff_policy(*current), enable_server_retries(*current),
*idempotency_policy(*current), app_profile_id(*current), table_name,
std::move(mut), std::move(operation_context));
background_->cq(),
stub_manager_->GetStub(InstanceNameFromTableName(table_name)), limiter_,
retry_policy(*current), backoff_policy(*current),
enable_server_retries(*current), *idempotency_policy(*current),
app_profile_id(*current), table_name, std::move(mut),
std::move(operation_context));
}

bigtable::RowReader DataConnectionImpl::ReadRowsFull(
bigtable::ReadRowsParams params) {
auto current = google::cloud::internal::SaveCurrentOptions();
auto operation_context = operation_context_factory_->ReadRows(
params.table_name, params.app_profile_id);
return ReadRowsHelper(stub_, current, std::move(params),
auto stub =
stub_manager_->GetStub(InstanceNameFromTableName(params.table_name));
return ReadRowsHelper(stub, current, std::move(params),
std::move(operation_context));
}

Expand All @@ -398,8 +431,10 @@ StatusOr<std::pair<bool, bigtable::Row>> DataConnectionImpl::ReadRow(
// OperationContextFactory::ReadRow to create the operation_context.
auto operation_context =
operation_context_factory_->ReadRow(table_name, app_profile_id(*current));
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));

auto reader =
ReadRowsHelper(stub_, current,
ReadRowsHelper(stub, current,
bigtable::ReadRowsParams{
table_name, app_profile_id(*current),
std::move(row_set), rows_limit, std::move(filter)},
Expand Down Expand Up @@ -439,13 +474,14 @@ StatusOr<bigtable::MutationBranch> DataConnectionImpl::CheckAndMutateRow(
: Idempotency::kNonIdempotent;
auto operation_context = operation_context_factory_->CheckAndMutateRow(
table_name, app_profile_id(*current));
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
auto sor = google::cloud::internal::RetryLoop(
retry_policy(*current), backoff_policy(*current), idempotency,
[this, &operation_context](
[stub, &operation_context](
grpc::ClientContext& context, Options const& options,
google::bigtable::v2::CheckAndMutateRowRequest const& request) {
operation_context->PreCall(context);
auto s = stub_->CheckAndMutateRow(context, options, request);
auto s = stub->CheckAndMutateRow(context, options, request);
operation_context->PostCall(context, s.status());
return s;
},
Expand Down Expand Up @@ -483,10 +519,11 @@ DataConnectionImpl::AsyncCheckAndMutateRow(
auto backoff = backoff_policy(*current);
auto operation_context = operation_context_factory_->CheckAndMutateRow(
table_name, app_profile_id(*current));
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
return google::cloud::internal::AsyncRetryLoop(
std::move(retry), std::move(backoff), idempotency,
background_->cq(),
[stub = stub_, operation_context](
[stub, operation_context](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
Expand Down Expand Up @@ -529,11 +566,12 @@ StatusOr<std::vector<bigtable::RowKeySample>> DataConnectionImpl::SampleRows(
std::unique_ptr<BackoffPolicy> backoff;
auto operation_context = operation_context_factory_->SampleRowKeys(
table_name, app_profile_id(*current));
auto stub = stub_manager_->GetStub(InstanceNameFromTableName(table_name));
while (true) {
auto context = std::make_shared<grpc::ClientContext>();
internal::ConfigureContext(*context, internal::CurrentOptions());
operation_context->PreCall(*context);
auto stream = stub_->SampleRowKeys(context, Options{}, request);
auto stream = stub->SampleRowKeys(context, Options{}, request);

absl::optional<Status> status;
while (true) {
Expand Down Expand Up @@ -574,24 +612,28 @@ DataConnectionImpl::AsyncSampleRows(std::string const& table_name) {
auto operation_context = operation_context_factory_->SampleRowKeys(
table_name, app_profile_id(*current));
return AsyncRowSampler::Create(
background_->cq(), stub_, retry_policy(*current),
backoff_policy(*current), enable_server_retries(*current),
app_profile_id(*current), table_name, std::move(operation_context));
background_->cq(),
stub_manager_->GetStub(InstanceNameFromTableName(table_name)),
retry_policy(*current), backoff_policy(*current),
enable_server_retries(*current), app_profile_id(*current), table_name,
std::move(operation_context));
}

StatusOr<bigtable::Row> DataConnectionImpl::ReadModifyWriteRow(
google::bigtable::v2::ReadModifyWriteRowRequest request) {
auto current = google::cloud::internal::SaveCurrentOptions();
auto operation_context = operation_context_factory_->ReadModifyWriteRow(
request.table_name(), app_profile_id(*current));
auto stub =
stub_manager_->GetStub(InstanceNameFromTableName(request.table_name()));
auto sor = google::cloud::internal::RetryLoop(
retry_policy(*current), backoff_policy(*current),
Idempotency::kNonIdempotent,
[this, operation_context](
[stub, operation_context](
grpc::ClientContext& context, Options const& options,
google::bigtable::v2::ReadModifyWriteRowRequest const& request) {
operation_context->PreCall(context);
auto result = stub_->ReadModifyWriteRow(context, options, request);
auto result = stub->ReadModifyWriteRow(context, options, request);
operation_context->PostCall(context, result.status());
return result;
},
Expand All @@ -608,10 +650,12 @@ future<StatusOr<bigtable::Row>> DataConnectionImpl::AsyncReadModifyWriteRow(
request.table_name(), app_profile_id(*current));
auto retry = retry_policy(*current);
auto backoff = backoff_policy(*current);
auto stub =
stub_manager_->GetStub(InstanceNameFromTableName(request.table_name()));
return google::cloud::internal::AsyncRetryLoop(
std::move(retry), std::move(backoff), Idempotency::kNonIdempotent,
background_->cq(),
[stub = stub_, operation_context](
[stub, operation_context](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
Expand Down Expand Up @@ -648,11 +692,12 @@ void DataConnectionImpl::AsyncReadRowsHelper(
std::shared_ptr<OperationContext> operation_context) {
auto reverse = internal::CurrentOptions().get<bigtable::ReverseScanOption>();
bigtable_internal::AsyncRowReader::Create(
background_->cq(), stub_, app_profile_id(*current), table_name,
std::move(on_row), std::move(on_finish), std::move(row_set), rows_limit,
std::move(filter), reverse, retry_policy(*current),
backoff_policy(*current), enable_server_retries(*current),
std::move(operation_context));
background_->cq(),
stub_manager_->GetStub(InstanceNameFromTableName(table_name)),
app_profile_id(*current), table_name, std::move(on_row),
std::move(on_finish), std::move(row_set), rows_limit, std::move(filter),
reverse, retry_policy(*current), backoff_policy(*current),
enable_server_retries(*current), std::move(operation_context));
}

void DataConnectionImpl::AsyncReadRows(
Expand Down Expand Up @@ -743,14 +788,15 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
}
auto operation_context = operation_context_factory_->PrepareQuery(
instance_full_name, app_profile_id(*current));
auto stub = stub_manager_->GetStub(params.instance.FullName());
auto response = google::cloud::internal::RetryLoop(
retry_policy(*current), backoff_policy(*current),
Idempotency::kIdempotent,
[this, operation_context](
[stub, operation_context](
grpc::ClientContext& context, Options const& options,
google::bigtable::v2::PrepareQueryRequest const& request) {
operation_context->PreCall(context);
auto const& result = stub_->PrepareQuery(context, options, request);
auto const& result = stub->PrepareQuery(context, options, request);
operation_context->PostCall(context, result.status());
return result;
},
Expand Down Expand Up @@ -779,17 +825,18 @@ StatusOr<bigtable::PreparedQuery> DataConnectionImpl::PrepareQuery(
auto backoff = backoff_policy(*current);
auto operation_context = operation_context_factory_->PrepareQuery(
request.instance_name(), app_profile_id(*current));
auto stub = stub_manager_->GetStub(request.instance_name());
return google::cloud::internal::AsyncRetryLoop(
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
background_->cq(),
[this, operation_context](
[stub, operation_context](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::bigtable::v2::PrepareQueryRequest const& request) {
operation_context->PreCall(*context);
auto f = stub_->AsyncPrepareQuery(cq, context,
std::move(options), request);
auto f = stub->AsyncPrepareQuery(cq, context,
std::move(options), request);
return f.then(
[operation_context, context = std::move(context)](auto f) {
auto s = f.get();
Expand Down Expand Up @@ -827,17 +874,19 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
auto operation_context = operation_context_factory_->PrepareQuery(
instance_full_name, app_profile_id(*current));
auto const* func = __func__;
auto instance_name = params.instance.FullName();
auto stub = stub_manager_->GetStub(instance_name);
return google::cloud::internal::AsyncRetryLoop(
std::move(retry), std::move(backoff), Idempotency::kIdempotent,
background_->cq(),
[this, operation_context](
[stub, instance_name, operation_context](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::bigtable::v2::PrepareQueryRequest const& request) {
operation_context->PreCall(*context);
auto f = stub_->AsyncPrepareQuery(cq, context,
std::move(options), request);
auto f = stub->AsyncPrepareQuery(cq, context, std::move(options),
request);
return f.then(
[operation_context, context = std::move(context)](auto f) {
auto s = f.get();
Expand All @@ -846,7 +895,7 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
});
},
current, request, func)
.then([this, request, operation_context, current,
.then([this, instance_name, request, operation_context, current,
params = std::move(params),
func](future<StatusOr<google::bigtable::v2::PrepareQueryResponse>>
future) -> StatusOr<bigtable::PreparedQuery> {
Expand All @@ -869,23 +918,25 @@ future<StatusOr<bigtable::PreparedQuery>> DataConnectionImpl::AsyncPrepareQuery(
"Column type cannot be empty", GCP_ERROR_INFO()));
}
}
auto refresh_fn = [this, request, func]() mutable {
auto refresh_fn = [this, instance_name, request, func]() mutable {
auto current = google::cloud::internal::SaveCurrentOptions();
auto retry = query_plan_refresh_function_retry_policy(*current);
auto backoff = backoff_policy(*current);
auto operation_context = operation_context_factory_->PrepareQuery(
request.instance_name(), app_profile_id(*current));
// Get a new stub here to take advantage of the pool.
auto stub = stub_manager_->GetStub(instance_name);
return google::cloud::internal::AsyncRetryLoop(
std::move(retry), std::move(backoff),
Idempotency::kIdempotent, background_->cq(),
[this, operation_context](
[stub, instance_name, operation_context](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::bigtable::v2::PrepareQueryRequest const&
request) {
operation_context->PreCall(*context);
auto f = stub_->AsyncPrepareQuery(
auto f = stub->AsyncPrepareQuery(
cq, context, std::move(options), request);
return f.then([operation_context,
context = std::move(context)](auto f) {
Expand Down Expand Up @@ -1052,9 +1103,9 @@ bigtable::RowStream DataConnectionImpl::ExecuteQuery(

auto const tracing_enabled = RpcStreamTracingEnabled();
auto const& tracing_options = RpcTracingOptions();

auto stub = stub_manager_->GetStub(params.bound_query.instance().FullName());
auto source_fn =
[stub = stub_, tracing_enabled, tracing_options](
[stub, tracing_enabled, tracing_options](
google::bigtable::v2::ExecuteQueryRequest& request,
google::bigtable::v2::ResultSetMetadata metadata,
std::unique_ptr<bigtable::DataRetryPolicy> retry_policy_prototype,
Expand Down
Loading
Loading