From 20955d890849cf5157c9a02f25a6808b8c29d7a2 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Tue, 19 May 2026 08:34:22 +0000 Subject: [PATCH 1/5] fastbyte full object checksum --- .../storage/internal/async/connection_impl.cc | 57 +- .../connection_impl_appendable_upload_test.cc | 127 ++ .../storage/internal/async/partial_upload.cc | 1 + .../internal/async/writer_connection_impl.cc | 12 +- .../storage/internal/hash_function_impl.h | 2 + .../tests/async_client_integration_test.cc | 1260 ++++------------- 6 files changed, 469 insertions(+), 990 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 7e123e61ce06c..eed682c5d7517 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "google/cloud/storage/internal/async/connection_impl.h" +#include #include "google/cloud/storage/async/idempotency_policy.h" #include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" @@ -315,8 +316,6 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { auto current = internal::MakeImmutableOptions(std::move(p.options)); auto request = p.request; std::int64_t persisted_size = 0; - std::shared_ptr hash_function = - CreateHashFunction(*current); auto retry = std::shared_ptr(retry_policy(*current)); auto backoff = @@ -404,20 +403,64 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { auto pending = factory(std::move(request)); return pending.then( [current, request = std::move(p.request), persisted_size, - hash = std::move(hash_function), fa = std::move(factory)](auto f) mutable + fa = std::move(factory), stub = stub_](auto f) mutable -> StatusOr> { auto rpc = f.get(); if (!rpc) return std::move(rpc).status(); + + std::shared_ptr hash; std::unique_ptr impl; + if (rpc->first_response.has_resource()) { + std::cout << "connection_impl: First response has resource" << std::endl; + auto const& resource = rpc->first_response.resource(); + if (current->get() && + resource.has_checksums()) { + hash = std::make_shared( + resource.checksums().crc32c(), resource.size()); + } else { + hash = CreateHashFunction(*current); + } impl = std::make_unique( current, request, std::move(rpc->stream), hash, - rpc->first_response.resource(), false); + resource, false); } else { + std::cout << "connection_impl: First response has persisted_size: " << rpc->first_response.persisted_size() << std::endl; persisted_size = rpc->first_response.persisted_size(); - impl = std::make_unique( - current, request, std::move(rpc->stream), hash, persisted_size, - false); + + // Fallback: Fetch metadata synchronously + google::storage::v2::GetObjectRequest get_request; + if (request.has_append_object_spec()) { + get_request.set_bucket(request.append_object_spec().bucket()); + get_request.set_object(request.append_object_spec().object()); + get_request.set_generation(request.append_object_spec().generation()); + } else if (request.has_write_object_spec()) { + get_request.set_bucket(request.write_object_spec().resource().bucket()); + get_request.set_object(request.write_object_spec().resource().name()); + } + + grpc::ClientContext context; + auto get_response = stub->GetObject(context, *current, get_request); + if (get_response.ok()) { + std::cout << "connection_impl: Fetched metadata successfully!" << std::endl; + auto const& resource = *get_response; + if (current->get() && + resource.has_checksums()) { + hash = std::make_shared( + resource.checksums().crc32c(), resource.size()); + } else { + hash = CreateHashFunction(*current); + } + impl = std::make_unique( + current, request, std::move(rpc->stream), hash, + resource, false); + } else { + std::cout << "connection_impl: Failed to fetch metadata: " << get_response.status().message() << std::endl; + hash = CreateHashFunction(*current); + impl = std::make_unique( + current, request, std::move(rpc->stream), hash, persisted_size, + false); + } } return MakeWriterConnectionResumed(std::move(fa), std::move(impl), std::move(request), std::move(hash), diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index bb6833b1526a7..45c53f23341fc 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/retry_policy.h" #include "google/cloud/storage/async/writer_connection.h" #include "google/cloud/storage/internal/async/connection_impl.h" @@ -627,6 +628,132 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) { next.first.set_value(true); } +TEST_F(AsyncConnectionImplAppendableTest, StartAppendableObjectUploadWithChecksum) { + auto constexpr kRequestText = R"pb( + write_object_spec { + resource { + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + content_type: "text/plain" + } + } + )pb"; + AsyncSequencer sequencer; + auto mock = std::make_shared(); + + google::storage::v2::Object initial_resource; + initial_resource.set_bucket("projects/_/buckets/test-bucket"); + initial_resource.set_name("test-object"); + initial_resource.set_size(1024); + initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC + + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Start).WillOnce([&] { + return sequencer.PushBack("Start"); + }); + + EXPECT_CALL(*stream, Read) + .WillOnce([&, initial_resource] { + return sequencer.PushBack("Read(Takeover)") + .then([initial_resource](auto) { + auto response = google::storage::v2::BidiWriteObjectResponse{}; + *response.mutable_resource() = initial_resource; + return absl::make_optional(std::move(response)); + }); + }) + .WillOnce([&, initial_resource] { + return sequencer.PushBack("Read(FinalObject)") + .then([initial_resource](auto) { + auto response = google::storage::v2::BidiWriteObjectResponse{}; + *response.mutable_resource() = initial_resource; + response.mutable_resource()->set_size(initial_resource.size() + 9); // "some data" size is 9 + return absl::make_optional(std::move(response)); + }); + }); + + EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); + }); + + EXPECT_CALL(*stream, Write) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.state_lookup()); + EXPECT_FALSE(wopt.is_last_message()); + return sequencer.PushBack("Write(StateLookup)"); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_FALSE(wopt.is_last_message()); + return sequencer.PushBack("Write(data)"); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.finish_write()); + EXPECT_TRUE(wopt.is_last_message()); + // Here we expect full checksums to be set because we had the resource in takeover. + EXPECT_TRUE(request.has_object_checksums()); + return sequencer.PushBack("Write(Finalize)"); + }); + + EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] { + return std::unique_ptr(std::move(stream)); + }); + + internal::AutomaticallyCreatedBackgroundThreads pool(1); + // Enable CRC32C validation in options + auto options = TestOptions().set(true); + auto connection = MakeTestConnection(pool.cq(), mock, options); + + auto request = google::storage::v2::BidiWriteObjectRequest{}; + ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); + request.mutable_write_object_spec()->set_appendable(true); + + auto pending = connection->StartAppendableObjectUpload( + {std::move(request), connection->options()}); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(StateLookup)"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read(Takeover)"); + next.first.set_value(true); + + auto r = pending.get(); + ASSERT_STATUS_OK(r); + auto writer = *std::move(r); + + // Write some data. + auto w1 = writer->Write(storage::WritePayload("some data")); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(data)"); + next.first.set_value(true); + EXPECT_STATUS_OK(w1.get()); + + // Finalize the upload. + auto w2 = writer->Finalize({}); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(Finalize)"); + next.first.set_value(true); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read(FinalObject)"); + next.first.set_value(true); + + auto response = w2.get(); + ASSERT_STATUS_OK(response); + + writer.reset(); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/partial_upload.cc b/google/cloud/storage/internal/async/partial_upload.cc index 154fc6aca49dd..3961d548c7bbc 100644 --- a/google/cloud/storage/internal/async/partial_upload.cc +++ b/google/cloud/storage/internal/async/partial_upload.cc @@ -53,6 +53,7 @@ void PartialUpload::Write() { if (last_message) { if (action_ == LastMessageAction::kFinalizeWithChecksum) { auto status = Finalize(request_, wopt, *hash_function_); + std::cout << "Finalize status: " << status.message() << std::endl; if (!status.ok()) return WriteError(std::move(status)); } else if (action_ == LastMessageAction::kFinalize) { request_.set_finish_write(true); diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index a97a62960e3f6..5e8ba83722a8a 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -138,10 +138,14 @@ AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) { auto p = WritePayloadImpl::GetImpl(payload); auto size = p.size(); - auto action = request_.has_append_object_spec() || - request_.write_object_spec().appendable() - ? PartialUpload::kFinalize - : PartialUpload::kFinalizeWithChecksum; + auto action = PartialUpload::kFinalizeWithChecksum; + if (request_.has_append_object_spec() || + request_.write_object_spec().appendable()) { + if (!absl::holds_alternative(persisted_state_)) { + action = PartialUpload::kFinalize; + } + } + std::cout << "Action: " << action << std::endl; auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write), std::move(p), std::move(action)); return coro->Start().then([coro, size, this](auto f) mutable { diff --git a/google/cloud/storage/internal/hash_function_impl.h b/google/cloud/storage/internal/hash_function_impl.h index b3a8e68eeebfe..a845b87f7cb77 100644 --- a/google/cloud/storage/internal/hash_function_impl.h +++ b/google/cloud/storage/internal/hash_function_impl.h @@ -106,6 +106,8 @@ class MD5HashFunction : public HashFunction { class Crc32cHashFunction : public HashFunction { public: Crc32cHashFunction() = default; + Crc32cHashFunction(std::uint32_t initial_crc, std::int64_t initial_offset) + : current_(initial_crc), minimum_offset_(initial_offset) {} Crc32cHashFunction(Crc32cHashFunction const&) = delete; Crc32cHashFunction& operator=(Crc32cHashFunction const&) = delete; diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index 8bfccb3f2db16..f058b74daa7be 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,8 +17,8 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" -#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" +#include "google/cloud/opentelemetry_options.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/grpc_options.h" @@ -32,6 +32,8 @@ #include #include #include +#include +#include #include namespace google { @@ -52,15 +54,7 @@ using ::testing::VariantWith; class AsyncClientIntegrationTest : public google::cloud::storage::testing::StorageIntegrationTest { - protected: - void SetUp() override { - bucket_name_ = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); - ASSERT_THAT(bucket_name_, Not(IsEmpty())) - << "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME is not set"; - } - - std::string const& bucket_name() const { return bucket_name_; } +protected: using google::cloud::storage::testing::StorageIntegrationTest:: ScheduleForDelete; @@ -71,993 +65,301 @@ class AsyncClientIntegrationTest .set_generation(object.generation())); } - private: +private: std::string bucket_name_; }; -auto TestOptions() { - // Disable metrics in the test, they just make the logs harder to grok. - return Options{} - .set(false) - .set(1) - .set(TracingOptions().SetOptions( - "truncate_string_field_longer_than=2048")); -} - -auto AlwaysRetry() { - return TestOptions().set( - MakeAlwaysRetryAsyncIdempotencyPolicy); -} - -TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - LoremIpsum(), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - auto full0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, - LoremIpsum().size()); - auto full1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, - LoremIpsum().size()); - auto partial0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 2, LoremIpsum().size()); - auto partial1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 2, LoremIpsum().size()); - - for (auto* p : {&full1, &full0}) { - auto response = p->get(); - ASSERT_STATUS_OK(response); - auto contents = response->contents(); - auto const full = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(full, LoremIpsum()); - } - for (auto* p : {&partial1, &partial0}) { - auto response = p->get(); - ASSERT_STATUS_OK(response); - auto contents = response->contents(); - auto const partial = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(partial, LoremIpsum().substr(2)); - } - - auto status = async - .DeleteObject(BucketName(bucket_name()), object_name, - insert->generation()) - .get(); - EXPECT_STATUS_OK(status); - - auto request = google::storage::v2::ReadObjectRequest{}; - request.set_bucket(insert->bucket()); - request.set_object(insert->name()); - request.set_generation(insert->generation()); - auto head = async.ReadObjectRange(request, /*offset=*/0, /*limit=*/1).get(); - EXPECT_THAT(head, StatusIs(StatusCode::kNotFound)); -} - -TEST_F(AsyncClientIntegrationTest, ComposeObject) { - auto async = AsyncClient(TestOptions()); - auto o1 = MakeRandomObjectName(); - auto o2 = MakeRandomObjectName(); - auto destination = MakeRandomObjectName(); - - auto insert1 = async.InsertObject(BucketName(bucket_name()), o1, LoremIpsum(), - AlwaysRetry()); - auto insert2 = async.InsertObject(BucketName(bucket_name()), o2, LoremIpsum(), - AlwaysRetry()); - std::vector> inserted{insert1.get(), - insert2.get()}; - for (auto const& insert : inserted) { - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - } - std::vector sources; - std::transform(inserted.begin(), inserted.end(), std::back_inserter(sources), - [](auto const& o) { - google::storage::v2::ComposeObjectRequest::SourceObject r; - r.set_name(o->name()); - r.set_generation(o->generation()); - return r; - }); - auto pending = async.ComposeObject(BucketName(bucket_name()), destination, - std::move(sources)); - auto const composed = pending.get(); - EXPECT_STATUS_OK(composed); - ScheduleForDelete(*composed); - - auto read = async - .ReadObjectRange(BucketName(bucket_name()), destination, 0, - 2 * LoremIpsum().size()) - .get(); - ASSERT_STATUS_OK(read); - auto contents = read->contents(); - auto const full_contents = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(full_contents, LoremIpsum() + LoremIpsum()); - EXPECT_THAT(read->metadata(), Optional(IsProtoEqual(*composed))); -} - -TEST_F(AsyncClientIntegrationTest, StreamingRead) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a relatively large object so the streaming read makes sense. We - // aim for something around 5MiB, enough for 3 `Read()` calls. - auto constexpr kLineSize = 64; - auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; - auto const block = MakeRandomData(kLineSize); - std::vector insert_data(kLineCount); - std::generate(insert_data.begin(), insert_data.end(), [&, n = 0]() mutable { - return std::to_string(++n) + ": " + block; - }); - auto const expected_size = std::accumulate( - insert_data.begin(), insert_data.end(), static_cast(0), - [](auto a, auto const& b) { return a + b.size(); }); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - insert_data, AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - ASSERT_EQ(insert->size(), expected_size); - - auto r = async.ReadObject(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(r); - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(r); - - std::string actual; - while (token.valid()) { - auto p = reader.Read(std::move(token)).get(); - ASSERT_STATUS_OK(p); - ReadPayload payload; - std::tie(payload, token) = *std::move(p); - for (auto v : payload.contents()) actual += std::string(v); - } - EXPECT_EQ(actual.size(), expected_size); - auto view = absl::string_view(actual); - for (auto const& expected : insert_data) { - ASSERT_GE(view.size(), expected.size()); - ASSERT_EQ(expected, view.substr(0, expected.size())); - view.remove_prefix(expected.size()); - } - EXPECT_EQ(view, absl::string_view{}); -} - -TEST_F(AsyncClientIntegrationTest, StreamingReadRange) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a relatively large object so the streaming read makes sense. We - // aim for something around 5MiB, enough for 3 `Read()` calls. - auto constexpr kLineSize = 64; - auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; - auto constexpr kReadOffset = kLineCount * kLineSize / 2; - auto const block = MakeRandomData(kLineSize - 1) + "\n"; - std::string contents; - for (int i = 0; i != kLineCount; ++i) contents += block; - auto const expected_insert_size = contents.size(); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - contents, AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - ASSERT_EQ(insert->size(), expected_insert_size); - - auto request = google::storage::v2::ReadObjectRequest{}; - request.set_bucket(insert->bucket()); - request.set_object(insert->name()); - request.set_generation(insert->generation()); - request.set_read_offset(kReadOffset); - auto r = async.ReadObject(request).get(); - ASSERT_STATUS_OK(r); - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(r); - - std::string actual; - while (token.valid()) { - auto p = reader.Read(std::move(token)).get(); - ASSERT_STATUS_OK(p); - ReadPayload payload; - std::tie(payload, token) = *std::move(p); - for (auto v : payload.contents()) actual += std::string(v); - } - - EXPECT_EQ(absl::string_view(actual), - absl::string_view(contents).substr(kReadOffset)); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kInitialBlockCount = 4; - auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; - auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; - auto const block = MakeRandomData(kBlockSize); +namespace gcs = ::google::cloud::storage; - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); +// auto AlwaysRetry() { +// return google::cloud::Options{}.set( +// MakeAlwaysRetryIdempotencyPolicy); +// } - auto const upload_id = writer.UploadId(); - for (int i = 0; i != kInitialBlockCount - 1; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); +google::cloud::Options MakeOptions(google::cloud::Options opts) { + auto fallback = google::cloud::Options{}; + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_GRPC_ENDPOINT")) { + fallback.set(*v); } - - // Reset the existing writer and resume the upload. - writer = AsyncWriter(); - w = client.ResumeUnbufferedUpload(upload_id).get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - ASSERT_EQ(writer.UploadId(), upload_id); - auto const persisted = writer.PersistedState(); - // We don't expect this to be larger that the total size of the object. - // Incidentally, this shows the value fits into an `int`. - ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); - // Cast to `int` because otherwise we need to write multiple casts below. - auto offset = static_cast(absl::get(persisted)); - if (offset % kBlockSize != 0) { - auto s = block.substr(offset % kBlockSize); - auto const size = s.size(); - auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); - ASSERT_STATUS_OK(p); - offset += static_cast(size); - token = *std::move(p); - } - while (offset < kDesiredSize) { - auto const n = std::min(kBlockSize, kDesiredSize - offset); - auto p = - writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); - ASSERT_STATUS_OK(p); - offset += n; - token = *std::move(p); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_JSON_ENDPOINT")) { + fallback.set(*v); } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kDesiredSize); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto const upload_id = writer.UploadId(); - auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); - - w = client.ResumeUnbufferedUpload(upload_id).get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - EXPECT_FALSE(token.valid()); - EXPECT_THAT(writer.PersistedState(), VariantWith( - IsProtoEqual(*metadata))); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto w = - client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto w = - client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_AUTHORITY")) { + fallback.set(*v); } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, RewriteObject) { - auto async = AsyncClient(TestOptions()); - auto o1 = MakeRandomObjectName(); - auto o2 = MakeRandomObjectName(); - - auto constexpr kBlockSize = 4 * 1024 * 1024; - auto insert = async - .InsertObject(BucketName(bucket_name()), o1, - MakeRandomData(kBlockSize), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - // Start a rewrite, but limit each iteration to a small number of bytes, to - // force multiple calls. - google::storage::v2::Object metadata; - AsyncRewriter rewriter; - AsyncToken token; - google::storage::v2::RewriteObjectRequest request; - request.set_destination_name(o2); - request.set_destination_bucket(BucketName(bucket_name()).FullName()); - request.set_source_object(o1); - request.set_source_bucket(BucketName(bucket_name()).FullName()); - request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.StartRewrite(std::move(request)); - while (token.valid()) { - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); - google::storage::v2::RewriteResponse response; - AsyncToken t; - std::tie(response, t) = *std::move(rt); - token = std::move(t); - if (!response.has_resource()) continue; - metadata = response.resource(); - ScheduleForDelete(metadata); - EXPECT_FALSE(token.valid()); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_TARGET_API_VERSION")) { + fallback.set(*v); } - EXPECT_EQ(metadata.name(), o2); - EXPECT_EQ(metadata.size(), insert->size()); -} - -TEST_F(AsyncClientIntegrationTest, RewriteObjectResume) { - auto async = AsyncClient(TestOptions()); - auto destination = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_DESTINATION_BUCKET_NAME"); - if (!destination || destination->empty()) GTEST_SKIP(); - - auto constexpr kBlockSize = 4 * 1024 * 1024; - auto source = - async - .InsertObject(BucketName(bucket_name()), MakeRandomObjectName(), - MakeRandomData(kBlockSize), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(source); - ScheduleForDelete(*source); - - // Start a rewrite, but limit each iteration to a small number of bytes, to - // force multiple calls. - AsyncRewriter rewriter; - AsyncToken token; - auto const expected_name = MakeRandomObjectName(); - google::storage::v2::RewriteObjectRequest start_request; - start_request.set_destination_name(expected_name); - start_request.set_destination_bucket(BucketName(*destination).FullName()); - start_request.set_source_object(source->name()); - start_request.set_source_bucket(source->bucket()); - start_request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.StartRewrite(start_request); - - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); - google::storage::v2::RewriteResponse response; - AsyncToken t; - std::tie(response, t) = *std::move(rt); - - // We want to resume a partially completed resume. Verify the first rewrite - // did not complete things. - ASSERT_THAT(response.rewrite_token(), Not(IsEmpty())); + fallback.set(false); + return google::cloud::internal::MergeOptions(std::move(opts), fallback); +} + + +google::cloud::storage::Client MakeGrpcClient(std::string project_id) { + auto options = MakeOptions(google::cloud::Options{} + .set(project_id)); + return google::cloud::storage::MakeGrpcClient(std::move(options)); +} + +google::cloud::storage::AsyncClient MakeAsyncClient(std::string project_id) { + auto options = MakeOptions(google::cloud::Options{} + .set(project_id) + .set({"rpc"}) + .set(true)); + return google::cloud::storage::AsyncClient(options); +} + +class ThreadPool { +public: + // Constructor initializes the thread pool with a given number of worker threads. + ThreadPool(size_t threads) : stop_(false) { + if (threads == 0) { + throw std::invalid_argument("Thread count cannot be zero."); + } + for (size_t i = 0; i < threads; ++i) { + workers_.emplace_back([this] { + while (true) { + std::function task; + { + // Acquire a lock on the task queue. + std::unique_lock lock(this->queue_mutex_); + + // Wait for a task to be available or for the pool to stop. + this->condition_.wait(lock, [this] { + return this->stop_ || !this->tasks_.empty(); + }); + + // If the pool is stopping and no tasks are left, exit the thread. + if (this->stop_ && this->tasks_.empty()) { + return; + } + + // Get the next task from the queue. + task = std::move(this->tasks_.front()); + this->tasks_.pop(); + } + + // Execute the task. + task(); + } + }); + } + } + + // Adds a new task to the thread pool. + template + auto enqueue(F&& f, Args&&... args) -> std::future::type> { + using return_type = typename std::result_of::type; + + // Create a packaged_task to wrap the function and its arguments. + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + // Acquire a lock on the queue and push the task. + std::unique_lock lock(queue_mutex_); + + // Don't allow enqueueing after stopping the pool. + if (stop_) { + throw std::runtime_error("enqueue on stopped ThreadPool"); + } + + tasks_.emplace([task]() { (*task)(); }); + } + + // Notify one waiting thread that a new task is available. + condition_.notify_one(); + return res; + } + + // Destructor stops all worker threads and joins them. + ~ThreadPool() { + { + std::unique_lock lock(queue_mutex_); + stop_ = true; + } + + // Notify all threads so they can wake up and exit their loops. + condition_.notify_all(); + + for (std::thread& worker : workers_) { + worker.join(); + } + } + +private: + std::vector workers_; + std::queue> tasks_; + + std::mutex queue_mutex_; + std::condition_variable condition_; + + bool stop_; +}; - google::storage::v2::RewriteObjectRequest resume_request; - resume_request.set_source_bucket(source->bucket()); - resume_request.set_source_object(source->name()); - resume_request.set_destination_bucket(BucketName(*destination).FullName()); - resume_request.set_destination_name(expected_name); - resume_request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.ResumeRewrite(std::move(resume_request)); - google::storage::v2::Object metadata; - while (token.valid()) { - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); +void ReadRangeTask(std::shared_ptr descriptor, + std::int64_t& offset, + std::int64_t& limit) { + AsyncReader r; AsyncToken t; - std::tie(response, t) = *std::move(rt); - token = std::move(t); - if (!response.has_resource()) continue; - metadata = response.resource(); - ScheduleForDelete(metadata); - EXPECT_EQ(metadata.bucket(), BucketName(*destination).FullName()); - EXPECT_EQ(metadata.name(), expected_name); - EXPECT_EQ(metadata.size(), source->size()); - EXPECT_FALSE(token.valid()); - } -} - -TEST_F(AsyncClientIntegrationTest, InsertFailure) { - auto async = AsyncClient(TestOptions()); - - auto insert = async - .InsertObject(BucketName(MakeRandomBucketName()), - MakeRandomObjectName(), LoremIpsum()) - .get(); - ASSERT_THAT(insert, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ReadFailure) { - auto async = AsyncClient(TestOptions()); - - auto read = async - .ReadObject(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - // At the moment, only connectivity errors are detected before the first - // `Read()` call. Accept such failures too: - if (!read) return; - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(read); - auto payload = ReadAll(std::move(reader), std::move(token)).get(); - ASSERT_THAT(payload, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ReadRangeFailure) { - auto async = AsyncClient(TestOptions()); - - auto payload = - async - .ReadObjectRange(BucketName(MakeRandomBucketName()), - MakeRandomObjectName(), /*offset=*/0, /*limit=*/1) - .get(); - ASSERT_THAT(payload, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async - .StartBufferedUpload(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeBufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async.ResumeBufferedUpload("test-only-invalid-upload-id").get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async - .StartUnbufferedUpload(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeUnbufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = - async.ResumeUnbufferedUpload("test-only-invalid-upload-id").get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ComposeObjectFailure) { - auto async = AsyncClient(TestOptions()); - - auto make_source = [](std::string name) { - auto source = google::storage::v2::ComposeObjectRequest::SourceObject{}; - source.set_name(std::move(name)); - return source; - }; - auto composed = - async - .ComposeObject(BucketName(bucket_name()), MakeRandomObjectName(), - {make_source(MakeRandomObjectName()), - make_source(MakeRandomObjectName())}) - .get(); - ASSERT_THAT(composed, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, DeleteObjectFailure) { - auto async = AsyncClient(TestOptions()); - - auto deleted = - async.DeleteObject(BucketName(bucket_name()), MakeRandomObjectName()) - .get(); - ASSERT_THAT(deleted, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartRewriteFailure) { - auto async = AsyncClient(TestOptions()); - - AsyncRewriter rewriter; - AsyncToken token; - std::tie(rewriter, token) = - async.StartRewrite(BucketName(bucket_name()), MakeRandomObjectName(), - BucketName(bucket_name()), MakeRandomObjectName()); - ASSERT_TRUE(token.valid()); - auto iteration = rewriter.Iterate(std::move(token)).get(); - ASSERT_THAT(iteration, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeRewriteFailure) { - auto async = AsyncClient(TestOptions()); - - AsyncRewriter rewriter; - AsyncToken token; - std::tie(rewriter, token) = - async.ResumeRewrite(BucketName(bucket_name()), MakeRandomObjectName(), - BucketName(bucket_name()), MakeRandomObjectName(), - "test-only-invalid-rewrite-token"); - ASSERT_TRUE(token.valid()); - auto iteration = rewriter.Iterate(std::move(token)).get(); - ASSERT_THAT(iteration, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadEmpty) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadMultiple) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, ResumeAppendableObjectUpload) { - // Skipping the test till we get the takeover feature on testbench. - GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kInitialBlockCount = 4; - auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; - auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - for (int i = 0; i != kInitialBlockCount - 1; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - writer.Close(); - - // Reset the existing writer and resume the upload. - writer = AsyncWriter(); - - auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); - ASSERT_STATUS_OK(object_metadata); - auto m = *object_metadata; - auto generation = m.generation(); - - w = async - .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, - generation) - .get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - auto const persisted = writer.PersistedState(); - ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); - // Cast to `int` because otherwise we need to write multiple casts below. - auto offset = static_cast(absl::get(persisted)); - if (offset % kBlockSize != 0) { - auto s = block.substr(offset % kBlockSize); - auto const size = s.size(); - auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); - ASSERT_STATUS_OK(p); - offset += static_cast(size); - token = *std::move(p); - } - while (offset < kDesiredSize) { - auto const n = std::min(kBlockSize, kDesiredSize - offset); - auto p = - writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); - ASSERT_STATUS_OK(p); - offset += n; - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kDesiredSize); -} - -TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) { - // Skipping the test till we get the takeover feature on testbench. - GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); - - auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); - ASSERT_STATUS_OK(object_metadata); - auto m = *object_metadata; - auto generation = m.generation(); - - w = async - .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, - generation) - .get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - EXPECT_FALSE(token.valid()); - EXPECT_THAT(writer.PersistedState(), VariantWith( - IsProtoEqual(*metadata))); -} - -TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); + + // 1. Get the reader and token for the specified range + // MODIFIED: Call Read() directly on the descriptor object + std::tie(r, t) = descriptor->Read(offset, limit); + + // 2. Consume the entire stream for this range + while (t.valid()) { + auto read = r.Read(std::move(t)).get(); + + // ASSERT_STATUS_OK will flag the test as failed and abort this + // thread if the status is not OK. + ASSERT_STATUS_OK(read); + + ReadPayload p; + AsyncToken t_new; + std::tie(p, t_new) = *std::move(read); + t = std::move(t_new); + + // In this test, we are discarding the payload `p`, just as + // the original single-threaded loop did. + } +} + + +TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { + auto project_id = "bajajnehaa-devrel-test"; + // auto const kproject = google::cloud::Project(project_id); + + auto client = MakeGrpcClient(project_id); + + auto bucket_name = std::string{"gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"}; + auto object_name = "vaibhav-test-file-113"; + auto placement = gcs::BucketCustomPlacementConfig{{"us-west4-a"}}; + // auto hns = gcs::BucketHierarchicalNamespace{true}; + auto ubla = gcs::BucketIamConfiguration{gcs::UniformBucketLevelAccess{true, {}}, absl::nullopt}; + + auto constexpr kBlockSize = 1024*1024; + auto constexpr kBlockCount = 1000; auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - - // Explicitly flush the data. - auto flush_status = writer.Flush().get(); - EXPECT_STATUS_OK(flush_status); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, Open) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = client.CreateBucket( - bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - - auto constexpr kSize = 8 * 1024; - auto constexpr kStride = 2 * kSize; - auto constexpr kBlockCount = 4; - auto const block = MakeRandomData(kSize); - - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); + auto const block2 = MakeRandomData(kBlockSize); + + auto async = MakeAsyncClient(project_id); + + // auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) + // .get(); + // ASSERT_STATUS_OK(w); + + // AsyncWriter writer; + // AsyncToken token; + // std::tie(writer, token) = *std::move(w); + // for (int i = 0; i < kBlockCount; ++i) { + // std::cout << "Writing data iteration #" << i << std::endl; + // auto p = writer.Write(std::move(token), WritePayload(block)).get(); + // ASSERT_STATUS_OK(p); + // token = *std::move(p); + // } + + // auto state = writer.PersistedState(); + // ASSERT_TRUE(absl::holds_alternative(state)); + // auto generation = absl::get(state).generation(); + // std::cout << "Initial generation: " << generation << std::endl; + + auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, 1779125190298918) + .get(); + + ASSERT_STATUS_OK(w1); + + AsyncWriter writer1; + AsyncToken token1; + std::tie(writer1, token1) = *std::move(w1); + + for (int i = 0; i < kBlockCount; ++i) { + std::cout << "Writing resumed data iteration #" << i << std::endl; + auto p = writer1.Write(std::move(token1), WritePayload(block)).get(); ASSERT_STATUS_OK(p); - token = *std::move(p); + token1 = *std::move(p); } - auto metadata = writer.Finalize(std::move(token)).get(); + auto metadata = writer1.Finalize(std::move(token1)).get(); ASSERT_STATUS_OK(metadata); - - auto spec = google::storage::v2::BidiReadObjectSpec{}; - spec.set_bucket(BucketName(bucket_name()).FullName()); - spec.set_object(object_name); - auto descriptor = async.Open(spec).get(); - ASSERT_STATUS_OK(descriptor); - - AsyncReader r0; - AsyncToken t0; - auto actual0 = std::string{}; - std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); - while (t0.valid()) { - auto read = r0.Read(std::move(t0)).get(); - ASSERT_STATUS_OK(read); - ReadPayload p; - AsyncToken t; - std::tie(p, t) = *std::move(read); - for (auto sv : p.contents()) actual0 += std::string(sv); - t0 = std::move(t); - } - - EXPECT_EQ(actual0.size(), kSize); - client.DeleteObject(bucket_name(), object_name, - storage::Generation(metadata->generation())); + // ScheduleForDelete(*metadata); + + std::cout << "Finalized Object metadata: " << metadata->DebugString() << std::endl; + + EXPECT_EQ(1,2); + +// auto spec = google::storage::v2::BidiReadObjectSpec{}; +// // std::cout << object_metadata->bucket() << "\n"; + +// // spec.set_bucket("projects/_/buckets/gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"); +// // spec.set_object(object_name); +// // auto descriptor_status = async.Open(spec).get(); +// // ASSERT_STATUS_OK(descriptor_status); +// // ObjectDescriptor descriptor = *std::move(descriptor_status); +// // auto descriptor_ptr = +// // std::make_shared(std::move(descriptor)); +// // std::shared_ptr descriptor_ptr = std::make_shared(*descriptor); + +// // --- Start of ThreadPool implementation --- + +// // 1. Initialize the ThreadPool +// // Use hardware_concurrency to get a reasonable number of threads +// size_t num_threads = std::thread::hardware_concurrency(); +// std::cout << "Starting ThreadPool with " << num_threads << " threads." << std::endl; +// ThreadPool pool(num_threads); + +// // 2. Define read parameters and storage for futures +// std::vector> futures; +// int num_reads = 1000; +// std::int64_t read_offset = 0; +// std::int64_t read_limit = 1024 * 1024 * 1024; // 1 GiB + +// std::cout << "Enqueuing " << num_reads << " read tasks..." << std::endl; + +// // 3. Enqueue all the read tasks +// // The original loop is replaced with this loop. +// for (int i = 0; i < num_reads; ++i) { +// // Pass *descriptor (the ObjectDescriptor) by value. +// // This is safe because it's a copyable wrapper. +// futures.push_back( +// pool.enqueue(ReadRangeTask, descriptor_ptr, read_offset, read_limit) +// ); +// } + +// // 4. Wait for all enqueued tasks to complete +// std::cout << "Waiting for all " << futures.size() << " read tasks to complete..." << std::endl; +// for (auto& f : futures) { +// f.get(); // This blocks until the future is ready. +// // If a task failed (e.g., via ASSERT_STATUS_OK), +// // gtest will have already flagged the failure. +// // If the task threw an exception, get() will re-throw it. +// } + +// std::cout << "All " << num_reads << " parallel read tasks completed." << std::endl; + + // --- End of ThreadPool implementation --- + + // auto actual0 = std::string{}; + // for(int i =0 ; i< 1000 ; i++){ + // std::tie(r0, t0) = descriptor->Read(0 , 1024* 1024* 1024); + // actual0 = std::string{}; + // while (t0.valid()) { + // auto read = r0.Read(std::move(t0)).get(); + // ASSERT_STATUS_OK(read); + // ReadPayload p; + // AsyncToken t; + // std::tie(p, t) = *std::move(read); + // t0 = std::move(t); + // } + // } + +// auto ans = block + block + block; + EXPECT_EQ(1,2); } -TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = - AsyncClient(TestOptions().set(1024)); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = client.CreateBucket( - bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - - auto constexpr kSize = 2048; - auto const block = MakeRandomData(kSize); - - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - - auto spec = google::storage::v2::BidiReadObjectSpec{}; - spec.set_bucket(BucketName(bucket_name()).FullName()); - spec.set_object(object_name); - auto descriptor = async.Open(spec).get(); - ASSERT_STATUS_OK(descriptor); - - AsyncReader r0; - AsyncToken t0; - auto actual0 = std::string{}; - std::tie(r0, t0) = descriptor->Read(0, kSize); - while (t0.valid()) { - auto read = r0.Read(std::move(t0)).get(); - ASSERT_STATUS_OK(read); - ReadPayload p; - AsyncToken t; - std::tie(p, t) = *std::move(read); - for (auto sv : p.contents()) actual0 += std::string(sv); - t0 = std::move(t); - } - - EXPECT_EQ(actual0.size(), kSize); - client.DeleteObject(bucket_name(), object_name, - storage::Generation(metadata->generation())); -} } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END @@ -1065,4 +367,4 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud } // namespace google -#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC +#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC \ No newline at end of file From f2e5f17af9f52bdea55f277f20ce06d7a694ee29 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Thu, 21 May 2026 08:59:01 +0000 Subject: [PATCH 2/5] add fastbyte checksum --- .../storage/internal/async/connection_impl.cc | 43 ++--------------- .../storage/internal/async/partial_upload.cc | 1 - .../internal/async/writer_connection_impl.cc | 19 +++++++- .../tests/async_client_integration_test.cc | 46 ++++++++----------- 4 files changed, 43 insertions(+), 66 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index eed682c5d7517..51c5633106508 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -13,7 +13,6 @@ // limitations under the License. #include "google/cloud/storage/internal/async/connection_impl.h" -#include #include "google/cloud/storage/async/idempotency_policy.h" #include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" @@ -403,7 +402,7 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { auto pending = factory(std::move(request)); return pending.then( [current, request = std::move(p.request), persisted_size, - fa = std::move(factory), stub = stub_](auto f) mutable + fa = std::move(factory)](auto f) mutable -> StatusOr> { auto rpc = f.get(); if (!rpc) return std::move(rpc).status(); @@ -412,7 +411,6 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { std::unique_ptr impl; if (rpc->first_response.has_resource()) { - std::cout << "connection_impl: First response has resource" << std::endl; auto const& resource = rpc->first_response.resource(); if (current->get() && resource.has_checksums()) { @@ -425,42 +423,11 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { current, request, std::move(rpc->stream), hash, resource, false); } else { - std::cout << "connection_impl: First response has persisted_size: " << rpc->first_response.persisted_size() << std::endl; persisted_size = rpc->first_response.persisted_size(); - - // Fallback: Fetch metadata synchronously - google::storage::v2::GetObjectRequest get_request; - if (request.has_append_object_spec()) { - get_request.set_bucket(request.append_object_spec().bucket()); - get_request.set_object(request.append_object_spec().object()); - get_request.set_generation(request.append_object_spec().generation()); - } else if (request.has_write_object_spec()) { - get_request.set_bucket(request.write_object_spec().resource().bucket()); - get_request.set_object(request.write_object_spec().resource().name()); - } - - grpc::ClientContext context; - auto get_response = stub->GetObject(context, *current, get_request); - if (get_response.ok()) { - std::cout << "connection_impl: Fetched metadata successfully!" << std::endl; - auto const& resource = *get_response; - if (current->get() && - resource.has_checksums()) { - hash = std::make_shared( - resource.checksums().crc32c(), resource.size()); - } else { - hash = CreateHashFunction(*current); - } - impl = std::make_unique( - current, request, std::move(rpc->stream), hash, - resource, false); - } else { - std::cout << "connection_impl: Failed to fetch metadata: " << get_response.status().message() << std::endl; - hash = CreateHashFunction(*current); - impl = std::make_unique( - current, request, std::move(rpc->stream), hash, persisted_size, - false); - } + hash = CreateHashFunction(*current); + impl = std::make_unique( + current, request, std::move(rpc->stream), hash, persisted_size, + false); } return MakeWriterConnectionResumed(std::move(fa), std::move(impl), std::move(request), std::move(hash), diff --git a/google/cloud/storage/internal/async/partial_upload.cc b/google/cloud/storage/internal/async/partial_upload.cc index 3961d548c7bbc..154fc6aca49dd 100644 --- a/google/cloud/storage/internal/async/partial_upload.cc +++ b/google/cloud/storage/internal/async/partial_upload.cc @@ -53,7 +53,6 @@ void PartialUpload::Write() { if (last_message) { if (action_ == LastMessageAction::kFinalizeWithChecksum) { auto status = Finalize(request_, wopt, *hash_function_); - std::cout << "Finalize status: " << status.message() << std::endl; if (!status.ok()) return WriteError(std::move(status)); } else if (action_ == LastMessageAction::kFinalize) { request_.set_finish_write(true); diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index 5e8ba83722a8a..427b4bdf63a7a 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -145,7 +145,6 @@ AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) { action = PartialUpload::kFinalize; } } - std::cout << "Action: " << action << std::endl; auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write), std::move(p), std::move(action)); return coro->Start().then([coro, size, this](auto f) mutable { @@ -260,7 +259,25 @@ future> AsyncWriterConnectionImpl::OnQuery( latest_write_handle_ = response->write_handle(); } if (response->has_persisted_size()) { + absl::optional old_obj; + if (absl::holds_alternative(persisted_state_)) { + old_obj = absl::get(persisted_state_); + } + persisted_state_ = response->persisted_size(); + + if (response->has_persisted_data_checksums()) { + auto const& checksums = response->persisted_data_checksums(); + if (checksums.has_crc32c()) { + google::storage::v2::Object obj; + obj.set_size(response->persisted_size()); + *obj.mutable_checksums() = checksums; + if (old_obj) { + obj.set_generation(old_obj->generation()); + } + persisted_state_ = obj; + } + } return make_ready_future(make_status_or(response->persisted_size())); } if (response->has_resource()) { diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index f058b74daa7be..2df85b6124828 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -236,38 +236,36 @@ TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { auto client = MakeGrpcClient(project_id); auto bucket_name = std::string{"gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"}; - auto object_name = "vaibhav-test-file-113"; + auto object_name = "vaibhav-test-file-117"; auto placement = gcs::BucketCustomPlacementConfig{{"us-west4-a"}}; // auto hns = gcs::BucketHierarchicalNamespace{true}; auto ubla = gcs::BucketIamConfiguration{gcs::UniformBucketLevelAccess{true, {}}, absl::nullopt}; auto constexpr kBlockSize = 1024*1024; - auto constexpr kBlockCount = 1000; + auto constexpr kBlockCount = 10; auto const block = MakeRandomData(kBlockSize); auto const block2 = MakeRandomData(kBlockSize); auto async = MakeAsyncClient(project_id); - // auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) - // .get(); - // ASSERT_STATUS_OK(w); - - // AsyncWriter writer; - // AsyncToken token; - // std::tie(writer, token) = *std::move(w); - // for (int i = 0; i < kBlockCount; ++i) { - // std::cout << "Writing data iteration #" << i << std::endl; - // auto p = writer.Write(std::move(token), WritePayload(block)).get(); - // ASSERT_STATUS_OK(p); - // token = *std::move(p); - // } + auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) + .get(); + ASSERT_STATUS_OK(w); - // auto state = writer.PersistedState(); - // ASSERT_TRUE(absl::holds_alternative(state)); - // auto generation = absl::get(state).generation(); - // std::cout << "Initial generation: " << generation << std::endl; + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i < kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto state = writer.PersistedState(); + ASSERT_TRUE(absl::holds_alternative(state)); + auto generation = absl::get(state).generation(); - auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, 1779125190298918) + auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, generation) .get(); ASSERT_STATUS_OK(w1); @@ -277,7 +275,6 @@ TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { std::tie(writer1, token1) = *std::move(w1); for (int i = 0; i < kBlockCount; ++i) { - std::cout << "Writing resumed data iteration #" << i << std::endl; auto p = writer1.Write(std::move(token1), WritePayload(block)).get(); ASSERT_STATUS_OK(p); token1 = *std::move(p); @@ -285,11 +282,9 @@ TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { auto metadata = writer1.Finalize(std::move(token1)).get(); ASSERT_STATUS_OK(metadata); - // ScheduleForDelete(*metadata); - - std::cout << "Finalized Object metadata: " << metadata->DebugString() << std::endl; + ScheduleForDelete(*metadata); - EXPECT_EQ(1,2); + EXPECT_EQ(metadata->size(), 2 * kBlockCount * kBlockSize); // auto spec = google::storage::v2::BidiReadObjectSpec{}; // // std::cout << object_metadata->bucket() << "\n"; @@ -357,7 +352,6 @@ TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { // } // auto ans = block + block + block; - EXPECT_EQ(1,2); } From 242745c18c83608a38a83984d07a723f877e8820 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Thu, 21 May 2026 09:04:38 +0000 Subject: [PATCH 3/5] revert async client integration test local change --- .../tests/async_client_integration_test.cc | 1222 +++++++++++++---- 1 file changed, 963 insertions(+), 259 deletions(-) diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index 2df85b6124828..43d989719e5a1 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,8 +17,8 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" -#include "google/cloud/opentelemetry_options.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/grpc_options.h" @@ -32,8 +32,6 @@ #include #include #include -#include -#include #include namespace google { @@ -54,7 +52,15 @@ using ::testing::VariantWith; class AsyncClientIntegrationTest : public google::cloud::storage::testing::StorageIntegrationTest { -protected: + protected: + void SetUp() override { + bucket_name_ = + GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); + ASSERT_THAT(bucket_name_, Not(IsEmpty())) + << "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME is not set"; + } + + std::string const& bucket_name() const { return bucket_name_; } using google::cloud::storage::testing::StorageIntegrationTest:: ScheduleForDelete; @@ -65,295 +71,993 @@ class AsyncClientIntegrationTest .set_generation(object.generation())); } -private: + private: std::string bucket_name_; }; -namespace gcs = ::google::cloud::storage; +auto TestOptions() { + // Disable metrics in the test, they just make the logs harder to grok. + return Options{} + .set(false) + .set(1) + .set(TracingOptions().SetOptions( + "truncate_string_field_longer_than=2048")); +} + +auto AlwaysRetry() { + return TestOptions().set( + MakeAlwaysRetryAsyncIdempotencyPolicy); +} + +TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + LoremIpsum(), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + auto full0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, + LoremIpsum().size()); + auto full1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, + LoremIpsum().size()); + auto partial0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, + 2, LoremIpsum().size()); + auto partial1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, + 2, LoremIpsum().size()); + + for (auto* p : {&full1, &full0}) { + auto response = p->get(); + ASSERT_STATUS_OK(response); + auto contents = response->contents(); + auto const full = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(full, LoremIpsum()); + } + for (auto* p : {&partial1, &partial0}) { + auto response = p->get(); + ASSERT_STATUS_OK(response); + auto contents = response->contents(); + auto const partial = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(partial, LoremIpsum().substr(2)); + } + + auto status = async + .DeleteObject(BucketName(bucket_name()), object_name, + insert->generation()) + .get(); + EXPECT_STATUS_OK(status); + + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_bucket(insert->bucket()); + request.set_object(insert->name()); + request.set_generation(insert->generation()); + auto head = async.ReadObjectRange(request, /*offset=*/0, /*limit=*/1).get(); + EXPECT_THAT(head, StatusIs(StatusCode::kNotFound)); +} -// auto AlwaysRetry() { -// return google::cloud::Options{}.set( -// MakeAlwaysRetryIdempotencyPolicy); -// } +TEST_F(AsyncClientIntegrationTest, ComposeObject) { + auto async = AsyncClient(TestOptions()); + auto o1 = MakeRandomObjectName(); + auto o2 = MakeRandomObjectName(); + auto destination = MakeRandomObjectName(); -google::cloud::Options MakeOptions(google::cloud::Options opts) { - auto fallback = google::cloud::Options{}; - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_GRPC_ENDPOINT")) { - fallback.set(*v); + auto insert1 = async.InsertObject(BucketName(bucket_name()), o1, LoremIpsum(), + AlwaysRetry()); + auto insert2 = async.InsertObject(BucketName(bucket_name()), o2, LoremIpsum(), + AlwaysRetry()); + std::vector> inserted{insert1.get(), + insert2.get()}; + for (auto const& insert : inserted) { + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_JSON_ENDPOINT")) { - fallback.set(*v); + std::vector sources; + std::transform(inserted.begin(), inserted.end(), std::back_inserter(sources), + [](auto const& o) { + google::storage::v2::ComposeObjectRequest::SourceObject r; + r.set_name(o->name()); + r.set_generation(o->generation()); + return r; + }); + auto pending = async.ComposeObject(BucketName(bucket_name()), destination, + std::move(sources)); + auto const composed = pending.get(); + EXPECT_STATUS_OK(composed); + ScheduleForDelete(*composed); + + auto read = async + .ReadObjectRange(BucketName(bucket_name()), destination, 0, + 2 * LoremIpsum().size()) + .get(); + ASSERT_STATUS_OK(read); + auto contents = read->contents(); + auto const full_contents = std::accumulate(contents.begin(), contents.end(), + std::string{}, [](auto a, auto b) { + a += std::string(b); + return a; + }); + EXPECT_EQ(full_contents, LoremIpsum() + LoremIpsum()); + EXPECT_THAT(read->metadata(), Optional(IsProtoEqual(*composed))); +} + +TEST_F(AsyncClientIntegrationTest, StreamingRead) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a relatively large object so the streaming read makes sense. We + // aim for something around 5MiB, enough for 3 `Read()` calls. + auto constexpr kLineSize = 64; + auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; + auto const block = MakeRandomData(kLineSize); + std::vector insert_data(kLineCount); + std::generate(insert_data.begin(), insert_data.end(), [&, n = 0]() mutable { + return std::to_string(++n) + ": " + block; + }); + auto const expected_size = std::accumulate( + insert_data.begin(), insert_data.end(), static_cast(0), + [](auto a, auto const& b) { return a + b.size(); }); + + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + insert_data, AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + ASSERT_EQ(insert->size(), expected_size); + + auto r = async.ReadObject(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(r); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(r); + + std::string actual; + while (token.valid()) { + auto p = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(p); + ReadPayload payload; + std::tie(payload, token) = *std::move(p); + for (auto v : payload.contents()) actual += std::string(v); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_AUTHORITY")) { - fallback.set(*v); + EXPECT_EQ(actual.size(), expected_size); + auto view = absl::string_view(actual); + for (auto const& expected : insert_data) { + ASSERT_GE(view.size(), expected.size()); + ASSERT_EQ(expected, view.substr(0, expected.size())); + view.remove_prefix(expected.size()); } - if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_TARGET_API_VERSION")) { - fallback.set(*v); + EXPECT_EQ(view, absl::string_view{}); +} + +TEST_F(AsyncClientIntegrationTest, StreamingReadRange) { + auto async = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a relatively large object so the streaming read makes sense. We + // aim for something around 5MiB, enough for 3 `Read()` calls. + auto constexpr kLineSize = 64; + auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; + auto constexpr kReadOffset = kLineCount * kLineSize / 2; + auto const block = MakeRandomData(kLineSize - 1) + "\n"; + std::string contents; + for (int i = 0; i != kLineCount; ++i) contents += block; + auto const expected_insert_size = contents.size(); + + auto insert = async + .InsertObject(BucketName(bucket_name()), object_name, + contents, AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + ASSERT_EQ(insert->size(), expected_insert_size); + + auto request = google::storage::v2::ReadObjectRequest{}; + request.set_bucket(insert->bucket()); + request.set_object(insert->name()); + request.set_generation(insert->generation()); + request.set_read_offset(kReadOffset); + auto r = async.ReadObject(request).get(); + ASSERT_STATUS_OK(r); + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(r); + + std::string actual; + while (token.valid()) { + auto p = reader.Read(std::move(token)).get(); + ASSERT_STATUS_OK(p); + ReadPayload payload; + std::tie(payload, token) = *std::move(p); + for (auto v : payload.contents()) actual += std::string(v); } - fallback.set(false); - return google::cloud::internal::MergeOptions(std::move(opts), fallback); -} - - -google::cloud::storage::Client MakeGrpcClient(std::string project_id) { - auto options = MakeOptions(google::cloud::Options{} - .set(project_id)); - return google::cloud::storage::MakeGrpcClient(std::move(options)); -} - -google::cloud::storage::AsyncClient MakeAsyncClient(std::string project_id) { - auto options = MakeOptions(google::cloud::Options{} - .set(project_id) - .set({"rpc"}) - .set(true)); - return google::cloud::storage::AsyncClient(options); -} - -class ThreadPool { -public: - // Constructor initializes the thread pool with a given number of worker threads. - ThreadPool(size_t threads) : stop_(false) { - if (threads == 0) { - throw std::invalid_argument("Thread count cannot be zero."); - } - for (size_t i = 0; i < threads; ++i) { - workers_.emplace_back([this] { - while (true) { - std::function task; - { - // Acquire a lock on the task queue. - std::unique_lock lock(this->queue_mutex_); - - // Wait for a task to be available or for the pool to stop. - this->condition_.wait(lock, [this] { - return this->stop_ || !this->tasks_.empty(); - }); - - // If the pool is stopping and no tasks are left, exit the thread. - if (this->stop_ && this->tasks_.empty()) { - return; - } - - // Get the next task from the queue. - task = std::move(this->tasks_.front()); - this->tasks_.pop(); - } - - // Execute the task. - task(); - } - }); - } - } - - // Adds a new task to the thread pool. - template - auto enqueue(F&& f, Args&&... args) -> std::future::type> { - using return_type = typename std::result_of::type; - - // Create a packaged_task to wrap the function and its arguments. - auto task = std::make_shared>( - std::bind(std::forward(f), std::forward(args)...) - ); - - std::future res = task->get_future(); - { - // Acquire a lock on the queue and push the task. - std::unique_lock lock(queue_mutex_); - - // Don't allow enqueueing after stopping the pool. - if (stop_) { - throw std::runtime_error("enqueue on stopped ThreadPool"); - } - - tasks_.emplace([task]() { (*task)(); }); - } - - // Notify one waiting thread that a new task is available. - condition_.notify_one(); - return res; - } - - // Destructor stops all worker threads and joins them. - ~ThreadPool() { - { - std::unique_lock lock(queue_mutex_); - stop_ = true; - } - - // Notify all threads so they can wake up and exit their loops. - condition_.notify_all(); - - for (std::thread& worker : workers_) { - worker.join(); - } - } - -private: - std::vector workers_; - std::queue> tasks_; - - std::mutex queue_mutex_; - std::condition_variable condition_; - - bool stop_; -}; + EXPECT_EQ(absl::string_view(actual), + absl::string_view(contents).substr(kReadOffset)); +} -void ReadRangeTask(std::shared_ptr descriptor, - std::int64_t& offset, - std::int64_t& limit) { - AsyncReader r; - AsyncToken t; - - // 1. Get the reader and token for the specified range - // MODIFIED: Call Read() directly on the descriptor object - std::tie(r, t) = descriptor->Read(offset, limit); - - // 2. Consume the entire stream for this range - while (t.valid()) { - auto read = r.Read(std::move(t)).get(); - - // ASSERT_STATUS_OK will flag the test as failed and abort this - // thread if the status is not OK. - ASSERT_STATUS_OK(read); - - ReadPayload p; - AsyncToken t_new; - std::tie(p, t_new) = *std::move(read); - t = std::move(t_new); - - // In this test, we are discarding the payload `p`, just as - // the original single-threaded loop did. - } -} - - -TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { - auto project_id = "bajajnehaa-devrel-test"; - // auto const kproject = google::cloud::Project(project_id); - - auto client = MakeGrpcClient(project_id); - - auto bucket_name = std::string{"gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"}; - auto object_name = "vaibhav-test-file-117"; - auto placement = gcs::BucketCustomPlacementConfig{{"us-west4-a"}}; - // auto hns = gcs::BucketHierarchicalNamespace{true}; - auto ubla = gcs::BucketIamConfiguration{gcs::UniformBucketLevelAccess{true, {}}, absl::nullopt}; - - auto constexpr kBlockSize = 1024*1024; - auto constexpr kBlockCount = 10; +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; + auto const block = MakeRandomData(kBlockSize); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kInitialBlockCount = 4; + auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; + auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; + auto const block = MakeRandomData(kBlockSize); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto const upload_id = writer.UploadId(); + for (int i = 0; i != kInitialBlockCount - 1; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + // Reset the existing writer and resume the upload. + writer = AsyncWriter(); + w = client.ResumeUnbufferedUpload(upload_id).get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + ASSERT_EQ(writer.UploadId(), upload_id); + auto const persisted = writer.PersistedState(); + // We don't expect this to be larger that the total size of the object. + // Incidentally, this shows the value fits into an `int`. + ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); + // Cast to `int` because otherwise we need to write multiple casts below. + auto offset = static_cast(absl::get(persisted)); + if (offset % kBlockSize != 0) { + auto s = block.substr(offset % kBlockSize); + auto const size = s.size(); + auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); + ASSERT_STATUS_OK(p); + offset += static_cast(size); + token = *std::move(p); + } + while (offset < kDesiredSize) { + auto const n = std::min(kBlockSize, kDesiredSize - offset); + auto p = + writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); + ASSERT_STATUS_OK(p); + offset += n; + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kDesiredSize); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); + auto const block = MakeRandomData(kBlockSize); + + auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto const upload_id = writer.UploadId(); + auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); + + w = client.ResumeUnbufferedUpload(upload_id).get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + EXPECT_FALSE(token.valid()); + EXPECT_THAT(writer.PersistedState(), VariantWith( + IsProtoEqual(*metadata))); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto w = + client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { + auto client = AsyncClient(TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; auto const block = MakeRandomData(kBlockSize); - auto const block2 = MakeRandomData(kBlockSize); - auto async = MakeAsyncClient(project_id); - - auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) - .get(); + auto w = + client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, RewriteObject) { + auto async = AsyncClient(TestOptions()); + auto o1 = MakeRandomObjectName(); + auto o2 = MakeRandomObjectName(); + + auto constexpr kBlockSize = 4 * 1024 * 1024; + auto insert = async + .InsertObject(BucketName(bucket_name()), o1, + MakeRandomData(kBlockSize), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(insert); + ScheduleForDelete(*insert); + + // Start a rewrite, but limit each iteration to a small number of bytes, to + // force multiple calls. + google::storage::v2::Object metadata; + AsyncRewriter rewriter; + AsyncToken token; + google::storage::v2::RewriteObjectRequest request; + request.set_destination_name(o2); + request.set_destination_bucket(BucketName(bucket_name()).FullName()); + request.set_source_object(o1); + request.set_source_bucket(BucketName(bucket_name()).FullName()); + request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.StartRewrite(std::move(request)); + while (token.valid()) { + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); + google::storage::v2::RewriteResponse response; + AsyncToken t; + std::tie(response, t) = *std::move(rt); + token = std::move(t); + if (!response.has_resource()) continue; + metadata = response.resource(); + ScheduleForDelete(metadata); + EXPECT_FALSE(token.valid()); + } + EXPECT_EQ(metadata.name(), o2); + EXPECT_EQ(metadata.size(), insert->size()); +} + +TEST_F(AsyncClientIntegrationTest, RewriteObjectResume) { + auto async = AsyncClient(TestOptions()); + auto destination = + GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_DESTINATION_BUCKET_NAME"); + if (!destination || destination->empty()) GTEST_SKIP(); + + auto constexpr kBlockSize = 4 * 1024 * 1024; + auto source = + async + .InsertObject(BucketName(bucket_name()), MakeRandomObjectName(), + MakeRandomData(kBlockSize), AlwaysRetry()) + .get(); + ASSERT_STATUS_OK(source); + ScheduleForDelete(*source); + + // Start a rewrite, but limit each iteration to a small number of bytes, to + // force multiple calls. + AsyncRewriter rewriter; + AsyncToken token; + auto const expected_name = MakeRandomObjectName(); + google::storage::v2::RewriteObjectRequest start_request; + start_request.set_destination_name(expected_name); + start_request.set_destination_bucket(BucketName(*destination).FullName()); + start_request.set_source_object(source->name()); + start_request.set_source_bucket(source->bucket()); + start_request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.StartRewrite(start_request); + + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); + google::storage::v2::RewriteResponse response; + AsyncToken t; + std::tie(response, t) = *std::move(rt); + + // We want to resume a partially completed resume. Verify the first rewrite + // did not complete things. + ASSERT_THAT(response.rewrite_token(), Not(IsEmpty())); + + google::storage::v2::RewriteObjectRequest resume_request; + resume_request.set_source_bucket(source->bucket()); + resume_request.set_source_object(source->name()); + resume_request.set_destination_bucket(BucketName(*destination).FullName()); + resume_request.set_destination_name(expected_name); + resume_request.set_max_bytes_rewritten_per_call(1024 * 1024); + std::tie(rewriter, token) = async.ResumeRewrite(std::move(resume_request)); + + google::storage::v2::Object metadata; + while (token.valid()) { + auto rt = rewriter.Iterate(std::move(token)).get(); + ASSERT_STATUS_OK(rt); + AsyncToken t; + std::tie(response, t) = *std::move(rt); + token = std::move(t); + if (!response.has_resource()) continue; + metadata = response.resource(); + ScheduleForDelete(metadata); + EXPECT_EQ(metadata.bucket(), BucketName(*destination).FullName()); + EXPECT_EQ(metadata.name(), expected_name); + EXPECT_EQ(metadata.size(), source->size()); + EXPECT_FALSE(token.valid()); + } +} + +TEST_F(AsyncClientIntegrationTest, InsertFailure) { + auto async = AsyncClient(TestOptions()); + + auto insert = async + .InsertObject(BucketName(MakeRandomBucketName()), + MakeRandomObjectName(), LoremIpsum()) + .get(); + ASSERT_THAT(insert, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ReadFailure) { + auto async = AsyncClient(TestOptions()); + + auto read = async + .ReadObject(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + // At the moment, only connectivity errors are detected before the first + // `Read()` call. Accept such failures too: + if (!read) return; + AsyncReader reader; + AsyncToken token; + std::tie(reader, token) = *std::move(read); + auto payload = ReadAll(std::move(reader), std::move(token)).get(); + ASSERT_THAT(payload, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ReadRangeFailure) { + auto async = AsyncClient(TestOptions()); + + auto payload = + async + .ReadObjectRange(BucketName(MakeRandomBucketName()), + MakeRandomObjectName(), /*offset=*/0, /*limit=*/1) + .get(); + ASSERT_THAT(payload, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartBufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async + .StartBufferedUpload(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeBufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async.ResumeBufferedUpload("test-only-invalid-upload-id").get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = async + .StartUnbufferedUpload(BucketName(MakeRandomBucketName()), + MakeRandomObjectName()) + .get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeUnbufferedUploadFailure) { + auto async = AsyncClient(TestOptions()); + + auto writer = + async.ResumeUnbufferedUpload("test-only-invalid-upload-id").get(); + ASSERT_THAT(writer, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ComposeObjectFailure) { + auto async = AsyncClient(TestOptions()); + + auto make_source = [](std::string name) { + auto source = google::storage::v2::ComposeObjectRequest::SourceObject{}; + source.set_name(std::move(name)); + return source; + }; + auto composed = + async + .ComposeObject(BucketName(bucket_name()), MakeRandomObjectName(), + {make_source(MakeRandomObjectName()), + make_source(MakeRandomObjectName())}) + .get(); + ASSERT_THAT(composed, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, DeleteObjectFailure) { + auto async = AsyncClient(TestOptions()); + + auto deleted = + async.DeleteObject(BucketName(bucket_name()), MakeRandomObjectName()) + .get(); + ASSERT_THAT(deleted, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartRewriteFailure) { + auto async = AsyncClient(TestOptions()); + + AsyncRewriter rewriter; + AsyncToken token; + std::tie(rewriter, token) = + async.StartRewrite(BucketName(bucket_name()), MakeRandomObjectName(), + BucketName(bucket_name()), MakeRandomObjectName()); + ASSERT_TRUE(token.valid()); + auto iteration = rewriter.Iterate(std::move(token)).get(); + ASSERT_THAT(iteration, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, ResumeRewriteFailure) { + auto async = AsyncClient(TestOptions()); + + AsyncRewriter rewriter; + AsyncToken token; + std::tie(rewriter, token) = + async.ResumeRewrite(BucketName(bucket_name()), MakeRandomObjectName(), + BucketName(bucket_name()), MakeRandomObjectName(), + "test-only-invalid-rewrite-token"); + ASSERT_TRUE(token.valid()); + auto iteration = rewriter.Iterate(std::move(token)).get(); + ASSERT_THAT(iteration, Not(IsOk())); +} + +TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadEmpty) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), 0); +} + +TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadMultiple) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kBlockCount = 16; + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); AsyncWriter writer; AsyncToken token; std::tie(writer, token) = *std::move(w); - for (int i = 0; i < kBlockCount; ++i) { + for (int i = 0; i != kBlockCount; ++i) { auto p = writer.Write(std::move(token), WritePayload(block)).get(); ASSERT_STATUS_OK(p); token = *std::move(p); } - auto state = writer.PersistedState(); - ASSERT_TRUE(absl::holds_alternative(state)); - auto generation = absl::get(state).generation(); + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, ResumeAppendableObjectUpload) { + // Skipping the test till we get the takeover feature on testbench. + GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = 256 * 1024; + auto constexpr kInitialBlockCount = 4; + auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; + auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + for (int i = 0; i != kInitialBlockCount - 1; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } - auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, generation) - .get(); + writer.Close(); - ASSERT_STATUS_OK(w1); + // Reset the existing writer and resume the upload. + writer = AsyncWriter(); - AsyncWriter writer1; - AsyncToken token1; - std::tie(writer1, token1) = *std::move(w1); + auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); + ASSERT_STATUS_OK(object_metadata); + auto m = *object_metadata; + auto generation = m.generation(); - for (int i = 0; i < kBlockCount; ++i) { - auto p = writer1.Write(std::move(token1), WritePayload(block)).get(); + w = async + .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, + generation) + .get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + auto const persisted = writer.PersistedState(); + ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); + // Cast to `int` because otherwise we need to write multiple casts below. + auto offset = static_cast(absl::get(persisted)); + if (offset % kBlockSize != 0) { + auto s = block.substr(offset % kBlockSize); + auto const size = s.size(); + auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); + ASSERT_STATUS_OK(p); + offset += static_cast(size); + token = *std::move(p); + } + while (offset < kDesiredSize) { + auto const n = std::min(kBlockSize, kDesiredSize - offset); + auto p = + writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); ASSERT_STATUS_OK(p); - token1 = *std::move(p); + offset += n; + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kDesiredSize); +} + +TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) { + // Skipping the test till we get the takeover feature on testbench. + GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + + auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(metadata); + ScheduleForDelete(*metadata); + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); + + auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); + ASSERT_STATUS_OK(object_metadata); + auto m = *object_metadata; + auto generation = m.generation(); + + w = async + .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, + generation) + .get(); + ASSERT_STATUS_OK(w); + std::tie(writer, token) = *std::move(w); + EXPECT_FALSE(token.valid()); + EXPECT_THAT(writer.PersistedState(), VariantWith( + IsProtoEqual(*metadata))); +} + +TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + // Create a small block to send over and over. + auto constexpr kBlockSize = static_cast(256 * 1024); + auto const block = MakeRandomData(kBlockSize); + + auto create = + client.CreateBucket(bucket_name(), storage::BucketMetadata{} + .set_location("us-west4") + .set_storage_class("RAPID")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); } + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); - auto metadata = writer1.Finalize(std::move(token1)).get(); + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + + // Explicitly flush the data. + auto flush_status = writer.Flush().get(); + EXPECT_STATUS_OK(flush_status); + + auto metadata = writer.Finalize(std::move(token)).get(); ASSERT_STATUS_OK(metadata); ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->size(), 2 * kBlockCount * kBlockSize); - -// auto spec = google::storage::v2::BidiReadObjectSpec{}; -// // std::cout << object_metadata->bucket() << "\n"; - -// // spec.set_bucket("projects/_/buckets/gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"); -// // spec.set_object(object_name); -// // auto descriptor_status = async.Open(spec).get(); -// // ASSERT_STATUS_OK(descriptor_status); -// // ObjectDescriptor descriptor = *std::move(descriptor_status); -// // auto descriptor_ptr = -// // std::make_shared(std::move(descriptor)); -// // std::shared_ptr descriptor_ptr = std::make_shared(*descriptor); - -// // --- Start of ThreadPool implementation --- - -// // 1. Initialize the ThreadPool -// // Use hardware_concurrency to get a reasonable number of threads -// size_t num_threads = std::thread::hardware_concurrency(); -// std::cout << "Starting ThreadPool with " << num_threads << " threads." << std::endl; -// ThreadPool pool(num_threads); - -// // 2. Define read parameters and storage for futures -// std::vector> futures; -// int num_reads = 1000; -// std::int64_t read_offset = 0; -// std::int64_t read_limit = 1024 * 1024 * 1024; // 1 GiB - -// std::cout << "Enqueuing " << num_reads << " read tasks..." << std::endl; - -// // 3. Enqueue all the read tasks -// // The original loop is replaced with this loop. -// for (int i = 0; i < num_reads; ++i) { -// // Pass *descriptor (the ObjectDescriptor) by value. -// // This is safe because it's a copyable wrapper. -// futures.push_back( -// pool.enqueue(ReadRangeTask, descriptor_ptr, read_offset, read_limit) -// ); -// } - -// // 4. Wait for all enqueued tasks to complete -// std::cout << "Waiting for all " << futures.size() << " read tasks to complete..." << std::endl; -// for (auto& f : futures) { -// f.get(); // This blocks until the future is ready. -// // If a task failed (e.g., via ASSERT_STATUS_OK), -// // gtest will have already flagged the failure. -// // If the task threw an exception, get() will re-throw it. -// } - -// std::cout << "All " << num_reads << " parallel read tasks completed." << std::endl; - - // --- End of ThreadPool implementation --- - - // auto actual0 = std::string{}; - // for(int i =0 ; i< 1000 ; i++){ - // std::tie(r0, t0) = descriptor->Read(0 , 1024* 1024* 1024); - // actual0 = std::string{}; - // while (t0.valid()) { - // auto read = r0.Read(std::move(t0)).get(); - // ASSERT_STATUS_OK(read); - // ReadPayload p; - // AsyncToken t; - // std::tie(p, t) = *std::move(read); - // t0 = std::move(t); - // } - // } - -// auto ans = block + block + block; + + EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); + EXPECT_EQ(metadata->name(), object_name); + EXPECT_EQ(metadata->size(), kBlockSize); +} + +TEST_F(AsyncClientIntegrationTest, Open) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = AsyncClient(TestOptions()); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = client.CreateBucket( + bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + + auto constexpr kSize = 8 * 1024; + auto constexpr kStride = 2 * kSize; + auto constexpr kBlockCount = 4; + auto const block = MakeRandomData(kSize); + + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + for (int i = 0; i != kBlockCount; ++i) { + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + } + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + + auto spec = google::storage::v2::BidiReadObjectSpec{}; + spec.set_bucket(BucketName(bucket_name()).FullName()); + spec.set_object(object_name); + auto descriptor = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor); + + AsyncReader r0; + AsyncToken t0; + auto actual0 = std::string{}; + std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); + while (t0.valid()) { + auto read = r0.Read(std::move(t0)).get(); + ASSERT_STATUS_OK(read); + ReadPayload p; + AsyncToken t; + std::tie(p, t) = *std::move(read); + for (auto sv : p.contents()) actual0 += std::string(sv); + t0 = std::move(t); + } + + EXPECT_EQ(actual0.size(), kSize); + client.DeleteObject(bucket_name(), object_name, + storage::Generation(metadata->generation())); } +TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { + if (!UsingEmulator()) GTEST_SKIP(); + auto async = + AsyncClient(TestOptions().set(1024)); + auto client = MakeIntegrationTestClient(true, TestOptions()); + auto object_name = MakeRandomObjectName(); + + auto create = client.CreateBucket( + bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); + if (!create && create.status().code() != StatusCode::kAlreadyExists) { + GTEST_FAIL() << "cannot create bucket: " << create.status(); + } + + auto constexpr kSize = 2048; + auto const block = MakeRandomData(kSize); + + auto w = + async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) + .get(); + ASSERT_STATUS_OK(w); + AsyncWriter writer; + AsyncToken token; + std::tie(writer, token) = *std::move(w); + auto p = writer.Write(std::move(token), WritePayload(block)).get(); + ASSERT_STATUS_OK(p); + token = *std::move(p); + + auto metadata = writer.Finalize(std::move(token)).get(); + ASSERT_STATUS_OK(metadata); + + auto spec = google::storage::v2::BidiReadObjectSpec{}; + spec.set_bucket(BucketName(bucket_name()).FullName()); + spec.set_object(object_name); + auto descriptor = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor); + + AsyncReader r0; + AsyncToken t0; + auto actual0 = std::string{}; + std::tie(r0, t0) = descriptor->Read(0, kSize); + while (t0.valid()) { + auto read = r0.Read(std::move(t0)).get(); + ASSERT_STATUS_OK(read); + ReadPayload p; + AsyncToken t; + std::tie(p, t) = *std::move(read); + for (auto sv : p.contents()) actual0 += std::string(sv); + t0 = std::move(t); + } + + EXPECT_EQ(actual0.size(), kSize); + client.DeleteObject(bucket_name(), object_name, + storage::Generation(metadata->generation())); +} } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END From 698f77888a2ae3ebb11e3d242ea75e15113953ad Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Thu, 21 May 2026 09:08:42 +0000 Subject: [PATCH 4/5] resolve ai comments --- google/cloud/storage/internal/async/connection_impl.cc | 2 +- google/cloud/storage/tests/async_client_integration_test.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 51c5633106508..267177be45e9c 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -413,7 +413,7 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { if (rpc->first_response.has_resource()) { auto const& resource = rpc->first_response.resource(); if (current->get() && - resource.has_checksums()) { + resource.has_checksums() && resource.checksums().has_crc32c()) { hash = std::make_shared( resource.checksums().crc32c(), resource.size()); } else { diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index 43d989719e5a1..8bfccb3f2db16 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -1065,4 +1065,4 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace cloud } // namespace google -#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC \ No newline at end of file +#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC From 2793d6159a1dca58d620dd98f2c9e12abfb7d9c2 Mon Sep 17 00:00:00 2001 From: Vaibhav Pratap Date: Thu, 21 May 2026 09:15:48 +0000 Subject: [PATCH 5/5] fix the format --- .../storage/internal/async/connection_impl.cc | 28 +++++++++---------- .../connection_impl_appendable_upload_test.cc | 15 ++++++---- .../internal/async/writer_connection_impl.cc | 6 ++-- .../storage/internal/hash_function_impl.h | 2 +- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 267177be45e9c..624e9de74a204 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -403,13 +403,13 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { return pending.then( [current, request = std::move(p.request), persisted_size, fa = std::move(factory)](auto f) mutable - -> StatusOr> { + -> StatusOr> { auto rpc = f.get(); if (!rpc) return std::move(rpc).status(); - + std::shared_ptr hash; std::unique_ptr impl; - + if (rpc->first_response.has_resource()) { auto const& resource = rpc->first_response.resource(); if (current->get() && @@ -420,8 +420,7 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { hash = CreateHashFunction(*current); } impl = std::make_unique( - current, request, std::move(rpc->stream), hash, - resource, false); + current, request, std::move(rpc->stream), hash, resource, false); } else { persisted_size = rpc->first_response.persisted_size(); hash = CreateHashFunction(*current); @@ -471,7 +470,7 @@ AsyncConnectionImpl::StartBufferedUpload(UploadParams p) { return StartUnbufferedUpload(std::move(p)) .then([current = std::move(current), async_write_object = std::move(async_write_object)](auto f) mutable - -> StatusOr> { + -> StatusOr> { auto w = f.get(); if (!w) return std::move(w).status(); auto factory = [upload_id = (*w)->UploadId(), @@ -505,14 +504,15 @@ AsyncConnectionImpl::ResumeBufferedUpload(ResumeUploadParams p) { }; auto f = make_unbuffered(); - return f.then([current = std::move(current), - make_unbuffered = std::move(make_unbuffered)](auto f) mutable - -> StatusOr> { - auto w = f.get(); - if (!w) return std::move(w).status(); - return MakeWriterConnectionBuffered(std::move(make_unbuffered), - *std::move(w), *current); - }); + return f.then( + [current = std::move(current), + make_unbuffered = std::move(make_unbuffered)](auto f) mutable + -> StatusOr> { + auto w = f.get(); + if (!w) return std::move(w).status(); + return MakeWriterConnectionBuffered(std::move(make_unbuffered), + *std::move(w), *current); + }); } future> diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index 45c53f23341fc..27b037f5d17a2 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -628,7 +628,8 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) { next.first.set_value(true); } -TEST_F(AsyncConnectionImplAppendableTest, StartAppendableObjectUploadWithChecksum) { +TEST_F(AsyncConnectionImplAppendableTest, + StartAppendableObjectUploadWithChecksum) { auto constexpr kRequestText = R"pb( write_object_spec { resource { @@ -645,7 +646,7 @@ TEST_F(AsyncConnectionImplAppendableTest, StartAppendableObjectUploadWithChecksu initial_resource.set_bucket("projects/_/buckets/test-bucket"); initial_resource.set_name("test-object"); initial_resource.set_size(1024); - initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC + initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC auto stream = std::make_unique(); EXPECT_CALL(*stream, Start).WillOnce([&] { @@ -666,7 +667,8 @@ TEST_F(AsyncConnectionImplAppendableTest, StartAppendableObjectUploadWithChecksu .then([initial_resource](auto) { auto response = google::storage::v2::BidiWriteObjectResponse{}; *response.mutable_resource() = initial_resource; - response.mutable_resource()->set_size(initial_resource.size() + 9); // "some data" size is 9 + response.mutable_resource()->set_size( + initial_resource.size() + 9); // "some data" size is 9 return absl::make_optional(std::move(response)); }); }); @@ -692,7 +694,8 @@ TEST_F(AsyncConnectionImplAppendableTest, StartAppendableObjectUploadWithChecksu grpc::WriteOptions wopt) { EXPECT_TRUE(request.finish_write()); EXPECT_TRUE(wopt.is_last_message()); - // Here we expect full checksums to be set because we had the resource in takeover. + // Here we expect full checksums to be set because we had the resource + // in takeover. EXPECT_TRUE(request.has_object_checksums()); return sequencer.PushBack("Write(Finalize)"); }); @@ -728,7 +731,7 @@ TEST_F(AsyncConnectionImplAppendableTest, StartAppendableObjectUploadWithChecksu auto r = pending.get(); ASSERT_STATUS_OK(r); auto writer = *std::move(r); - + // Write some data. auto w1 = writer->Write(storage::WritePayload("some data")); next = sequencer.PopFrontWithName(); @@ -747,7 +750,7 @@ TEST_F(AsyncConnectionImplAppendableTest, StartAppendableObjectUploadWithChecksu auto response = w2.get(); ASSERT_STATUS_OK(response); - + writer.reset(); next = sequencer.PopFrontWithName(); EXPECT_EQ(next.second, "Finish"); diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index 427b4bdf63a7a..32def346097a8 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -141,7 +141,8 @@ AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) { auto action = PartialUpload::kFinalizeWithChecksum; if (request_.has_append_object_spec() || request_.write_object_spec().appendable()) { - if (!absl::holds_alternative(persisted_state_)) { + if (!absl::holds_alternative( + persisted_state_)) { action = PartialUpload::kFinalize; } } @@ -260,7 +261,8 @@ future> AsyncWriterConnectionImpl::OnQuery( } if (response->has_persisted_size()) { absl::optional old_obj; - if (absl::holds_alternative(persisted_state_)) { + if (absl::holds_alternative( + persisted_state_)) { old_obj = absl::get(persisted_state_); } diff --git a/google/cloud/storage/internal/hash_function_impl.h b/google/cloud/storage/internal/hash_function_impl.h index a845b87f7cb77..99cf0188d25c6 100644 --- a/google/cloud/storage/internal/hash_function_impl.h +++ b/google/cloud/storage/internal/hash_function_impl.h @@ -106,7 +106,7 @@ class MD5HashFunction : public HashFunction { class Crc32cHashFunction : public HashFunction { public: Crc32cHashFunction() = default; - Crc32cHashFunction(std::uint32_t initial_crc, std::int64_t initial_offset) + explicit Crc32cHashFunction(std::uint32_t initial_crc, std::int64_t initial_offset) : current_(initial_crc), minimum_offset_(initial_offset) {} Crc32cHashFunction(Crc32cHashFunction const&) = delete;