diff --git a/Cargo.lock b/Cargo.lock index 34ed7b1f779..da18cfdb112 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1297,12 +1297,12 @@ checksum = "2cce099aaf3abb89f9a1f8594ffe07fa53738ebc2882fac624d10d9ba31a1b10" [[package]] name = "aya-ebpf-macros" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72f47f7b4a75eb5f1d7ba0fb5628d247b1cf20388658899177875dabdda66865" +checksum = "96fd02363736177e7e91d6c95d7effbca07be87502c7b5b32fc194aed8b177a0" dependencies = [ - "proc-macro-error", "proc-macro2", + "proc-macro2-diagnostics", "quote", "syn 2.0.116", ] @@ -1321,20 +1321,6 @@ dependencies = [ "thiserror 1.0.69", ] -[[package]] -name = "backoff" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" -dependencies = [ - "futures-core", - "getrandom 0.2.15", - "instant", - "pin-project-lite", - "rand 0.8.5", - "tokio", -] - [[package]] name = "base16ct" version = "0.2.0" @@ -5423,30 +5409,6 @@ dependencies = [ "toml_edit", ] -[[package]] -name = "proc-macro-error" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" -dependencies = [ - "proc-macro-error-attr", - "proc-macro2", - "quote", - "syn 1.0.109", - "version_check", -] - -[[package]] -name = "proc-macro-error-attr" -version = "1.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" -dependencies = [ - "proc-macro2", - "quote", - "version_check", -] - [[package]] name = "proc-macro-error-attr2" version = "2.0.0" @@ -5477,6 +5439,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "proc-macro2-diagnostics" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af066a9c399a26e020ada66a034357a868728e72cd426f3adcd35f80d88d88c8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.116", + "version_check", +] + [[package]] name = "proptest" version = "1.10.0" @@ -10700,7 +10674,6 @@ name = "solana-storage-bigtable" version = "4.1.0-alpha.0" dependencies = [ "agave-reserved-account-keys", - "backoff", "bincode", "bytes", "bzip2", @@ -12380,12 +12353,11 @@ dependencies = [ [[package]] name = "test-case-core" -version = "3.2.1" +version = "3.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54c25e2cb8f5fcd7318157634e8838aa6f7e4715c96637f969fabaccd1ef5462" +checksum = "adcb7fd841cd518e279be3d5a3eb0636409487998a4aff22f3de87b81e88384f" dependencies = [ "cfg-if 1.0.4", - "proc-macro-error", "proc-macro2", "quote", "syn 2.0.116", @@ -12393,11 +12365,10 @@ dependencies = [ [[package]] name = "test-case-macros" -version = "3.2.1" +version = "3.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37cfd7bbc88a0104e304229fba519bdc45501a30b760fb72240342f1289ad257" +checksum = "5c89e72a01ed4c579669add59014b9a524d609c0c88c6a585ce37485879f6ffb" dependencies = [ - "proc-macro-error", "proc-macro2", "quote", "syn 2.0.116", diff --git a/Cargo.toml b/Cargo.toml index 88c47de68cf..10ac0e933b7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -223,7 +223,6 @@ atty = "0.2.11" axum = "0.8.8" aya = "0.13" aya-ebpf = "0.1.1" -backoff = "0.4.0" base64 = "0.22.1" bencher = "0.1.5" bincode = "1.3.3" diff --git a/storage-bigtable/Cargo.toml b/storage-bigtable/Cargo.toml index 0864d17acab..aac941332bc 100644 --- a/storage-bigtable/Cargo.toml +++ b/storage-bigtable/Cargo.toml @@ -21,7 +21,6 @@ agave-unstable-api = [] [dependencies] agave-reserved-account-keys = { workspace = true } -backoff = { workspace = true, features = ["tokio"] } bincode = { workspace = true } bytes = { workspace = true } bzip2 = { workspace = true } diff --git a/storage-bigtable/src/bigtable.rs b/storage-bigtable/src/bigtable.rs index 9ead4188864..0f794c7c85e 100644 --- a/storage-bigtable/src/bigtable.rs +++ b/storage-bigtable/src/bigtable.rs @@ -7,9 +7,9 @@ use { compression::{compress_best, decompress}, root_ca_certificate, }, - backoff::{Error as BackoffError, ExponentialBackoff, future::retry}, log::*, std::{ + future::Future, str::FromStr, time::{Duration, Instant}, }, @@ -85,13 +85,40 @@ pub enum Error { Timeout, } -fn to_backoff_err(err: Error) -> BackoffError { - if let Error::Rpc(ref status) = err { - if status.code() == tonic::Code::NotFound && status.message().starts_with("table") { - return BackoffError::Permanent(err); +fn is_retryable_error(err: &Error) -> bool { + if let Error::Rpc(status) = err { + return !(status.code() == tonic::Code::NotFound && status.message().starts_with("table")); + } + true +} + +async fn retry_with_exponential_backoff(mut operation: O) -> Result +where + O: FnMut() -> F, + F: Future>, +{ + const INITIAL_INTERVAL: Duration = Duration::from_millis(500); + const MULTIPLIER: u32 = 2; + const MAX_INTERVAL: Duration = Duration::from_secs(60); + const MAX_ELAPSED_TIME: Duration = Duration::from_secs(15 * 60); + + let started = Instant::now(); + let mut delay = INITIAL_INTERVAL; + + loop { + match operation().await { + Ok(value) => return Ok(value), + Err(err) if is_retryable_error(&err) => { + if started.elapsed() >= MAX_ELAPSED_TIME { + return Err(err); + } + + tokio::time::sleep(delay).await; + delay = std::cmp::min(delay.saturating_mul(MULTIPLIER), MAX_INTERVAL); + } + Err(err) => return Err(err), } } - err.into() } impl std::convert::From for Error { @@ -286,18 +313,17 @@ impl BigTableConnection { where T: serde::ser::Serialize, { - retry(ExponentialBackoff::default(), || async { + retry_with_exponential_backoff(|| async { let mut client = self.client(); - let result = client.put_bincode_cells(table, cells).await; - result.map_err(to_backoff_err) + client.put_bincode_cells(table, cells).await }) .await } pub async fn delete_rows_with_retry(&self, table: &str, row_keys: &[RowKey]) -> Result<()> { - retry(ExponentialBackoff::default(), || async { + retry_with_exponential_backoff(|| async { let mut client = self.client(); - Ok(client.delete_rows(table, row_keys).await?) + client.delete_rows(table, row_keys).await }) .await } @@ -310,9 +336,9 @@ impl BigTableConnection { where T: serde::de::DeserializeOwned, { - retry(ExponentialBackoff::default(), || async { + retry_with_exponential_backoff(|| async { let mut client = self.client(); - Ok(client.get_bincode_cells(table, row_keys).await?) + client.get_bincode_cells(table, row_keys).await }) .await } @@ -325,10 +351,9 @@ impl BigTableConnection { where T: prost::Message, { - retry(ExponentialBackoff::default(), || async { + retry_with_exponential_backoff(|| async { let mut client = self.client(); - let result = client.put_protobuf_cells(table, cells).await; - result.map_err(to_backoff_err) + client.put_protobuf_cells(table, cells).await }) .await }