From ec4c4635d78b52d019951c1267a1668bdb10a30e Mon Sep 17 00:00:00 2001 From: Kevin Wang Date: Wed, 28 Jan 2026 09:52:10 +0000 Subject: [PATCH] feat(gateway): add per-app connection rate limiting Add configurable maximum concurrent connections per app to prevent a single slow or overloaded app from exhausting system resources and affecting other apps. - Add `max_connections_per_app` config option (default: 2000, 0 = unlimited) - Check connection count before establishing new backend connections - Log warning when rate limit is exceeded --- gateway/gateway.toml | 2 ++ gateway/src/config.rs | 2 ++ gateway/src/proxy/tls_passthough.rs | 36 ++++++++++++++++++++++++++--- gateway/src/proxy/tls_terminate.rs | 3 ++- 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/gateway/gateway.toml b/gateway/gateway.toml index d3e5816b..7c830b47 100644 --- a/gateway/gateway.toml +++ b/gateway/gateway.toml @@ -70,6 +70,8 @@ app_address_ns_prefix = "_dstack-app-address" app_address_ns_compat = true workers = 32 external_port = 443 +# Maximum concurrent connections per app. 0 means unlimited. +max_connections_per_app = 2000 [core.proxy.timeouts] # Timeout for establishing a connection to the target app. diff --git a/gateway/src/config.rs b/gateway/src/config.rs index 9c81ca8e..eac34f25 100644 --- a/gateway/src/config.rs +++ b/gateway/src/config.rs @@ -85,6 +85,8 @@ pub struct ProxyConfig { pub workers: usize, pub app_address_ns_prefix: String, pub app_address_ns_compat: bool, + /// Maximum concurrent connections per app. 0 means unlimited. + pub max_connections_per_app: u64, } #[derive(Debug, Clone, Deserialize)] diff --git a/gateway/src/proxy/tls_passthough.rs b/gateway/src/proxy/tls_passthough.rs index 6184c1b5..8079f746 100644 --- a/gateway/src/proxy/tls_passthough.rs +++ b/gateway/src/proxy/tls_passthough.rs @@ -2,10 +2,12 @@ // // SPDX-License-Identifier: Apache-2.0 -use anyhow::{Context, Result}; use std::fmt::Debug; +use std::sync::atomic::Ordering; + +use anyhow::{bail, Context, Result}; use tokio::{io::AsyncWriteExt, net::TcpStream, task::JoinSet, time::timeout}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; use crate::{ main_service::Proxy, @@ -88,11 +90,38 @@ pub(crate) async fn proxy_with_sni( proxy_to_app(state, inbound, buffer, &addr.app_id, addr.port).await } +/// Check if app has reached max connections limit +fn check_connection_limit( + addresses: &AddressGroup, + max_connections: u64, + app_id: &str, +) -> Result<()> { + if max_connections == 0 { + return Ok(()); + } + let total: u64 = addresses + .iter() + .map(|a| a.counter.load(Ordering::Relaxed)) + .sum(); + if total >= max_connections { + warn!( + app_id, + total, max_connections, "app connection limit exceeded" + ); + bail!("app connection limit exceeded: {total}/{max_connections}"); + } + Ok(()) +} + /// connect to multiple hosts simultaneously and return the first successful connection pub(crate) async fn connect_multiple_hosts( addresses: AddressGroup, port: u16, + max_connections: u64, + app_id: &str, ) -> Result<(TcpStream, EnteredCounter)> { + check_connection_limit(&addresses, max_connections, app_id)?; + let mut join_set = JoinSet::new(); for addr in addresses { let counter = addr.counter.enter(); @@ -127,9 +156,10 @@ pub(crate) async fn proxy_to_app( port: u16, ) -> Result<()> { let addresses = state.lock().select_top_n_hosts(app_id)?; + let max_connections = state.config.proxy.max_connections_per_app; let (mut outbound, _counter) = timeout( state.config.proxy.timeouts.connect, - connect_multiple_hosts(addresses.clone(), port), + connect_multiple_hosts(addresses.clone(), port, max_connections, app_id), ) .await .with_context(|| format!("connecting timeout to app {app_id}: {addresses:?}:{port}"))? diff --git a/gateway/src/proxy/tls_terminate.rs b/gateway/src/proxy/tls_terminate.rs index ad19ebf4..025e62e0 100644 --- a/gateway/src/proxy/tls_terminate.rs +++ b/gateway/src/proxy/tls_terminate.rs @@ -318,9 +318,10 @@ impl Proxy { .with_context(|| format!("app {app_id} not found"))?; debug!("selected top n hosts: {addresses:?}"); let tls_stream = self.tls_accept(inbound, buffer, h2).await?; + let max_connections = self.config.proxy.max_connections_per_app; let (outbound, _counter) = timeout( self.config.proxy.timeouts.connect, - connect_multiple_hosts(addresses, port), + connect_multiple_hosts(addresses, port, max_connections, app_id), ) .await .map_err(|_| anyhow!("connecting timeout"))?