diff --git a/Cargo.lock b/Cargo.lock index 8f8d245..84f57de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,7 +173,7 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http 1.3.1", + "http", "http-body", "http-body-util", "itoa", @@ -198,7 +198,7 @@ checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" dependencies = [ "bytes", "futures-core", - "http 1.3.1", + "http", "http-body", "http-body-util", "mime", @@ -645,7 +645,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.3.1", + "http", "indexmap", "slab", "tokio", @@ -665,17 +665,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "http" -version = "0.0.0" -dependencies = [ - "env_logger", - "log", - "serde_json", - "tokio", - "tucana", -] - [[package]] name = "http" version = "1.3.1" @@ -694,7 +683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.3.1", + "http", ] [[package]] @@ -705,7 +694,7 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http 1.3.1", + "http", "http-body", "pin-project-lite", ] @@ -724,20 +713,22 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.6.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" dependencies = [ + "atomic-waker", "bytes", "futures-channel", - "futures-util", + "futures-core", "h2", - "http 1.3.1", + "http", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", + "pin-utils", "smallvec", "tokio", "want", @@ -758,19 +749,20 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.11" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ "bytes", "futures-channel", + "futures-core", "futures-util", - "http 1.3.1", + "http", "http-body", "hyper", "libc", "pin-project-lite", - "socket2 0.5.9", + "socket2", "tokio", "tower-service", "tracing", @@ -1404,8 +1396,11 @@ dependencies = [ "anyhow", "base", "code0-flow", - "http 0.0.0", + "http-body-util", + "hyper", + "hyper-util", "log", + "prost", "regex", "serde_json", "tokio", @@ -1691,16 +1686,6 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" -[[package]] -name = "socket2" -version = "0.5.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "socket2" version = "0.6.0" @@ -1846,7 +1831,7 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.0", + "socket2", "tokio-macros", "windows-sys 0.61.2", ] @@ -1907,7 +1892,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", - "http 1.3.1", + "http", "httparse", "rand", "ring", @@ -1929,7 +1914,7 @@ dependencies = [ "base64", "bytes", "h2", - "http 1.3.1", + "http", "http-body", "http-body-util", "hyper", @@ -1937,7 +1922,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "socket2 0.6.0", + "socket2", "sync_wrapper", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 6b24365..a8986fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["crates/http", "adapter/rest", "crates/base"] +members = ["adapter/rest", "crates/base"] resolver = "3" [workspace.package] @@ -23,8 +23,5 @@ prost = "0.14.0" tonic-health = "0.14.0" futures-lite = "2.6.1" -[workspace.dependencies.http] -path = "../draco/crates/http" - [workspace.dependencies.base] path = "../draco/crates/base" diff --git a/adapter/rest/Cargo.toml b/adapter/rest/Cargo.toml index 9c41825..5c56b26 100644 --- a/adapter/rest/Cargo.toml +++ b/adapter/rest/Cargo.toml @@ -4,7 +4,6 @@ version = "0.1.0" edition.workspace = true [dependencies] -http = { workspace = true } code0-flow = { workspace = true } tokio = { workspace = true } tucana = { workspace = true } @@ -14,3 +13,7 @@ regex = { workspace = true } tonic = { workspace = true } base = { workspace = true } anyhow = { workspace = true } +hyper-util = "0.1.19" +hyper = "1.8.1" +http-body-util = "0.1.3" +prost = { workspace = true } diff --git a/adapter/rest/src/config.rs b/adapter/rest/src/config.rs new file mode 100644 index 0000000..e946c27 --- /dev/null +++ b/adapter/rest/src/config.rs @@ -0,0 +1,17 @@ +use base::traits::LoadConfig; +use code0_flow::flow_config::env_with_default; + +#[derive(Clone)] +pub struct HttpServerConfig { + pub port: u16, + pub host: String, +} + +impl LoadConfig for HttpServerConfig { + fn load() -> Self { + Self { + port: env_with_default("HTTP_SERVER_PORT", 8080), + host: env_with_default("HTTP_SERVER_HOST", String::from("127.0.0.1")), + } + } +} diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 060ec62..847a0eb 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -1,24 +1,43 @@ use base::{ runner::{ServerContext, ServerRunner}, store::FlowIdentifyResult, - traits::{IdentifiableFlow, LoadConfig, Server as ServerTrait}, + traits::Server as ServerTrait, }; -use code0_flow::flow_config::env_with_default; -use http::{request::HttpRequest, response::HttpResponse, server::Server}; +use http_body_util::{BodyExt, Full}; +use hyper::{Request, Response}; +use hyper::{ + StatusCode, + body::{Bytes, Incoming}, +}; +use hyper::{ + header::{HeaderName, HeaderValue}, + server::conn::http1, +}; +use hyper_util::rt::TokioIo; use std::collections::HashMap; +use std::convert::Infallible; +use std::net::SocketAddr; use std::sync::Arc; +use tokio::net::TcpListener; use tonic::async_trait; -use tucana::shared::value::Kind; use tucana::shared::value::Kind::StructValue; +use tucana::shared::{ListValue, value::Kind}; use tucana::shared::{Struct, ValidationFlow, Value}; +mod config; +mod route; + #[tokio::main] async fn main() { - let server = HttpServer { http_server: None }; + let server = HttpServer { + shutdown_tx: None, + addr: None, + }; let runner = match ServerRunner::new(server).await { Ok(runner) => runner, Err(err) => panic!("Failed to create server runner: {:?}", err), }; + log::info!("Successfully created runner for http service"); match runner.serve().await { Ok(_) => (), Err(err) => panic!("Failed to start server runner: {:?}", err), @@ -26,182 +45,312 @@ async fn main() { } struct HttpServer { - http_server: Option, + shutdown_tx: Option>, + addr: Option, } -struct RequestRoute { - url: String, +fn json_error(status: StatusCode, msg: &str) -> Response> { + let body = format!(r#"{{"error": "{}"}}"#, msg); + Response::builder() + .status(status) + .header("content-type", "application/json") + .body(Full::new(Bytes::from(body))) + .unwrap() } -impl IdentifiableFlow for RequestRoute { - fn identify(&self, flow: &ValidationFlow) -> bool { - let regex_str = flow - .settings - .iter() - .find(|s| s.flow_setting_id == "HTTP_URL") - .and_then(|s| s.value.as_ref()) - .and_then(|v| v.kind.as_ref()) - .and_then(|k| match k { - Kind::StringValue(s) => Some(s.as_str()), - _ => None, - }); - - let Some(regex_str) = regex_str else { - return false; - }; - - print!( - "Comparing regex {} with literal route: {}", - regex_str, self.url - ); - - match regex::Regex::new(regex_str) { - Ok(regex) => regex.is_match(&self.url), - Err(err) => { - log::error!("Failed to compile regex: {}", err); - false - } - } - } -} +fn build_response( + status: StatusCode, + headers: HashMap, + body: Vec, +) -> Response> { + let mut builder = Response::builder().status(status); -#[async_trait] -impl ServerTrait for HttpServer { - async fn init(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { - log::info!("Initializing http server"); - self.http_server = Some(Server::new( - ctx.server_config.host.clone(), - ctx.server_config.port, - )); - Ok(()) - } + { + let h = builder.headers_mut().unwrap(); + for (k, v) in headers { + let name = match HeaderName::from_bytes(k.as_bytes()) { + Ok(n) => n, + Err(_) => { + log::warn!("Dropping invalid header name: {}", k); + continue; + } + }; - async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { - if let Some(server) = &mut self.http_server { - log::info!("Running http server"); - server.register_async_closure({ - let store = Arc::clone(&ctx.adapter_store); - move |request: HttpRequest| { - let store = Arc::clone(&store); - async move { - //Get slug => host/slug/real_path - - let splits: Vec<_> = request.path.split("/").collect(); - let first = splits.first(); - - if let Some(slug) = first { - let pattern = format!("REST.{}.*", slug); - let route = RequestRoute { - url: request.path.clone(), - }; - - match store.get_possible_flow_match(pattern, route).await { - FlowIdentifyResult::Single(flow) => { - print!("Found flow: {}", flow.flow_id); - execute_flow(flow, request, store).await - } - _ => Some(HttpResponse::internal_server_error( - format!("No flow found for path: {}", request.path), - HashMap::new(), - )), - } - } else { - Some(HttpResponse::internal_server_error( - format!("No flow found for path: {}", request.path), - HashMap::new(), - )) - } - } + let value = match HeaderValue::from_str(&v) { + Ok(v) => v, + Err(_) => { + log::warn!("Dropping invalid header value for {}: {:?}", k, v); + continue; } - }); + }; - server.start().await; + h.insert(name, value); } - Ok(()) } - async fn shutdown(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { - if let Some(server) = &self.http_server { - server.shutdown(); - } - Ok(()) - } + builder.body(Full::new(Bytes::from(body))).unwrap() } -async fn execute_flow( +async fn execute_flow_to_hyper_response( flow: ValidationFlow, - request: HttpRequest, + body: Vec, store: Arc, -) -> Option { - match store.validate_and_execute_flow(flow, request.body).await { +) -> Response> { + let value: Option = if body.is_empty() { + None + } else { + match prost::Message::decode(body.as_slice()) { + Ok(v) => Some(v), + Err(e) => { + log::warn!("Failed to decode request body as protobuf Value: {}", e); + return json_error( + StatusCode::BAD_REQUEST, + "Failed to decode request body as protobuf Value", + ); + } + } + }; + + match store.validate_and_execute_flow(flow, value).await { Some(result) => { + log::debug!("Received Result: {:?}", result); let Value { kind: Some(StructValue(Struct { fields })), } = result else { - return None; + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result was not a struct", + ); }; - let Some(headers) = fields.get("headers") else { - return None; + let Some(headers_val) = fields.get("headers") else { + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing headers", + ); }; - - let Some(status_code) = fields.get("status_code") else { - return None; + let Some(status_code_val) = fields.get("status_code") else { + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing status_code", + ); }; - - let Some(payload) = fields.get("payload") else { - return None; + let Some(payload_val) = fields.get("payload") else { + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing payload", + ); }; + // headers struct let Value { kind: - Some(StructValue(Struct { - fields: header_fields, + Some(Kind::ListValue(ListValue { + values: header_fields, })), - } = headers + } = headers_val else { - return None; + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "headers was not a list of header entries", + ); }; + let http_headers: HashMap = header_fields .iter() - .filter_map(|(k, v)| { - let value = match &v.kind { - Some(Kind::StringValue(s)) if !s.is_empty() => s.clone(), - _ => return None, + .filter_map(|x| { + if let Value { + kind: Some(StructValue(Struct { fields: f })), + } = x + { + Some(f) + } else { + None + } + }) + .filter_map(|f| { + let key = match f.get("key") { + Some(value) => { + if let Value { + kind: Some(Kind::StringValue(x)), + } = value + { + x + } else { + return None; + } + } + None => return None, + }; + let value = match f.get("value") { + Some(value) => { + if let Value { + kind: Some(Kind::StringValue(x)), + } = value + { + x + } else { + return None; + } + } + None => return None, }; - Some((k.clone(), value)) + Some((key.clone(), value.clone())) }) .collect(); - let json = serde_json::to_vec_pretty(&payload).unwrap_or_else(|err| { - format!(r#"{{"error": "Serialization failed: {}"}}"#, err).into_bytes() + // status_code number + let Some(Kind::NumberValue(code)) = status_code_val.kind else { + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "status_code was not a number", + ); + }; + + // payload -> json bytes + let json = serde_json::to_vec_pretty(payload_val).unwrap_or_else(|err| { + format!(r#"{{"error":"Serialization failed: {}"}}"#, err).into_bytes() }); - let Some(Kind::NumberValue(code)) = status_code.kind else { - return None; - }; - Some(HttpResponse::new(code as u16, http_headers.clone(), json)) + let status = + StatusCode::from_u16(code as u16).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + build_response(status, http_headers, json) + } + None => { + log::error!("flow execution failed"); + json_error(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed") } - None => Some(HttpResponse::internal_server_error( - "Flow execution failed".to_string(), - HashMap::new(), - )), } } -#[derive(Clone)] -struct HttpServerConfig { - port: u16, - host: String, +pub async fn handle_request( + req: Request, + store: Arc, +) -> Result>, Infallible> { + let method = req.method().clone(); + let path = req.uri().path().to_string(); + + // Read full body + let body_bytes = match BodyExt::collect(req.into_body()).await { + Ok(collected) => collected.to_bytes().to_vec(), + Err(err) => { + log::error!("Failed to read request body: {}", err); + return Ok(json_error( + StatusCode::BAD_REQUEST, + "Failed to read request body", + )); + } + }; + + // slug matching + let Some(slug) = route::extract_slug_from_path(&path) else { + return Ok(json_error(StatusCode::BAD_REQUEST, "Missing slug in path")); + }; + + let pattern = format!("REST.{}.*", slug); + let route = route::RequestRoute { + url: path.clone(), + method, + }; + + let resp = match store.get_possible_flow_match(pattern, route).await { + FlowIdentifyResult::Single(flow) => { + execute_flow_to_hyper_response(flow, body_bytes, store).await + } + _ => json_error(StatusCode::NOT_FOUND, "No flow found for path"), + }; + + Ok(resp) } -impl LoadConfig for HttpServerConfig { - fn load() -> Self { - Self { - port: env_with_default("HTTP_SERVER_PORT", 8082), - host: env_with_default("HTTP_SERVER_HOST", String::from("0.0.0.0")), +#[async_trait] +impl ServerTrait for HttpServer { + async fn init(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { + log::info!("Initializing http server"); + + let (shutdown_tx, _) = tokio::sync::broadcast::channel(1); + self.shutdown_tx = Some(shutdown_tx); + + let bind = format!("{}:{}", ctx.server_config.host, ctx.server_config.port); + + self.addr = Some( + bind.parse::() + .map_err(|e| anyhow::anyhow!("Invalid bind address '{}': {}", bind, e))?, + ); + + log::debug!("Initialized with Address: {:?}", self.addr); + Ok(()) + } + + async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { + let addr = self + .addr + .expect("cannot start tcp listener with empty address"); + + let listener = TcpListener::bind(addr) + .await + .map_err(|e| anyhow::anyhow!("failed to bind {addr}: {e}"))?; + + // Create a receiver for this run loop + let shutdown_tx = self + .shutdown_tx + .as_ref() + .expect("shutdown_tx not initialized; init() must run first") + .clone(); + let mut shutdown_rx = shutdown_tx.subscribe(); + + loop { + let (stream, _) = tokio::select! { + _ = shutdown_rx.recv() => { + log::info!("HTTP server: shutdown received, stopping accept loop"); + break; + } + res = listener.accept() => { + res.map_err(|e| anyhow::anyhow!("accept failed: {e}"))? + } + }; + + let io = TokioIo::new(stream); + let store = Arc::clone(&ctx.adapter_store); + + let mut conn_shutdown_rx = shutdown_tx.subscribe(); + + tokio::spawn(async move { + let svc = hyper::service::service_fn(move |req| { + let store = Arc::clone(&store); + async move { handle_request(req, store).await } + }); + + let conn = http1::Builder::new().serve_connection(io, svc); + + tokio::pin!(conn); + + tokio::select! { + res = conn.as_mut() => { + if let Err(err) = res { + log::error!("Error serving connection: {:?}", err); + } + } + _ = conn_shutdown_rx.recv() => { + conn.as_mut().graceful_shutdown(); + } + } + }); } + + Ok(()) + } + async fn shutdown( + &mut self, + _ctx: &ServerContext, + ) -> anyhow::Result<()> { + if let Some(ref tx) = self.shutdown_tx { + log::info!("Received a shutdown signal for Adapter Server"); + let _ = tx.send(()); + } + + Ok(()) } } diff --git a/adapter/rest/src/route.rs b/adapter/rest/src/route.rs new file mode 100644 index 0000000..b36e615 --- /dev/null +++ b/adapter/rest/src/route.rs @@ -0,0 +1,85 @@ +use base::traits::IdentifiableFlow; +use tucana::shared::{ValidationFlow, value::Kind}; + +pub struct RequestRoute { + pub url: String, + pub method: hyper::Method, +} + +// Checks if the Method and Url matches any of the +// Flows that matched the original slug pattern for project +// Only if both matched, it will return true +impl IdentifiableFlow for RequestRoute { + fn identify(&self, flow: &ValidationFlow) -> bool { + // Get Method of the FlowSetting + let method_str = flow + .settings + .iter() + .find(|s| s.flow_setting_id == "HTTP_METHOD") + .and_then(|s| s.value.as_ref()) + .and_then(|v| v.kind.as_ref()) + .and_then(|k| match k { + Kind::StringValue(s) => Some(s.as_str()), + _ => None, + }); + + log::debug!( + "Comparing flows method: {:?} with request route: {}", + method_str, + self.method.as_str() + ); + + if let Some(method) = method_str { + if method != self.method.as_str() { + log::debug!("Method didn't eq"); + return false; + } + } else { + log::debug!("Method didn't eq"); + return false; + } + // Get URL of the FlowSetting + let regex_str_v = flow + .settings + .iter() + .find(|s| s.flow_setting_id == "HTTP_URL"); + + log::debug!("Extracted: {:?} as HTTP_URL", ®ex_str_v); + + let regex_str = regex_str_v + .and_then(|s| s.value.as_ref()) + .and_then(|v| v.kind.as_ref()) + .and_then(|k| match k { + Kind::StringValue(s) => Some(s.as_str()), + _ => None, + }); + + let Some(regex_str) = regex_str else { + log::debug!("Regex was empty"); + return false; + }; + + log::debug!( + "Comparing regex {} with literal route: {}", + regex_str, + self.url + ); + + // Check if the request is matching + match regex::Regex::new(regex_str) { + Ok(regex) => { + log::debug!("Successfully compiled regex: {}", regex_str); + regex.is_match(&self.url) + } + Err(err) => { + log::error!("Failed to compile regex: {}", err); + false + } + } + } +} + +pub fn extract_slug_from_path(path: &str) -> Option<&str> { + let trimmed = path.trim_start_matches('/'); + trimmed.split('/').next().filter(|s| !s.is_empty()) +} diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index 380d85e..f222d35 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -131,9 +131,15 @@ impl AdapterStore { } let uuid = uuid::Uuid::new_v4().to_string(); + let flow_id = flow.flow_id; let execution_flow: ExecutionFlow = Self::convert_validation_flow(flow, input_value); let bytes = execution_flow.encode_to_vec(); let topic = format!("execution.{}", uuid); + log::info!( + "Requesting execution of flow {} with execution id {}", + flow_id, + uuid + ); let result = self.client.request(topic, bytes.into()).await; match result { diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml deleted file mode 100644 index 91a13dc..0000000 --- a/crates/http/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "http" -version.workspace = true -edition.workspace = true - -[dependencies] -tucana = { workspace = true } -serde_json = { workspace = true } -log = { workspace = true } -env_logger = { workspace = true } -tokio = { workspace = true } diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs deleted file mode 100644 index 4a101cf..0000000 --- a/crates/http/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod request; -pub mod response; -pub mod server; diff --git a/crates/http/src/request.rs b/crates/http/src/request.rs deleted file mode 100644 index b770771..0000000 --- a/crates/http/src/request.rs +++ /dev/null @@ -1,309 +0,0 @@ -use super::response::HttpResponse; -use std::{ - collections::HashMap, - io::{BufRead, BufReader, Read}, - net::TcpStream, - str::FromStr, - usize, -}; -use tucana::shared::{Struct, Value, helper::value::from_json_value, value::Kind}; - -#[derive(Debug, Clone)] -pub enum HttpOption { - GET, - POST, - PUT, - DELETE, -} - -impl FromStr for HttpOption { - type Err = (); - - fn from_str(s: &str) -> Result { - match s { - "GET" => Ok(HttpOption::GET), - "POST" => Ok(HttpOption::POST), - "PUT" => Ok(HttpOption::PUT), - "DELETE" => Ok(HttpOption::DELETE), - _ => Err(()), - } - } -} - -impl ToString for HttpOption { - fn to_string(&self) -> String { - match self { - HttpOption::GET => "GET".to_string(), - HttpOption::POST => "POST".to_string(), - HttpOption::PUT => "PUT".to_string(), - HttpOption::DELETE => "DELETE".to_string(), - } - } -} - -#[derive(Debug, Clone)] -pub struct HeaderMap { - pub fields: HashMap, -} - -impl Default for HeaderMap { - fn default() -> Self { - Self::new() - } -} - -impl HeaderMap { - pub fn new() -> Self { - HeaderMap { - fields: HashMap::new(), - } - } - - /// Create a new HeaderMap from a vector of strings. - /// - /// Each string should be in the format "key: value". - /// - /// # Examples - /// - /// ``` - /// use http::request::HeaderMap; - /// - /// let header = vec![ - /// "Content-Type: application/json".to_string(), - /// "User-Agent: Mozilla/5.0".to_string(), - /// ]; - /// let header_map = HeaderMap::from_vec(header); - /// assert_eq!(header_map.get("content-type"), Some(&"application/json".to_string())); - /// assert_eq!(header_map.get("user-agent"), Some(&"mozilla/5.0".to_string())); - /// ``` - pub fn from_vec(header: Vec) -> Self { - let mut header_map = HeaderMap::new(); - - for param in header { - let mut parts = param.split(": "); - let key = match parts.next() { - Some(key) => key.to_lowercase(), - None => continue, - }; - let value = match parts.next() { - Some(value) => value.to_lowercase(), - None => continue, - }; - - header_map.add(key, value); - } - - header_map - } - - #[inline] - pub fn add(&mut self, key: String, value: String) { - self.fields.insert(key, value); - } - - #[inline] - pub fn get(&self, key: &str) -> Option<&String> { - self.fields.get(key) - } -} - -#[derive(Debug, Clone)] -pub struct HttpRequest { - pub method: HttpOption, - pub path: String, - pub version: String, - pub host: String, - pub headers: HeaderMap, - - /// The body of the request. - /// - /// # Example - /// If the url was called: - /// - /// url: .../api/users/123/posts/456?filter=recent&sort=asc - /// - /// from the regex: "^/api/users/(?P\d+)/posts/(?P\d+)(\?.*)?$" - /// - /// With the request body: - /// - /// ```json - /// { - /// "first": 2, - /// "second": 300 - /// } - /// ``` - /// The equivalent HTTP request body will look like: - /// ```json - /// { - /// "url": { - /// "user_id": "123", - /// "post_id": "456", - /// }, - /// "query": { - /// "filter": "recent", - /// "sort": "asc" - /// }, - /// "body": { - /// "first": "1", - /// "second": "2" - /// } - /// } - /// ``` - pub body: Option, -} - -pub fn convert_to_http_request(stream: &TcpStream) -> Result { - let mut buf_reader = BufReader::new(stream); - let mut raw_http_request: Vec = Vec::new(); - let mut line = String::new(); - - // Read headers until empty line - while let Ok(bytes) = buf_reader.read_line(&mut line) { - if bytes == 0 || line.trim().is_empty() { - break; - } - - raw_http_request.push(line.trim().to_string()); - line.clear(); - } - - println!("{:?}", &raw_http_request); - // Parse headers - let http_request = parse_request(raw_http_request, buf_reader)?; - - log::debug!("Received HTTP Request: {:?}", &http_request); - - if http_request.version != "HTTP/1.1" { - return Err(HttpResponse::not_implemented( - "The HTTP version is not supported".to_string(), - HashMap::new(), - )); - } - - Ok(http_request) -} - -#[inline] -fn parse_request( - raw_http_request: Vec, - mut buf_reader: BufReader<&TcpStream>, -) -> Result { - let params = &raw_http_request[0]; - let headers = raw_http_request[1..raw_http_request.len()].to_vec(); - let header_map = HeaderMap::from_vec(headers); - - if params.is_empty() { - return Err(HttpResponse::bad_request( - "Empty HTTP request line".to_string(), - HashMap::new(), - )); - } - - let mut header_params = params.split(" "); - let raw_method = header_params.next().ok_or_else(|| { - HttpResponse::bad_request("Missing HTTP method".to_string(), HashMap::new()) - })?; - let path = header_params.next().ok_or_else(|| { - HttpResponse::bad_request("Missing request path".to_string(), HashMap::new()) - })?; - let version = header_params.next().ok_or_else(|| { - HttpResponse::bad_request("Missing HTTP version".to_string(), HashMap::new()) - })?; - - let method = match HttpOption::from_str(raw_method) { - Ok(method) => method, - Err(_) => { - return Err(HttpResponse::method_not_allowed( - format!("Unsupported HTTP method: {}", raw_method), - HashMap::new(), - )); - } - }; - - let mut body_values: HashMap = HashMap::new(); - - if let Some(content_length) = header_map.get("content-length") { - let size: usize = match content_length.parse() { - Ok(len) => len, - Err(_) => { - return Err(HttpResponse::bad_request( - "Invalid content-length header".to_string(), - HashMap::new(), - )); - } - }; - - let mut body = vec![0; size]; - if buf_reader.read_exact(&mut body).is_ok() - && let Ok(json_value) = serde_json::from_slice::(&body) - { - body_values.insert("body".to_string(), from_json_value(json_value)); - } - }; - - if path.contains("?") { - let mut fields: HashMap = HashMap::new(); - if let Some((_, query)) = path.split_once("?") { - let values = query.split("&"); - - for value in values { - let mut parts = value.split("="); - let key = match parts.next() { - Some(key) => key.to_string(), - None => continue, - }; - - let value = match parts.next() { - Some(value) => value.to_string(), - None => continue, - }; - - fields.insert( - key, - Value { - kind: Some(Kind::StringValue(value)), - }, - ); - } - }; - - if !fields.is_empty() { - let value = Value { - kind: Some(Kind::StructValue(Struct { fields })), - }; - - body_values.insert("query".to_string(), value); - } - }; - - let body = if !body_values.is_empty() { - Some(Value { - kind: Some(Kind::StructValue(Struct { - fields: body_values, - })), - }) - } else { - None - }; - - let host = { - match header_map.get("host") { - Some(host) => host.clone(), - None => { - return Err(HttpResponse::bad_request( - "Missing Host in Headers!".to_string(), - HashMap::new(), - )); - } - } - }; - - Ok(HttpRequest { - method, - path: path.to_string(), - version: version.to_string(), - host, - headers: header_map, - body, - }) -} diff --git a/crates/http/src/response.rs b/crates/http/src/response.rs deleted file mode 100644 index 976c5a3..0000000 --- a/crates/http/src/response.rs +++ /dev/null @@ -1,275 +0,0 @@ -pub struct HttpResponse { - pub status_code: u16, - pub headers: std::collections::HashMap, - pub body: Vec, -} - -impl HttpResponse { - pub fn new( - status_code: u16, - headers: std::collections::HashMap, - body: Vec, - ) -> Self { - Self { - status_code, - headers, - body, - } - } - - pub fn to_bytes(&self) -> Vec { - let status_line = format!( - "HTTP/1.1 {} {}\r\n", - self.status_code, - status_text(self.status_code) - ); - - let mut headers_str = String::new(); - for (key, value) in &self.headers { - headers_str.push_str(&format!("{}: {}\r\n", key, value)); - } - - let mut response = Vec::new(); - response.extend_from_slice(status_line.as_bytes()); - response.extend_from_slice(headers_str.as_bytes()); - response.extend_from_slice(b"\r\n"); - response.extend_from_slice(&self.body); - - response - } - - // 2xx Success responses - pub fn ok(body: Vec, mut headers: std::collections::HashMap) -> Self { - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(200, headers, body) - } - - pub fn created(body: Vec, mut headers: std::collections::HashMap) -> Self { - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(201, headers, body) - } - - pub fn accepted(body: Vec, mut headers: std::collections::HashMap) -> Self { - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(202, headers, body) - } - - pub fn no_content(headers: std::collections::HashMap) -> Self { - Self::new(204, headers, Vec::new()) - } - - // 4xx Client Errors - pub fn bad_request( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(400, headers, body) - } - - pub fn unauthorized( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(401, headers, body) - } - - pub fn forbidden( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(403, headers, body) - } - - pub fn not_found( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(404, headers, body) - } - - pub fn method_not_allowed( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(405, headers, body) - } - - pub fn conflict(error: String, mut headers: std::collections::HashMap) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(409, headers, body) - } - - pub fn gone(error: String, mut headers: std::collections::HashMap) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(410, headers, body) - } - - pub fn unprocessable_entity( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(422, headers, body) - } - - pub fn too_many_requests( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(429, headers, body) - } - - // 5xx Server Errors - pub fn internal_server_error( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(500, headers, body) - } - - pub fn not_implemented( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(501, headers, body) - } - - pub fn bad_gateway( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(502, headers, body) - } - - pub fn service_unavailable( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(503, headers, body) - } - - pub fn gateway_timeout( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(504, headers, body) - } - - // Check if response is successful (2xx status code) - pub fn is_success(&self) -> bool { - self.status_code >= 200 && self.status_code < 300 - } - - // Check if response is client error (4xx status code) - pub fn is_client_error(&self) -> bool { - self.status_code >= 400 && self.status_code < 500 - } - - // Check if response is server error (5xx status code) - pub fn is_server_error(&self) -> bool { - self.status_code >= 500 && self.status_code < 600 - } -} - -// Helper function to get status text from status code -fn status_text(status_code: u16) -> &'static str { - match status_code { - 100 => "Continue", - 101 => "Switching Protocols", - 200 => "OK", - 201 => "Created", - 202 => "Accepted", - 204 => "No Content", - 400 => "Bad Request", - 401 => "Unauthorized", - 403 => "Forbidden", - 404 => "Not Found", - 405 => "Method Not Allowed", - 409 => "Conflict", - 410 => "Gone", - 422 => "Unprocessable Entity", - 429 => "Too Many Requests", - 500 => "Internal Server Error", - 501 => "Not Implemented", - 502 => "Bad Gateway", - 503 => "Service Unavailable", - 504 => "Gateway Timeout", - _ => "Unknown Status", - } -} diff --git a/crates/http/src/server.rs b/crates/http/src/server.rs deleted file mode 100644 index 0c22a78..0000000 --- a/crates/http/src/server.rs +++ /dev/null @@ -1,253 +0,0 @@ -use crate::{ - request::{HeaderMap, HttpRequest}, - response::HttpResponse, -}; -use std::{future::Future, net::TcpListener, pin::Pin, sync::Arc}; - -// Handler trait for asynchronous request handling only -pub trait AsyncHandler: Send + Sync + 'static { - fn handle( - &self, - request: HttpRequest, - ) -> Pin> + Send + 'static>>; -} - -// Implement AsyncHandler for async closures -impl AsyncHandler for F -where - F: Fn(HttpRequest) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, -{ - fn handle( - &self, - request: HttpRequest, - ) -> Pin> + Send + 'static>> { - Box::pin(self(request)) - } -} - -pub struct Server { - host: String, - port: u16, - handlers: Arc>>, - shutdown_tx: Option>, -} - -impl Server { - pub fn new(host: String, port: u16) -> Self { - Server { - host, - port, - handlers: Arc::new(Vec::new()), - shutdown_tx: None, - } - } - - /// Register an async handler - pub fn register_handler(&mut self, handler: H) - where - H: AsyncHandler, - { - let handlers = - Arc::get_mut(&mut self.handlers).expect("Cannot register handler after server start"); - handlers.push(Box::new(handler)); - } - - /// Register an async closure as a handler - pub fn register_async_closure(&mut self, closure: F) - where - F: Fn(HttpRequest) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, - { - self.register_handler(closure); - } - - pub async fn start(&mut self) { - let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1); - self.shutdown_tx = Some(shutdown_tx); - - self.run_server(&mut shutdown_rx).await; - } - - pub fn shutdown(&self) { - if let Some(ref tx) = self.shutdown_tx { - let _ = tx.send(()); - } - } - - async fn run_server(&self, shutdown_rx: &mut tokio::sync::broadcast::Receiver<()>) { - let url = format!("{}:{}", self.host, self.port); - log::info!("Starting http server on {}", &url); - let listener = match TcpListener::bind(&url) { - Ok(listener) => { - listener - .set_nonblocking(true) - .expect("Failed to set non-blocking"); - listener - } - Err(err) => panic!("Failed to bind to {}: {}", url, err), - }; - - let async_listener = - tokio::net::TcpListener::from_std(listener).expect("Failed to create async listener"); - - loop { - tokio::select! { - _ = shutdown_rx.recv() => { - log::info!("Shutdown signal received, stopping server"); - break; - } - stream_result = async_listener.accept() => { - let (stream, _) = match stream_result { - Ok(connection) => connection, - Err(err) => { - log::error!("Failed to accept incoming connection: {}", err); - continue; - } - }; - - let handlers = self.handlers.clone(); - - tokio::spawn(async move { - println!("New connection accepted, starting to read request..."); - - // Read HTTP request data using tokio's async methods - use tokio::io::{AsyncBufReadExt, BufReader}; - - let mut buf_reader = BufReader::new(stream); - let mut raw_http_request: Vec = Vec::new(); - let mut line = String::new(); - - // Read headers until empty line - while let Ok(bytes) = buf_reader.read_line(&mut line).await { - println!("Read {} bytes: '{}'", bytes, line.trim()); - if bytes == 0 || line.trim().is_empty() { - break; - } - raw_http_request.push(line.trim().to_string()); - line.clear(); - } - - println!("Finished reading request. Raw data: {:?}", raw_http_request); - - // Parse the HTTP request manually here since we can't use convert_to_http_request with tokio stream - let request_result = if let Some(first_line) = raw_http_request.first() { - println!("Parsing first line: '{}'", first_line); - let parts: Vec<&str> = first_line.split_whitespace().collect(); - println!("Split into parts: {:?}", parts); - - if parts.len() >= 3 { - // Extract host from headers or use default - let mut host = "localhost".to_string(); - let header_lines = raw_http_request[1..].to_vec(); - for header_line in &header_lines { - if header_line.to_lowercase().starts_with("host:") { - if let Some(host_value) = header_line.split(':').nth(1) { - host = host_value.trim().to_string(); - } - break; - } - } - - let request = HttpRequest { - method: match parts[0] { - "GET" => crate::request::HttpOption::GET, - "POST" => crate::request::HttpOption::POST, - "PUT" => crate::request::HttpOption::PUT, - "DELETE" => crate::request::HttpOption::DELETE, - _ => crate::request::HttpOption::GET, - }, - path: parts[1].to_string(), - version: parts[2].to_string(), - host, - headers: HeaderMap::from_vec(header_lines), - body: None, - }; - - println!("Successfully parsed request: {:?}", request); - Ok(request) - } else { - println!("Invalid HTTP request - not enough parts"); - let headers = std::collections::HashMap::new(); - Err(HttpResponse::bad_request("Invalid HTTP request".to_string(), headers)) - } - } else { - println!("Empty HTTP request - no first line"); - let headers = std::collections::HashMap::new(); - Err(HttpResponse::bad_request("Empty HTTP request".to_string(), headers)) - }; - - match request_result { - Ok(request) => { - println!("About to call handlers for request: {:?}", request); - - // Try each handler until one handles the request - let mut response = None; - for (i, handler) in handlers.iter().enumerate() { - println!("Calling handler {}", i); - let handler_response = handler.handle(request.clone()).await; - println!("Handler {} returned: {:?}", i, handler_response.is_some()); - if handler_response.is_some() { - response = Some(handler_response); - break; - } - } - - println!("Final response from handlers: {:?}", response.is_some()); - - // Default response if no handler matched - let http_response = match response { - Some(Some(resp)) => { - println!("Using handler response"); - resp - }, - Some(None) => { - println!("Handler returned None, using not found"); - let headers = std::collections::HashMap::new(); - HttpResponse::not_found("No handler found".to_string(), headers) - } - None => { - println!("No handlers matched, using not found"); - let headers = std::collections::HashMap::new(); - HttpResponse::not_found("No handler found".to_string(), headers) - } - }; - - println!("About to write response: {} bytes", http_response.to_bytes().len()); - - use tokio::io::AsyncWriteExt; - let mut stream = buf_reader.into_inner(); - if let Err(e) = stream.write_all(&http_response.to_bytes()).await { - println!("Failed to write response: {}", e); - log::error!("Failed to write response: {}", e); - } else if let Err(e) = stream.flush().await { - println!("Failed to flush response: {}", e); - log::error!("Failed to flush response: {}", e); - } else { - println!("Response written and flushed successfully"); - } - } - Err(response) => { - println!("Request parsing failed, sending error response"); - use tokio::io::AsyncWriteExt; - let mut stream = buf_reader.into_inner(); - if let Err(e) = stream.write_all(&response.to_bytes()).await { - println!("Failed to write error response: {}", e); - log::error!("Failed to write error response: {}", e); - } else if let Err(e) = stream.flush().await { - println!("Failed to flush error response: {}", e); - log::error!("Failed to flush error response: {}", e); - } else { - println!("Error response written and flushed successfully"); - } - } - } - - // Connection will be closed when stream goes out of scope - println!("Request processing completed"); - }); - } - } - } - } -}