From 680c1ababa7b403200d615fbca65ca99b3e5822e Mon Sep 17 00:00:00 2001 From: Skiba Jan Date: Mon, 30 Mar 2026 11:32:09 +0200 Subject: [PATCH 1/2] make futures a workspace dependency --- Cargo.toml | 1 + broker/Cargo.toml | 2 +- broker/src/serve_health.rs | 2 +- broker/src/serve_tasks.rs | 2 +- broker/src/task_manager.rs | 2 +- proxy/Cargo.toml | 2 +- shared/Cargo.toml | 1 + tests/Cargo.toml | 2 +- 8 files changed, 8 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 4333f6d3..8bee7ae4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ beam-lib = { path = "./beam-lib", features = [ "strict-ids" ] } # Command Line Interface clap = { version = "4", features = ["env", "derive"] } reqwest = { version = "0.13", default-features = false } +futures = { version = "0.3" } [profile.release] #opt-level = "z" # Optimize for size. diff --git a/broker/Cargo.toml b/broker/Cargo.toml index d0b755ca..0c94f774 100644 --- a/broker/Cargo.toml +++ b/broker/Cargo.toml @@ -27,7 +27,7 @@ tracing = "0.1" # Server-sent Events (SSE) support async-stream = "0.3" -futures-core = { version = "0.3", default-features = false } +futures.workspace = true once_cell = "1" # Socket dependencies bytes = { version = "1", optional = true } diff --git a/broker/src/serve_health.rs b/broker/src/serve_health.rs index 7454ede7..fb188b89 100644 --- a/broker/src/serve_health.rs +++ b/broker/src/serve_health.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, convert::Infallible, marker::PhantomData, sync:: use axum::{extract::{Path, State}, http::StatusCode, response::{sse::{Event, KeepAlive, KeepAliveStream}, Response, Sse}, routing::get, Json, Router}; use axum_extra::{headers::{authorization::Basic, Authorization}, TypedHeader}; use beam_lib::ProxyId; -use futures_core::Stream; +use futures::Stream; use serde::{Serialize, Deserialize}; use shared::{crypto_jwt::Authorized, Msg}; use tokio::sync::{Mutex, OwnedMutexGuard, RwLock}; diff --git a/broker/src/serve_tasks.rs b/broker/src/serve_tasks.rs index 2a0e8d49..74d1668a 100644 --- a/broker/src/serve_tasks.rs +++ b/broker/src/serve_tasks.rs @@ -12,7 +12,7 @@ use axum::{ Json, Router, }; use beam_lib::AppOrProxyId; -use futures_core::{stream, Stream}; +use futures::{stream, Stream}; use serde::Deserialize; use beam_lib::WorkStatus; use shared::{ diff --git a/broker/src/task_manager.rs b/broker/src/task_manager.rs index fded3089..dc6d5a13 100644 --- a/broker/src/task_manager.rs +++ b/broker/src/task_manager.rs @@ -4,7 +4,7 @@ use std::{ use axum::{response::{IntoResponse, sse::Event, Sse}, Json, http::StatusCode}; use dashmap::DashMap; -use futures_core::Stream; +use futures::Stream; use once_cell::sync::Lazy; use serde::Serialize; use serde_json::json; diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index f1b8a0b6..0613818d 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -34,7 +34,7 @@ rsa = "0.9" # Server-sent Events (SSE) support tokio-util = { version = "0.7", features = ["io"] } -futures = "0.3" +futures.workspace = true async-sse = "5.1" async-stream = "0.3" diff --git a/shared/Cargo.toml b/shared/Cargo.toml index 9e84ce61..9302fd6b 100644 --- a/shared/Cargo.toml +++ b/shared/Cargo.toml @@ -20,6 +20,7 @@ clap.workspace = true tokio = { version = "1", features = ["full"] } axum = { version = "0.8", features = [] } bytes = "1.4" +futures.workspace = true # This includes all default features of reqwest but uses native-tls aka openssl as we depend on it for encryption anyways reqwest = { workspace = true, features = ["stream", "native-tls", "charset", "system-proxy", "http2"] } diff --git a/tests/Cargo.toml b/tests/Cargo.toml index b69830ba..c675c5e6 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -14,7 +14,7 @@ anyhow = "1" rand = "0.8" serde = { version = "1", features = ["derive"] } reqwest = { workspace = true, features = ["stream"], default-features = false } -futures = "0.3.28" +futures.workspace = true async-sse = "5.1.0" [features] From e3de5e10937329701e34b89ae77cfced2ecd7375 Mon Sep 17 00:00:00 2001 From: Skiba Jan Date: Mon, 30 Mar 2026 11:32:09 +0200 Subject: [PATCH 2/2] fetch certs in parallel --- shared/src/crypto.rs | 54 +++++++++++++++++++++++--------------------- 1 file changed, 28 insertions(+), 26 deletions(-) diff --git a/shared/src/crypto.rs b/shared/src/crypto.rs index 1140a0d2..2a60960b 100644 --- a/shared/src/crypto.rs +++ b/shared/src/crypto.rs @@ -282,9 +282,15 @@ impl CertificateCache { revoked_certs } - pub async fn update_certificates_mut(&mut self) -> Result { + pub async fn update_certificates_mut( + &mut self, + ) -> Result { debug!("Updating certificates via network ..."); - let certificate_list = CERT_GETTER.get().unwrap().certificate_list_via_network().await?; + let certificate_list = CERT_GETTER + .get() + .unwrap() + .certificate_list_via_network() + .await?; let certificate_revocation_list = CERT_GETTER.get().unwrap().get_crl().await?; // Check if any of the certs in the cache have been revoked let mut revoked_certs = certificate_revocation_list @@ -303,32 +309,28 @@ impl CertificateCache { ); let mut new_count = 0; - //TODO Check for validity - for serial in new_certificate_serials { + let cert_getter = CERT_GETTER.get().unwrap(); + let cert_pems = new_certificate_serials + .iter() + .map(|s| cert_getter.certificate_by_serial_as_pem(s)) + .collect::>() + .await; + for (serial, cert_pem) in new_certificate_serials.into_iter().zip(cert_pems) { debug!("Checking certificate with serial {serial}"); - let certificate = CERT_GETTER - .get() - .unwrap() - .certificate_by_serial_as_pem(serial) - .await; - if let Err(e) = certificate { - match e { - SamplyBeamError::CertificateError(err) => { - debug!("Will skip invalid certificate {serial} from now on."); - self.serial_to_x509 - .insert(serial.clone(), CertificateCacheEntry::Invalid(err)); - } - other_error => { - warn!( - "Could not retrieve certificate for serial {serial}: {}", - other_error - ); - } - }; - continue; - } - let certificate = certificate.unwrap(); + let certificate = match cert_pem { + Err(SamplyBeamError::CertificateError(err)) => { + debug!("Will skip invalid certificate {serial} from now on."); + self.serial_to_x509 + .insert(serial.clone(), CertificateCacheEntry::Invalid(err)); + continue; + }, + Err(other_error) => { + warn!("Could not retrieve certificate for serial {serial}: {other_error}"); + continue; + } + Ok(cert) => cert, + }; let opensslcert = match X509::from_pem(certificate.as_bytes()) { Ok(x) => x, Err(err) => {