diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 7e123e61ce06c..624e9de74a204 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -315,8 +315,6 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { auto current = internal::MakeImmutableOptions(std::move(p.options)); auto request = p.request; std::int64_t persisted_size = 0; - std::shared_ptr hash_function = - CreateHashFunction(*current); auto retry = std::shared_ptr(retry_policy(*current)); auto backoff = @@ -404,17 +402,28 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { auto pending = factory(std::move(request)); return pending.then( [current, request = std::move(p.request), persisted_size, - hash = std::move(hash_function), fa = std::move(factory)](auto f) mutable - -> StatusOr> { + fa = std::move(factory)](auto f) mutable + -> StatusOr> { auto rpc = f.get(); if (!rpc) return std::move(rpc).status(); + + std::shared_ptr hash; std::unique_ptr impl; + if (rpc->first_response.has_resource()) { + auto const& resource = rpc->first_response.resource(); + if (current->get() && + resource.has_checksums() && resource.checksums().has_crc32c()) { + hash = std::make_shared( + resource.checksums().crc32c(), resource.size()); + } else { + hash = CreateHashFunction(*current); + } impl = std::make_unique( - current, request, std::move(rpc->stream), hash, - rpc->first_response.resource(), false); + current, request, std::move(rpc->stream), hash, resource, false); } else { persisted_size = rpc->first_response.persisted_size(); + hash = CreateHashFunction(*current); impl = std::make_unique( current, request, std::move(rpc->stream), hash, persisted_size, false); @@ -461,7 +470,7 @@ AsyncConnectionImpl::StartBufferedUpload(UploadParams p) { return StartUnbufferedUpload(std::move(p)) .then([current = std::move(current), async_write_object = std::move(async_write_object)](auto f) mutable - -> StatusOr> { + -> StatusOr> { auto w = f.get(); if (!w) return std::move(w).status(); auto factory = [upload_id = (*w)->UploadId(), @@ -495,14 +504,15 @@ AsyncConnectionImpl::ResumeBufferedUpload(ResumeUploadParams p) { }; auto f = make_unbuffered(); - return f.then([current = std::move(current), - make_unbuffered = std::move(make_unbuffered)](auto f) mutable - -> StatusOr> { - auto w = f.get(); - if (!w) return std::move(w).status(); - return MakeWriterConnectionBuffered(std::move(make_unbuffered), - *std::move(w), *current); - }); + return f.then( + [current = std::move(current), + make_unbuffered = std::move(make_unbuffered)](auto f) mutable + -> StatusOr> { + auto w = f.get(); + if (!w) return std::move(w).status(); + return MakeWriterConnectionBuffered(std::move(make_unbuffered), + *std::move(w), *current); + }); } future> diff --git a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc index bb6833b1526a7..27b037f5d17a2 100644 --- a/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_appendable_upload_test.cc @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/retry_policy.h" #include "google/cloud/storage/async/writer_connection.h" #include "google/cloud/storage/internal/async/connection_impl.h" @@ -627,6 +628,135 @@ TEST_F(AsyncConnectionImplAppendableTest, AppendableUploadRedirectNoHandle) { next.first.set_value(true); } +TEST_F(AsyncConnectionImplAppendableTest, + StartAppendableObjectUploadWithChecksum) { + auto constexpr kRequestText = R"pb( + write_object_spec { + resource { + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + content_type: "text/plain" + } + } + )pb"; + AsyncSequencer sequencer; + auto mock = std::make_shared(); + + google::storage::v2::Object initial_resource; + initial_resource.set_bucket("projects/_/buckets/test-bucket"); + initial_resource.set_name("test-object"); + initial_resource.set_size(1024); + initial_resource.mutable_checksums()->set_crc32c(12345); // Some dummy CRC + + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Start).WillOnce([&] { + return sequencer.PushBack("Start"); + }); + + EXPECT_CALL(*stream, Read) + .WillOnce([&, initial_resource] { + return sequencer.PushBack("Read(Takeover)") + .then([initial_resource](auto) { + auto response = google::storage::v2::BidiWriteObjectResponse{}; + *response.mutable_resource() = initial_resource; + return absl::make_optional(std::move(response)); + }); + }) + .WillOnce([&, initial_resource] { + return sequencer.PushBack("Read(FinalObject)") + .then([initial_resource](auto) { + auto response = google::storage::v2::BidiWriteObjectResponse{}; + *response.mutable_resource() = initial_resource; + response.mutable_resource()->set_size( + initial_resource.size() + 9); // "some data" size is 9 + return absl::make_optional(std::move(response)); + }); + }); + + EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then([](auto) { return Status{}; }); + }); + + EXPECT_CALL(*stream, Write) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.state_lookup()); + EXPECT_FALSE(wopt.is_last_message()); + return sequencer.PushBack("Write(StateLookup)"); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_FALSE(wopt.is_last_message()); + return sequencer.PushBack("Write(data)"); + }) + .WillOnce([&](google::storage::v2::BidiWriteObjectRequest const& request, + grpc::WriteOptions wopt) { + EXPECT_TRUE(request.finish_write()); + EXPECT_TRUE(wopt.is_last_message()); + // Here we expect full checksums to be set because we had the resource + // in takeover. + EXPECT_TRUE(request.has_object_checksums()); + return sequencer.PushBack("Write(Finalize)"); + }); + + EXPECT_CALL(*mock, AsyncBidiWriteObject).WillOnce([&] { + return std::unique_ptr(std::move(stream)); + }); + + internal::AutomaticallyCreatedBackgroundThreads pool(1); + // Enable CRC32C validation in options + auto options = TestOptions().set(true); + auto connection = MakeTestConnection(pool.cq(), mock, options); + + auto request = google::storage::v2::BidiWriteObjectRequest{}; + ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); + request.mutable_write_object_spec()->set_appendable(true); + + auto pending = connection->StartAppendableObjectUpload( + {std::move(request), connection->options()}); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(StateLookup)"); + next.first.set_value(true); + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read(Takeover)"); + next.first.set_value(true); + + auto r = pending.get(); + ASSERT_STATUS_OK(r); + auto writer = *std::move(r); + + // Write some data. + auto w1 = writer->Write(storage::WritePayload("some data")); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(data)"); + next.first.set_value(true); + EXPECT_STATUS_OK(w1.get()); + + // Finalize the upload. + auto w2 = writer->Finalize({}); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Write(Finalize)"); + next.first.set_value(true); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Read(FinalObject)"); + next.first.set_value(true); + + auto response = w2.get(); + ASSERT_STATUS_OK(response); + + writer.reset(); + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index a97a62960e3f6..32def346097a8 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -138,10 +138,14 @@ AsyncWriterConnectionImpl::Finalize(storage::WritePayload payload) { auto p = WritePayloadImpl::GetImpl(payload); auto size = p.size(); - auto action = request_.has_append_object_spec() || - request_.write_object_spec().appendable() - ? PartialUpload::kFinalize - : PartialUpload::kFinalizeWithChecksum; + auto action = PartialUpload::kFinalizeWithChecksum; + if (request_.has_append_object_spec() || + request_.write_object_spec().appendable()) { + if (!absl::holds_alternative( + persisted_state_)) { + action = PartialUpload::kFinalize; + } + } auto coro = PartialUpload::Call(impl_, hash_function_, std::move(write), std::move(p), std::move(action)); return coro->Start().then([coro, size, this](auto f) mutable { @@ -256,7 +260,26 @@ future> AsyncWriterConnectionImpl::OnQuery( latest_write_handle_ = response->write_handle(); } if (response->has_persisted_size()) { + absl::optional old_obj; + if (absl::holds_alternative( + persisted_state_)) { + old_obj = absl::get(persisted_state_); + } + persisted_state_ = response->persisted_size(); + + if (response->has_persisted_data_checksums()) { + auto const& checksums = response->persisted_data_checksums(); + if (checksums.has_crc32c()) { + google::storage::v2::Object obj; + obj.set_size(response->persisted_size()); + *obj.mutable_checksums() = checksums; + if (old_obj) { + obj.set_generation(old_obj->generation()); + } + persisted_state_ = obj; + } + } return make_ready_future(make_status_or(response->persisted_size())); } if (response->has_resource()) { diff --git a/google/cloud/storage/internal/hash_function_impl.h b/google/cloud/storage/internal/hash_function_impl.h index b3a8e68eeebfe..99cf0188d25c6 100644 --- a/google/cloud/storage/internal/hash_function_impl.h +++ b/google/cloud/storage/internal/hash_function_impl.h @@ -106,6 +106,8 @@ class MD5HashFunction : public HashFunction { class Crc32cHashFunction : public HashFunction { public: Crc32cHashFunction() = default; + explicit Crc32cHashFunction(std::uint32_t initial_crc, std::int64_t initial_offset) + : current_(initial_crc), minimum_offset_(initial_offset) {} Crc32cHashFunction(Crc32cHashFunction const&) = delete; Crc32cHashFunction& operator=(Crc32cHashFunction const&) = delete;