From 9d32ea5f4851ca9c7fa4868682afb46f10d70a6b Mon Sep 17 00:00:00 2001 From: osuketh Date: Sun, 23 May 2021 12:59:09 +0900 Subject: [PATCH 1/4] retry requests to azure blob --- Cargo.lock | 2 + frame/azure-client/Cargo.toml | 2 + frame/azure-client/src/blob.rs | 74 +++++++++++++++++++++------------- 3 files changed, 51 insertions(+), 27 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fa76f1bc..a22b3209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1828,6 +1828,8 @@ dependencies = [ "azure_storage", "bytes 1.0.1", "env_logger 0.8.3", + "frame-config", + "frame-retrier", "reqwest 0.11.3", "tokio 1.6.0", "url 2.2.0", diff --git a/frame/azure-client/Cargo.toml b/frame/azure-client/Cargo.toml index cc25503f..0d789475 100644 --- a/frame/azure-client/Cargo.toml +++ b/frame/azure-client/Cargo.toml @@ -5,6 +5,8 @@ authors = ["LayerX Labs "] edition = "2018" [dependencies] +frame-retrier = { path = "../retrier" } +frame-config = { path = "../../frame/config" } anyhow = "1.0" tokio = { version = "1", features = ["full"] } bytes = "1.0" diff --git a/frame/azure-client/src/blob.rs b/frame/azure-client/src/blob.rs index d0505659..3569dd27 100644 --- a/frame/azure-client/src/blob.rs +++ b/frame/azure-client/src/blob.rs @@ -1,11 +1,14 @@ use anyhow::anyhow; use azure_core::prelude::Range; use azure_core::HttpClient; +#[cfg(test)] use azure_storage::blob::container::PublicAccess; use azure_storage::blob::prelude::{AsBlobClient, AsContainerClient}; use azure_storage::clients::AsStorageClient; use azure_storage::core::clients::{StorageAccountClient, StorageClient}; use bytes::Bytes; +use frame_config::{REQUEST_RETRIES, RETRY_DELAY_MILLS}; +use frame_retrier::{strategy, Retry}; use std::sync::Arc; #[cfg(test)] use url::Url; @@ -58,13 +61,17 @@ impl BlobClient { .client .as_container_client(container_name) .as_blob_client(blob_name); - - let response = blob_client - .get() - .range(Range::new(0, 1024)) // TODO: Fix range nums - .execute() - .await - .map_err(|err| anyhow!(err))?; + let request = blob_client.get().range(Range::new(0, 1024)); // TODO: Fix range nums + + let response = Retry::new( + "get_blob", + *REQUEST_RETRIES, + strategy::FixedDelay::new(*RETRY_DELAY_MILLS), + ) + .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions + .spawn_async(|| async { request.execute().await }) + .await + .map_err(|err| anyhow!(err))?; let s_content = String::from_utf8(response.data.to_vec())?; @@ -82,13 +89,17 @@ impl BlobClient { .client .as_container_client(container_name) .as_blob_client(blob_name); - - let _res = blob_client - .put_block_blob(data) - .content_type("text/plain") - .execute() - .await - .map_err(|err| anyhow!(err))?; + let request = blob_client.put_block_blob(data).content_type("text/plain"); + + let _response = Retry::new( + "put_blob", + *REQUEST_RETRIES, + strategy::FixedDelay::new(*RETRY_DELAY_MILLS), + ) + .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions + .spawn_async(|| async { request.execute().await }) + .await + .map_err(|err| anyhow!(err))?; Ok(()) } @@ -96,12 +107,16 @@ impl BlobClient { /// list_containers gets list of container names. #[cfg(test)] pub async fn list_containers(&self) -> anyhow::Result> { - let iv = self - .client - .list_containers() - .execute() - .await - .map_err(|err| anyhow!(err))?; + let request = self.client.list_containers(); + let iv = Retry::new( + "list containers", + *REQUEST_RETRIES, + strategy::FixedDelay::new(*RETRY_DELAY_MILLS), + ) + .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions + .spawn_async(|| async { request.execute().await }) + .await + .map_err(|err| anyhow!(err))?; let mut vector: Vec = Vec::with_capacity(iv.incomplete_vector.len()); for cont in iv.incomplete_vector.iter() { @@ -112,15 +127,20 @@ impl BlobClient { } /// create_container creates a container. + #[cfg(test)] pub async fn create_container(&self, container_name: impl Into) -> anyhow::Result<()> { let container_client = self.client.as_container_client(container_name); - - let _res = container_client - .create() - .public_access(PublicAccess::None) - .execute() - .await - .map_err(|err| anyhow!(err))?; + let request = container_client.create().public_access(PublicAccess::None); + + let _response = Retry::new( + "create containers", + *REQUEST_RETRIES, + strategy::FixedDelay::new(*RETRY_DELAY_MILLS), + ) + .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions + .spawn_async(|| async { request.execute().await }) + .await + .map_err(|err| anyhow!(err))?; Ok(()) } From d4a48e0423cb8671d230994d5a5753abf9ccbf61 Mon Sep 17 00:00:00 2001 From: osuketh Date: Sun, 23 May 2021 13:30:38 +0900 Subject: [PATCH 2/4] fix create_container func --- frame/azure-client/src/blob.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/frame/azure-client/src/blob.rs b/frame/azure-client/src/blob.rs index 3569dd27..11ca2e23 100644 --- a/frame/azure-client/src/blob.rs +++ b/frame/azure-client/src/blob.rs @@ -128,17 +128,21 @@ impl BlobClient { /// create_container creates a container. #[cfg(test)] - pub async fn create_container(&self, container_name: impl Into) -> anyhow::Result<()> { - let container_client = self.client.as_container_client(container_name); - let request = container_client.create().public_access(PublicAccess::None); - + pub async fn create_container(&self, container_name: &str) -> anyhow::Result<()> { let _response = Retry::new( "create containers", *REQUEST_RETRIES, strategy::FixedDelay::new(*RETRY_DELAY_MILLS), ) .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions - .spawn_async(|| async { request.execute().await }) + .spawn_async(|| async { + let container_client = self.client.as_container_client(container_name); + container_client + .create() + .public_access(PublicAccess::None) + .execute() + .await + }) .await .map_err(|err| anyhow!(err))?; From 15af70374890e018ed474cb68f66310964a56c28 Mon Sep 17 00:00:00 2001 From: osuketh Date: Sun, 23 May 2021 14:54:45 +0900 Subject: [PATCH 3/4] impl use_tokio_rt() --- Cargo.lock | 5 +---- frame/azure-client/Cargo.toml | 2 +- frame/azure-client/src/blob.rs | 4 ++++ frame/retrier/Cargo.toml | 2 ++ frame/retrier/src/retry.rs | 15 ++++++++++++++- 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a22b3209..4cca29dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1946,6 +1946,7 @@ version = "0.5.4" dependencies = [ "actix-rt", "sgx_tstd", + "tokio 1.6.0", "tracing", ] @@ -5476,12 +5477,8 @@ dependencies = [ "memchr", "mio 0.7.11", "num_cpus", - "once_cell 1.7.2", - "parking_lot 0.11.0", "pin-project-lite 0.2.4", - "signal-hook-registry", "tokio-macros 1.2.0", - "winapi 0.3.9", ] [[package]] diff --git a/frame/azure-client/Cargo.toml b/frame/azure-client/Cargo.toml index 0d789475..9465d58f 100644 --- a/frame/azure-client/Cargo.toml +++ b/frame/azure-client/Cargo.toml @@ -8,7 +8,6 @@ edition = "2018" frame-retrier = { path = "../retrier" } frame-config = { path = "../../frame/config" } anyhow = "1.0" -tokio = { version = "1", features = ["full"] } bytes = "1.0" url = "2.2" reqwest = { version = "0.11", features = ["json"] } @@ -19,6 +18,7 @@ azure_storage = { version = "0.1", git = "https://github.com/Azure/azure-sdk-for azure_core = { version = "0.1", git = "https://github.com/Azure/azure-sdk-for-rust.git", default-features = false, features = ["azurite_workaround"]} azure_storage = { version = "0.1", git = "https://github.com/Azure/azure-sdk-for-rust.git", default-features = false, features = ["blob", "azurite_workaround"] } env_logger = "0.8" +tokio = { version = "1", features = ["macros"] } [features] default = ["blob"] diff --git a/frame/azure-client/src/blob.rs b/frame/azure-client/src/blob.rs index 11ca2e23..cdc7c866 100644 --- a/frame/azure-client/src/blob.rs +++ b/frame/azure-client/src/blob.rs @@ -68,6 +68,7 @@ impl BlobClient { *REQUEST_RETRIES, strategy::FixedDelay::new(*RETRY_DELAY_MILLS), ) + .use_tokio_rt() .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions .spawn_async(|| async { request.execute().await }) .await @@ -96,6 +97,7 @@ impl BlobClient { *REQUEST_RETRIES, strategy::FixedDelay::new(*RETRY_DELAY_MILLS), ) + .use_tokio_rt() .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions .spawn_async(|| async { request.execute().await }) .await @@ -113,6 +115,7 @@ impl BlobClient { *REQUEST_RETRIES, strategy::FixedDelay::new(*RETRY_DELAY_MILLS), ) + .use_tokio_rt() .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions .spawn_async(|| async { request.execute().await }) .await @@ -134,6 +137,7 @@ impl BlobClient { *REQUEST_RETRIES, strategy::FixedDelay::new(*RETRY_DELAY_MILLS), ) + .use_tokio_rt() .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions .spawn_async(|| async { let container_client = self.client.as_container_client(container_name); diff --git a/frame/retrier/Cargo.toml b/frame/retrier/Cargo.toml index 24e329f0..546d2094 100644 --- a/frame/retrier/Cargo.toml +++ b/frame/retrier/Cargo.toml @@ -8,11 +8,13 @@ edition = "2018" sgx_tstd = { rev = "v1.1.3", git = "https://github.com/apache/teaclave-sgx-sdk.git", optional = true } tracing = { version = "0.1", default-features = false } actix-rt = { version = "1.1", optional = true } +tokio = { version = "1.6", optional = true, features = ["time"] } [features] default = ["std"] std = [ "actix-rt", + "tokio", ] sgx = [ "sgx_tstd", diff --git a/frame/retrier/src/retry.rs b/frame/retrier/src/retry.rs index 713ae022..8aa63148 100644 --- a/frame/retrier/src/retry.rs +++ b/frame/retrier/src/retry.rs @@ -13,6 +13,7 @@ pub struct Retry { tries: usize, strategy: I, condition: Condition, + is_tokio_rt: bool, } impl Retry @@ -27,6 +28,7 @@ where tries, strategy, condition: Condition::Always, + is_tokio_rt: false, } } @@ -39,6 +41,12 @@ where self } + /// Use tokio runtime to retry + pub fn use_tokio_rt(mut self) -> Self { + self.is_tokio_rt = true; + self + } + /// Retry a given operation a certain number of times. /// The interval depends on the delay strategy. pub fn spawn(self, mut operation: O) -> Result @@ -86,7 +94,12 @@ where curr_tries + 1, res ); - actix_rt::time::delay_for(delay).await; + if self.is_tokio_rt { + tokio::time::sleep(delay).await; + } else { + actix_rt::time::delay_for(delay).await; + } + } else { // if it overs the number of retries return res; From 7ae5a877955355968d8215d77cfa8a03ff07f95d Mon Sep 17 00:00:00 2001 From: osuketh Date: Sun, 23 May 2021 16:05:01 +0900 Subject: [PATCH 4/4] separete azure client tests in CI --- azure-pipelines.yml | 9 +++++++++ scripts/test.sh | 1 - 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index c48f9e7f..fbfd04d1 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -9,6 +9,15 @@ stages: pool: name: 'AnonifyAgent' steps: + - script: | + docker-compose up -d + cargo test -- --nocapture + workingDirectory: frame/azure-client + displayName: 'Run azure-client tests' + - script: docker-compose down + workingDirectory: frame/azure-client + condition: always() + displayName: 'azure client docker-compose down' - script: | cp .env.sample .env export SPID=$(SPID) diff --git a/scripts/test.sh b/scripts/test.sh index f1c0fde0..4e6bf275 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -177,7 +177,6 @@ RUST_BACKTRACE=1 RUST_LOG=debug TEST=1 cargo test \ -p unit-tests-host \ -p frame-runtime \ -p frame-retrier \ - -p frame-azure-client \ -p frame-sodium -- --nocapture #