diff --git a/Cargo.lock b/Cargo.lock index fa76f1bc0..4cca29dd0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1828,6 +1828,8 @@ dependencies = [ "azure_storage", "bytes 1.0.1", "env_logger 0.8.3", + "frame-config", + "frame-retrier", "reqwest 0.11.3", "tokio 1.6.0", "url 2.2.0", @@ -1944,6 +1946,7 @@ version = "0.5.4" dependencies = [ "actix-rt", "sgx_tstd", + "tokio 1.6.0", "tracing", ] @@ -5474,12 +5477,8 @@ dependencies = [ "memchr", "mio 0.7.11", "num_cpus", - "once_cell 1.7.2", - "parking_lot 0.11.0", "pin-project-lite 0.2.4", - "signal-hook-registry", "tokio-macros 1.2.0", - "winapi 0.3.9", ] [[package]] diff --git a/azure-pipelines.yml b/azure-pipelines.yml index c48f9e7f1..fbfd04d15 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -9,6 +9,15 @@ stages: pool: name: 'AnonifyAgent' steps: + - script: | + docker-compose up -d + cargo test -- --nocapture + workingDirectory: frame/azure-client + displayName: 'Run azure-client tests' + - script: docker-compose down + workingDirectory: frame/azure-client + condition: always() + displayName: 'azure client docker-compose down' - script: | cp .env.sample .env export SPID=$(SPID) diff --git a/frame/azure-client/Cargo.toml b/frame/azure-client/Cargo.toml index cc25503fb..9465d58f0 100644 --- a/frame/azure-client/Cargo.toml +++ b/frame/azure-client/Cargo.toml @@ -5,8 +5,9 @@ authors = ["LayerX Labs "] edition = "2018" [dependencies] +frame-retrier = { path = "../retrier" } +frame-config = { path = "../../frame/config" } anyhow = "1.0" -tokio = { version = "1", features = ["full"] } bytes = "1.0" url = "2.2" reqwest = { version = "0.11", features = ["json"] } @@ -17,6 +18,7 @@ azure_storage = { version = "0.1", git = "https://github.com/Azure/azure-sdk-for azure_core = { version = "0.1", git = "https://github.com/Azure/azure-sdk-for-rust.git", default-features = false, features = ["azurite_workaround"]} azure_storage = { version = "0.1", git = "https://github.com/Azure/azure-sdk-for-rust.git", default-features = false, features = ["blob", "azurite_workaround"] } env_logger = "0.8" +tokio = { version = "1", features = ["macros"] } [features] default = ["blob"] diff --git a/frame/azure-client/src/blob.rs b/frame/azure-client/src/blob.rs index d05056591..cdc7c866d 100644 --- a/frame/azure-client/src/blob.rs +++ b/frame/azure-client/src/blob.rs @@ -1,11 +1,14 @@ use anyhow::anyhow; use azure_core::prelude::Range; use azure_core::HttpClient; +#[cfg(test)] use azure_storage::blob::container::PublicAccess; use azure_storage::blob::prelude::{AsBlobClient, AsContainerClient}; use azure_storage::clients::AsStorageClient; use azure_storage::core::clients::{StorageAccountClient, StorageClient}; use bytes::Bytes; +use frame_config::{REQUEST_RETRIES, RETRY_DELAY_MILLS}; +use frame_retrier::{strategy, Retry}; use std::sync::Arc; #[cfg(test)] use url::Url; @@ -58,13 +61,18 @@ impl BlobClient { .client .as_container_client(container_name) .as_blob_client(blob_name); - - let response = blob_client - .get() - .range(Range::new(0, 1024)) // TODO: Fix range nums - .execute() - .await - .map_err(|err| anyhow!(err))?; + let request = blob_client.get().range(Range::new(0, 1024)); // TODO: Fix range nums + + let response = Retry::new( + "get_blob", + *REQUEST_RETRIES, + strategy::FixedDelay::new(*RETRY_DELAY_MILLS), + ) + .use_tokio_rt() + .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions + .spawn_async(|| async { request.execute().await }) + .await + .map_err(|err| anyhow!(err))?; let s_content = String::from_utf8(response.data.to_vec())?; @@ -82,13 +90,18 @@ impl BlobClient { .client .as_container_client(container_name) .as_blob_client(blob_name); - - let _res = blob_client - .put_block_blob(data) - .content_type("text/plain") - .execute() - .await - .map_err(|err| anyhow!(err))?; + let request = blob_client.put_block_blob(data).content_type("text/plain"); + + let _response = Retry::new( + "put_blob", + *REQUEST_RETRIES, + strategy::FixedDelay::new(*RETRY_DELAY_MILLS), + ) + .use_tokio_rt() + .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions + .spawn_async(|| async { request.execute().await }) + .await + .map_err(|err| anyhow!(err))?; Ok(()) } @@ -96,12 +109,17 @@ impl BlobClient { /// list_containers gets list of container names. #[cfg(test)] pub async fn list_containers(&self) -> anyhow::Result> { - let iv = self - .client - .list_containers() - .execute() - .await - .map_err(|err| anyhow!(err))?; + let request = self.client.list_containers(); + let iv = Retry::new( + "list containers", + *REQUEST_RETRIES, + strategy::FixedDelay::new(*RETRY_DELAY_MILLS), + ) + .use_tokio_rt() + .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions + .spawn_async(|| async { request.execute().await }) + .await + .map_err(|err| anyhow!(err))?; let mut vector: Vec = Vec::with_capacity(iv.incomplete_vector.len()); for cont in iv.incomplete_vector.iter() { @@ -112,15 +130,25 @@ impl BlobClient { } /// create_container creates a container. - pub async fn create_container(&self, container_name: impl Into) -> anyhow::Result<()> { - let container_client = self.client.as_container_client(container_name); - - let _res = container_client - .create() - .public_access(PublicAccess::None) - .execute() - .await - .map_err(|err| anyhow!(err))?; + #[cfg(test)] + pub async fn create_container(&self, container_name: &str) -> anyhow::Result<()> { + let _response = Retry::new( + "create containers", + *REQUEST_RETRIES, + strategy::FixedDelay::new(*RETRY_DELAY_MILLS), + ) + .use_tokio_rt() + .set_condition(|res| matches!(res, Err(_err))) // TODO: Set concrete retry conditions + .spawn_async(|| async { + let container_client = self.client.as_container_client(container_name); + container_client + .create() + .public_access(PublicAccess::None) + .execute() + .await + }) + .await + .map_err(|err| anyhow!(err))?; Ok(()) } diff --git a/frame/retrier/Cargo.toml b/frame/retrier/Cargo.toml index 24e329f09..546d2094e 100644 --- a/frame/retrier/Cargo.toml +++ b/frame/retrier/Cargo.toml @@ -8,11 +8,13 @@ edition = "2018" sgx_tstd = { rev = "v1.1.3", git = "https://github.com/apache/teaclave-sgx-sdk.git", optional = true } tracing = { version = "0.1", default-features = false } actix-rt = { version = "1.1", optional = true } +tokio = { version = "1.6", optional = true, features = ["time"] } [features] default = ["std"] std = [ "actix-rt", + "tokio", ] sgx = [ "sgx_tstd", diff --git a/frame/retrier/src/retry.rs b/frame/retrier/src/retry.rs index 713ae0221..8aa631487 100644 --- a/frame/retrier/src/retry.rs +++ b/frame/retrier/src/retry.rs @@ -13,6 +13,7 @@ pub struct Retry { tries: usize, strategy: I, condition: Condition, + is_tokio_rt: bool, } impl Retry @@ -27,6 +28,7 @@ where tries, strategy, condition: Condition::Always, + is_tokio_rt: false, } } @@ -39,6 +41,12 @@ where self } + /// Use tokio runtime to retry + pub fn use_tokio_rt(mut self) -> Self { + self.is_tokio_rt = true; + self + } + /// Retry a given operation a certain number of times. /// The interval depends on the delay strategy. pub fn spawn(self, mut operation: O) -> Result @@ -86,7 +94,12 @@ where curr_tries + 1, res ); - actix_rt::time::delay_for(delay).await; + if self.is_tokio_rt { + tokio::time::sleep(delay).await; + } else { + actix_rt::time::delay_for(delay).await; + } + } else { // if it overs the number of retries return res; diff --git a/scripts/test.sh b/scripts/test.sh index f1c0fde01..4e6bf2751 100755 --- a/scripts/test.sh +++ b/scripts/test.sh @@ -177,7 +177,6 @@ RUST_BACKTRACE=1 RUST_LOG=debug TEST=1 cargo test \ -p unit-tests-host \ -p frame-runtime \ -p frame-retrier \ - -p frame-azure-client \ -p frame-sodium -- --nocapture #