Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ beam-lib = { path = "./beam-lib", features = [ "strict-ids" ] }
# Command Line Interface
clap = { version = "4", features = ["env", "derive"] }
reqwest = { version = "0.13", default-features = false }
futures = { version = "0.3" }

[profile.release]
#opt-level = "z" # Optimize for size.
Expand Down
2 changes: 1 addition & 1 deletion broker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tracing = "0.1"

# Server-sent Events (SSE) support
async-stream = "0.3"
futures-core = { version = "0.3", default-features = false }
futures.workspace = true
once_cell = "1"
# Socket dependencies
bytes = { version = "1", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion broker/src/serve_health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, convert::Infallible, marker::PhantomData, sync::
use axum::{extract::{Path, State}, http::StatusCode, response::{sse::{Event, KeepAlive, KeepAliveStream}, Response, Sse}, routing::get, Json, Router};
use axum_extra::{headers::{authorization::Basic, Authorization}, TypedHeader};
use beam_lib::ProxyId;
use futures_core::Stream;
use futures::Stream;
use serde::{Serialize, Deserialize};
use shared::{crypto_jwt::Authorized, Msg};
use tokio::sync::{Mutex, OwnedMutexGuard, RwLock};
Expand Down
2 changes: 1 addition & 1 deletion broker/src/serve_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use axum::{
Json, Router,
};
use beam_lib::AppOrProxyId;
use futures_core::{stream, Stream};
use futures::{stream, Stream};
use serde::Deserialize;
use beam_lib::WorkStatus;
use shared::{
Expand Down
2 changes: 1 addition & 1 deletion broker/src/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{

use axum::{response::{IntoResponse, sse::Event, Sse}, Json, http::StatusCode};
use dashmap::DashMap;
use futures_core::Stream;
use futures::Stream;
use once_cell::sync::Lazy;
use serde::Serialize;
use serde_json::json;
Expand Down
2 changes: 1 addition & 1 deletion proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ rsa = "0.9"

# Server-sent Events (SSE) support
tokio-util = { version = "0.7", features = ["io"] }
futures = "0.3"
futures.workspace = true
async-sse = "5.1"
async-stream = "0.3"

Expand Down
1 change: 1 addition & 0 deletions shared/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ clap.workspace = true
tokio = { version = "1", features = ["full"] }
axum = { version = "0.8", features = [] }
bytes = "1.4"
futures.workspace = true

# This includes all default features of reqwest but uses native-tls aka openssl as we depend on it for encryption anyways
reqwest = { workspace = true, features = ["stream", "native-tls", "charset", "system-proxy", "http2"] }
Expand Down
54 changes: 28 additions & 26 deletions shared/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,15 @@ impl CertificateCache {
revoked_certs
}

pub async fn update_certificates_mut(&mut self) -> Result<CertificateCacheUpdate, SamplyBeamError> {
pub async fn update_certificates_mut(
&mut self,
) -> Result<CertificateCacheUpdate, SamplyBeamError> {
debug!("Updating certificates via network ...");
let certificate_list = CERT_GETTER.get().unwrap().certificate_list_via_network().await?;
let certificate_list = CERT_GETTER
.get()
.unwrap()
.certificate_list_via_network()
.await?;
let certificate_revocation_list = CERT_GETTER.get().unwrap().get_crl().await?;
// Check if any of the certs in the cache have been revoked
let mut revoked_certs = certificate_revocation_list
Expand All @@ -303,32 +309,28 @@ impl CertificateCache {
);

let mut new_count = 0;
//TODO Check for validity
for serial in new_certificate_serials {
let cert_getter = CERT_GETTER.get().unwrap();
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this hold an exclusive lock? If yes, please introduce an additional scope. If not, all good.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It just gets a shared ref from the OnceCell which should be set at this point. No mutex involved as all methods on dyn GetCerts only need a shared ref.

let cert_pems = new_certificate_serials
.iter()
.map(|s| cert_getter.certificate_by_serial_as_pem(s))
.collect::<futures::future::JoinAll<_>>()
.await;
for (serial, cert_pem) in new_certificate_serials.into_iter().zip(cert_pems) {
debug!("Checking certificate with serial {serial}");

let certificate = CERT_GETTER
.get()
.unwrap()
.certificate_by_serial_as_pem(serial)
.await;
if let Err(e) = certificate {
match e {
SamplyBeamError::CertificateError(err) => {
debug!("Will skip invalid certificate {serial} from now on.");
self.serial_to_x509
.insert(serial.clone(), CertificateCacheEntry::Invalid(err));
}
other_error => {
warn!(
"Could not retrieve certificate for serial {serial}: {}",
other_error
);
}
};
continue;
}
let certificate = certificate.unwrap();
let certificate = match cert_pem {
Err(SamplyBeamError::CertificateError(err)) => {
debug!("Will skip invalid certificate {serial} from now on.");
self.serial_to_x509
.insert(serial.clone(), CertificateCacheEntry::Invalid(err));
continue;
},
Err(other_error) => {
warn!("Could not retrieve certificate for serial {serial}: {other_error}");
continue;
}
Ok(cert) => cert,
};
let opensslcert = match X509::from_pem(certificate.as_bytes()) {
Ok(x) => x,
Err(err) => {
Expand Down
2 changes: 1 addition & 1 deletion tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ anyhow = "1"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
reqwest = { workspace = true, features = ["stream"], default-features = false }
futures = "0.3.28"
futures.workspace = true
async-sse = "5.1.0"

[features]
Expand Down
Loading