From b140b2cc72c5c8e14b97cad8de1f31faeb12c90d Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 29 Jan 2026 11:29:08 +0100 Subject: [PATCH 01/15] drop: removed http crate --- crates/http/Cargo.toml | 11 -- crates/http/src/lib.rs | 3 - crates/http/src/request.rs | 309 ------------------------------------ crates/http/src/response.rs | 275 -------------------------------- crates/http/src/server.rs | 253 ----------------------------- 5 files changed, 851 deletions(-) delete mode 100644 crates/http/Cargo.toml delete mode 100644 crates/http/src/lib.rs delete mode 100644 crates/http/src/request.rs delete mode 100644 crates/http/src/response.rs delete mode 100644 crates/http/src/server.rs diff --git a/crates/http/Cargo.toml b/crates/http/Cargo.toml deleted file mode 100644 index 91a13dc..0000000 --- a/crates/http/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "http" -version.workspace = true -edition.workspace = true - -[dependencies] -tucana = { workspace = true } -serde_json = { workspace = true } -log = { workspace = true } -env_logger = { workspace = true } -tokio = { workspace = true } diff --git a/crates/http/src/lib.rs b/crates/http/src/lib.rs deleted file mode 100644 index 4a101cf..0000000 --- a/crates/http/src/lib.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod request; -pub mod response; -pub mod server; diff --git a/crates/http/src/request.rs b/crates/http/src/request.rs deleted file mode 100644 index b770771..0000000 --- a/crates/http/src/request.rs +++ /dev/null @@ -1,309 +0,0 @@ -use super::response::HttpResponse; -use std::{ - collections::HashMap, - io::{BufRead, BufReader, Read}, - net::TcpStream, - str::FromStr, - usize, -}; -use tucana::shared::{Struct, Value, helper::value::from_json_value, value::Kind}; - -#[derive(Debug, Clone)] -pub enum HttpOption { - GET, - POST, - PUT, - DELETE, -} - -impl FromStr for HttpOption { - type Err = (); - - fn from_str(s: &str) -> Result { - match s { - "GET" => Ok(HttpOption::GET), - "POST" => Ok(HttpOption::POST), - "PUT" => Ok(HttpOption::PUT), - "DELETE" => Ok(HttpOption::DELETE), - _ => Err(()), - } - } -} - -impl ToString for HttpOption { - fn to_string(&self) -> String { - match self { - HttpOption::GET => "GET".to_string(), - HttpOption::POST => "POST".to_string(), - HttpOption::PUT => "PUT".to_string(), - HttpOption::DELETE => "DELETE".to_string(), - } - } -} - -#[derive(Debug, Clone)] -pub struct HeaderMap { - pub fields: HashMap, -} - -impl Default for HeaderMap { - fn default() -> Self { - Self::new() - } -} - -impl HeaderMap { - pub fn new() -> Self { - HeaderMap { - fields: HashMap::new(), - } - } - - /// Create a new HeaderMap from a vector of strings. - /// - /// Each string should be in the format "key: value". - /// - /// # Examples - /// - /// ``` - /// use http::request::HeaderMap; - /// - /// let header = vec![ - /// "Content-Type: application/json".to_string(), - /// "User-Agent: Mozilla/5.0".to_string(), - /// ]; - /// let header_map = HeaderMap::from_vec(header); - /// assert_eq!(header_map.get("content-type"), Some(&"application/json".to_string())); - /// assert_eq!(header_map.get("user-agent"), Some(&"mozilla/5.0".to_string())); - /// ``` - pub fn from_vec(header: Vec) -> Self { - let mut header_map = HeaderMap::new(); - - for param in header { - let mut parts = param.split(": "); - let key = match parts.next() { - Some(key) => key.to_lowercase(), - None => continue, - }; - let value = match parts.next() { - Some(value) => value.to_lowercase(), - None => continue, - }; - - header_map.add(key, value); - } - - header_map - } - - #[inline] - pub fn add(&mut self, key: String, value: String) { - self.fields.insert(key, value); - } - - #[inline] - pub fn get(&self, key: &str) -> Option<&String> { - self.fields.get(key) - } -} - -#[derive(Debug, Clone)] -pub struct HttpRequest { - pub method: HttpOption, - pub path: String, - pub version: String, - pub host: String, - pub headers: HeaderMap, - - /// The body of the request. - /// - /// # Example - /// If the url was called: - /// - /// url: .../api/users/123/posts/456?filter=recent&sort=asc - /// - /// from the regex: "^/api/users/(?P\d+)/posts/(?P\d+)(\?.*)?$" - /// - /// With the request body: - /// - /// ```json - /// { - /// "first": 2, - /// "second": 300 - /// } - /// ``` - /// The equivalent HTTP request body will look like: - /// ```json - /// { - /// "url": { - /// "user_id": "123", - /// "post_id": "456", - /// }, - /// "query": { - /// "filter": "recent", - /// "sort": "asc" - /// }, - /// "body": { - /// "first": "1", - /// "second": "2" - /// } - /// } - /// ``` - pub body: Option, -} - -pub fn convert_to_http_request(stream: &TcpStream) -> Result { - let mut buf_reader = BufReader::new(stream); - let mut raw_http_request: Vec = Vec::new(); - let mut line = String::new(); - - // Read headers until empty line - while let Ok(bytes) = buf_reader.read_line(&mut line) { - if bytes == 0 || line.trim().is_empty() { - break; - } - - raw_http_request.push(line.trim().to_string()); - line.clear(); - } - - println!("{:?}", &raw_http_request); - // Parse headers - let http_request = parse_request(raw_http_request, buf_reader)?; - - log::debug!("Received HTTP Request: {:?}", &http_request); - - if http_request.version != "HTTP/1.1" { - return Err(HttpResponse::not_implemented( - "The HTTP version is not supported".to_string(), - HashMap::new(), - )); - } - - Ok(http_request) -} - -#[inline] -fn parse_request( - raw_http_request: Vec, - mut buf_reader: BufReader<&TcpStream>, -) -> Result { - let params = &raw_http_request[0]; - let headers = raw_http_request[1..raw_http_request.len()].to_vec(); - let header_map = HeaderMap::from_vec(headers); - - if params.is_empty() { - return Err(HttpResponse::bad_request( - "Empty HTTP request line".to_string(), - HashMap::new(), - )); - } - - let mut header_params = params.split(" "); - let raw_method = header_params.next().ok_or_else(|| { - HttpResponse::bad_request("Missing HTTP method".to_string(), HashMap::new()) - })?; - let path = header_params.next().ok_or_else(|| { - HttpResponse::bad_request("Missing request path".to_string(), HashMap::new()) - })?; - let version = header_params.next().ok_or_else(|| { - HttpResponse::bad_request("Missing HTTP version".to_string(), HashMap::new()) - })?; - - let method = match HttpOption::from_str(raw_method) { - Ok(method) => method, - Err(_) => { - return Err(HttpResponse::method_not_allowed( - format!("Unsupported HTTP method: {}", raw_method), - HashMap::new(), - )); - } - }; - - let mut body_values: HashMap = HashMap::new(); - - if let Some(content_length) = header_map.get("content-length") { - let size: usize = match content_length.parse() { - Ok(len) => len, - Err(_) => { - return Err(HttpResponse::bad_request( - "Invalid content-length header".to_string(), - HashMap::new(), - )); - } - }; - - let mut body = vec![0; size]; - if buf_reader.read_exact(&mut body).is_ok() - && let Ok(json_value) = serde_json::from_slice::(&body) - { - body_values.insert("body".to_string(), from_json_value(json_value)); - } - }; - - if path.contains("?") { - let mut fields: HashMap = HashMap::new(); - if let Some((_, query)) = path.split_once("?") { - let values = query.split("&"); - - for value in values { - let mut parts = value.split("="); - let key = match parts.next() { - Some(key) => key.to_string(), - None => continue, - }; - - let value = match parts.next() { - Some(value) => value.to_string(), - None => continue, - }; - - fields.insert( - key, - Value { - kind: Some(Kind::StringValue(value)), - }, - ); - } - }; - - if !fields.is_empty() { - let value = Value { - kind: Some(Kind::StructValue(Struct { fields })), - }; - - body_values.insert("query".to_string(), value); - } - }; - - let body = if !body_values.is_empty() { - Some(Value { - kind: Some(Kind::StructValue(Struct { - fields: body_values, - })), - }) - } else { - None - }; - - let host = { - match header_map.get("host") { - Some(host) => host.clone(), - None => { - return Err(HttpResponse::bad_request( - "Missing Host in Headers!".to_string(), - HashMap::new(), - )); - } - } - }; - - Ok(HttpRequest { - method, - path: path.to_string(), - version: version.to_string(), - host, - headers: header_map, - body, - }) -} diff --git a/crates/http/src/response.rs b/crates/http/src/response.rs deleted file mode 100644 index 976c5a3..0000000 --- a/crates/http/src/response.rs +++ /dev/null @@ -1,275 +0,0 @@ -pub struct HttpResponse { - pub status_code: u16, - pub headers: std::collections::HashMap, - pub body: Vec, -} - -impl HttpResponse { - pub fn new( - status_code: u16, - headers: std::collections::HashMap, - body: Vec, - ) -> Self { - Self { - status_code, - headers, - body, - } - } - - pub fn to_bytes(&self) -> Vec { - let status_line = format!( - "HTTP/1.1 {} {}\r\n", - self.status_code, - status_text(self.status_code) - ); - - let mut headers_str = String::new(); - for (key, value) in &self.headers { - headers_str.push_str(&format!("{}: {}\r\n", key, value)); - } - - let mut response = Vec::new(); - response.extend_from_slice(status_line.as_bytes()); - response.extend_from_slice(headers_str.as_bytes()); - response.extend_from_slice(b"\r\n"); - response.extend_from_slice(&self.body); - - response - } - - // 2xx Success responses - pub fn ok(body: Vec, mut headers: std::collections::HashMap) -> Self { - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(200, headers, body) - } - - pub fn created(body: Vec, mut headers: std::collections::HashMap) -> Self { - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(201, headers, body) - } - - pub fn accepted(body: Vec, mut headers: std::collections::HashMap) -> Self { - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(202, headers, body) - } - - pub fn no_content(headers: std::collections::HashMap) -> Self { - Self::new(204, headers, Vec::new()) - } - - // 4xx Client Errors - pub fn bad_request( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(400, headers, body) - } - - pub fn unauthorized( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(401, headers, body) - } - - pub fn forbidden( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(403, headers, body) - } - - pub fn not_found( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(404, headers, body) - } - - pub fn method_not_allowed( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(405, headers, body) - } - - pub fn conflict(error: String, mut headers: std::collections::HashMap) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(409, headers, body) - } - - pub fn gone(error: String, mut headers: std::collections::HashMap) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(410, headers, body) - } - - pub fn unprocessable_entity( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(422, headers, body) - } - - pub fn too_many_requests( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(429, headers, body) - } - - // 5xx Server Errors - pub fn internal_server_error( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(500, headers, body) - } - - pub fn not_implemented( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(501, headers, body) - } - - pub fn bad_gateway( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(502, headers, body) - } - - pub fn service_unavailable( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(503, headers, body) - } - - pub fn gateway_timeout( - error: String, - mut headers: std::collections::HashMap, - ) -> Self { - let body = format!("{{\"error\":\"{}\"}}", error).into_bytes(); - if !headers.contains_key("Content-Type") { - headers.insert("Content-Type".to_string(), "application/json".to_string()); - } - headers.insert("Content-Length".to_string(), body.len().to_string()); - Self::new(504, headers, body) - } - - // Check if response is successful (2xx status code) - pub fn is_success(&self) -> bool { - self.status_code >= 200 && self.status_code < 300 - } - - // Check if response is client error (4xx status code) - pub fn is_client_error(&self) -> bool { - self.status_code >= 400 && self.status_code < 500 - } - - // Check if response is server error (5xx status code) - pub fn is_server_error(&self) -> bool { - self.status_code >= 500 && self.status_code < 600 - } -} - -// Helper function to get status text from status code -fn status_text(status_code: u16) -> &'static str { - match status_code { - 100 => "Continue", - 101 => "Switching Protocols", - 200 => "OK", - 201 => "Created", - 202 => "Accepted", - 204 => "No Content", - 400 => "Bad Request", - 401 => "Unauthorized", - 403 => "Forbidden", - 404 => "Not Found", - 405 => "Method Not Allowed", - 409 => "Conflict", - 410 => "Gone", - 422 => "Unprocessable Entity", - 429 => "Too Many Requests", - 500 => "Internal Server Error", - 501 => "Not Implemented", - 502 => "Bad Gateway", - 503 => "Service Unavailable", - 504 => "Gateway Timeout", - _ => "Unknown Status", - } -} diff --git a/crates/http/src/server.rs b/crates/http/src/server.rs deleted file mode 100644 index 0c22a78..0000000 --- a/crates/http/src/server.rs +++ /dev/null @@ -1,253 +0,0 @@ -use crate::{ - request::{HeaderMap, HttpRequest}, - response::HttpResponse, -}; -use std::{future::Future, net::TcpListener, pin::Pin, sync::Arc}; - -// Handler trait for asynchronous request handling only -pub trait AsyncHandler: Send + Sync + 'static { - fn handle( - &self, - request: HttpRequest, - ) -> Pin> + Send + 'static>>; -} - -// Implement AsyncHandler for async closures -impl AsyncHandler for F -where - F: Fn(HttpRequest) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, -{ - fn handle( - &self, - request: HttpRequest, - ) -> Pin> + Send + 'static>> { - Box::pin(self(request)) - } -} - -pub struct Server { - host: String, - port: u16, - handlers: Arc>>, - shutdown_tx: Option>, -} - -impl Server { - pub fn new(host: String, port: u16) -> Self { - Server { - host, - port, - handlers: Arc::new(Vec::new()), - shutdown_tx: None, - } - } - - /// Register an async handler - pub fn register_handler(&mut self, handler: H) - where - H: AsyncHandler, - { - let handlers = - Arc::get_mut(&mut self.handlers).expect("Cannot register handler after server start"); - handlers.push(Box::new(handler)); - } - - /// Register an async closure as a handler - pub fn register_async_closure(&mut self, closure: F) - where - F: Fn(HttpRequest) -> Fut + Send + Sync + 'static, - Fut: Future> + Send + 'static, - { - self.register_handler(closure); - } - - pub async fn start(&mut self) { - let (shutdown_tx, mut shutdown_rx) = tokio::sync::broadcast::channel(1); - self.shutdown_tx = Some(shutdown_tx); - - self.run_server(&mut shutdown_rx).await; - } - - pub fn shutdown(&self) { - if let Some(ref tx) = self.shutdown_tx { - let _ = tx.send(()); - } - } - - async fn run_server(&self, shutdown_rx: &mut tokio::sync::broadcast::Receiver<()>) { - let url = format!("{}:{}", self.host, self.port); - log::info!("Starting http server on {}", &url); - let listener = match TcpListener::bind(&url) { - Ok(listener) => { - listener - .set_nonblocking(true) - .expect("Failed to set non-blocking"); - listener - } - Err(err) => panic!("Failed to bind to {}: {}", url, err), - }; - - let async_listener = - tokio::net::TcpListener::from_std(listener).expect("Failed to create async listener"); - - loop { - tokio::select! { - _ = shutdown_rx.recv() => { - log::info!("Shutdown signal received, stopping server"); - break; - } - stream_result = async_listener.accept() => { - let (stream, _) = match stream_result { - Ok(connection) => connection, - Err(err) => { - log::error!("Failed to accept incoming connection: {}", err); - continue; - } - }; - - let handlers = self.handlers.clone(); - - tokio::spawn(async move { - println!("New connection accepted, starting to read request..."); - - // Read HTTP request data using tokio's async methods - use tokio::io::{AsyncBufReadExt, BufReader}; - - let mut buf_reader = BufReader::new(stream); - let mut raw_http_request: Vec = Vec::new(); - let mut line = String::new(); - - // Read headers until empty line - while let Ok(bytes) = buf_reader.read_line(&mut line).await { - println!("Read {} bytes: '{}'", bytes, line.trim()); - if bytes == 0 || line.trim().is_empty() { - break; - } - raw_http_request.push(line.trim().to_string()); - line.clear(); - } - - println!("Finished reading request. Raw data: {:?}", raw_http_request); - - // Parse the HTTP request manually here since we can't use convert_to_http_request with tokio stream - let request_result = if let Some(first_line) = raw_http_request.first() { - println!("Parsing first line: '{}'", first_line); - let parts: Vec<&str> = first_line.split_whitespace().collect(); - println!("Split into parts: {:?}", parts); - - if parts.len() >= 3 { - // Extract host from headers or use default - let mut host = "localhost".to_string(); - let header_lines = raw_http_request[1..].to_vec(); - for header_line in &header_lines { - if header_line.to_lowercase().starts_with("host:") { - if let Some(host_value) = header_line.split(':').nth(1) { - host = host_value.trim().to_string(); - } - break; - } - } - - let request = HttpRequest { - method: match parts[0] { - "GET" => crate::request::HttpOption::GET, - "POST" => crate::request::HttpOption::POST, - "PUT" => crate::request::HttpOption::PUT, - "DELETE" => crate::request::HttpOption::DELETE, - _ => crate::request::HttpOption::GET, - }, - path: parts[1].to_string(), - version: parts[2].to_string(), - host, - headers: HeaderMap::from_vec(header_lines), - body: None, - }; - - println!("Successfully parsed request: {:?}", request); - Ok(request) - } else { - println!("Invalid HTTP request - not enough parts"); - let headers = std::collections::HashMap::new(); - Err(HttpResponse::bad_request("Invalid HTTP request".to_string(), headers)) - } - } else { - println!("Empty HTTP request - no first line"); - let headers = std::collections::HashMap::new(); - Err(HttpResponse::bad_request("Empty HTTP request".to_string(), headers)) - }; - - match request_result { - Ok(request) => { - println!("About to call handlers for request: {:?}", request); - - // Try each handler until one handles the request - let mut response = None; - for (i, handler) in handlers.iter().enumerate() { - println!("Calling handler {}", i); - let handler_response = handler.handle(request.clone()).await; - println!("Handler {} returned: {:?}", i, handler_response.is_some()); - if handler_response.is_some() { - response = Some(handler_response); - break; - } - } - - println!("Final response from handlers: {:?}", response.is_some()); - - // Default response if no handler matched - let http_response = match response { - Some(Some(resp)) => { - println!("Using handler response"); - resp - }, - Some(None) => { - println!("Handler returned None, using not found"); - let headers = std::collections::HashMap::new(); - HttpResponse::not_found("No handler found".to_string(), headers) - } - None => { - println!("No handlers matched, using not found"); - let headers = std::collections::HashMap::new(); - HttpResponse::not_found("No handler found".to_string(), headers) - } - }; - - println!("About to write response: {} bytes", http_response.to_bytes().len()); - - use tokio::io::AsyncWriteExt; - let mut stream = buf_reader.into_inner(); - if let Err(e) = stream.write_all(&http_response.to_bytes()).await { - println!("Failed to write response: {}", e); - log::error!("Failed to write response: {}", e); - } else if let Err(e) = stream.flush().await { - println!("Failed to flush response: {}", e); - log::error!("Failed to flush response: {}", e); - } else { - println!("Response written and flushed successfully"); - } - } - Err(response) => { - println!("Request parsing failed, sending error response"); - use tokio::io::AsyncWriteExt; - let mut stream = buf_reader.into_inner(); - if let Err(e) = stream.write_all(&response.to_bytes()).await { - println!("Failed to write error response: {}", e); - log::error!("Failed to write error response: {}", e); - } else if let Err(e) = stream.flush().await { - println!("Failed to flush error response: {}", e); - log::error!("Failed to flush error response: {}", e); - } else { - println!("Error response written and flushed successfully"); - } - } - } - - // Connection will be closed when stream goes out of scope - println!("Request processing completed"); - }); - } - } - } - } -} From ea7a415e24fe64862570664e50ab16c62fd5147f Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 29 Jan 2026 11:29:17 +0100 Subject: [PATCH 02/15] drop: removed http crate --- Cargo.lock | 63 +++++++++++++++++++++--------------------------------- Cargo.toml | 5 +---- 2 files changed, 25 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f8d245..84f57de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -173,7 +173,7 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http 1.3.1", + "http", "http-body", "http-body-util", "itoa", @@ -198,7 +198,7 @@ checksum = "68464cd0412f486726fb3373129ef5d2993f90c34bc2bc1c1e9943b2f4fc7ca6" dependencies = [ "bytes", "futures-core", - "http 1.3.1", + "http", "http-body", "http-body-util", "mime", @@ -645,7 +645,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http 1.3.1", + "http", "indexmap", "slab", "tokio", @@ -665,17 +665,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" -[[package]] -name = "http" -version = "0.0.0" -dependencies = [ - "env_logger", - "log", - "serde_json", - "tokio", - "tucana", -] - [[package]] name = "http" version = "1.3.1" @@ -694,7 +683,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http 1.3.1", + "http", ] [[package]] @@ -705,7 +694,7 @@ checksum = "b021d93e26becf5dc7e1b75b1bed1fd93124b374ceb73f43d4d4eafec896a64a" dependencies = [ "bytes", "futures-core", - "http 1.3.1", + "http", "http-body", "pin-project-lite", ] @@ -724,20 +713,22 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.6.0" +version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" +checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" dependencies = [ + "atomic-waker", "bytes", "futures-channel", - "futures-util", + "futures-core", "h2", - "http 1.3.1", + "http", "http-body", "httparse", "httpdate", "itoa", "pin-project-lite", + "pin-utils", "smallvec", "tokio", "want", @@ -758,19 +749,20 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.11" +version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "497bbc33a26fdd4af9ed9c70d63f61cf56a938375fbb32df34db9b1cd6d643f2" +checksum = "727805d60e7938b76b826a6ef209eb70eaa1812794f9424d4a4e2d740662df5f" dependencies = [ "bytes", "futures-channel", + "futures-core", "futures-util", - "http 1.3.1", + "http", "http-body", "hyper", "libc", "pin-project-lite", - "socket2 0.5.9", + "socket2", "tokio", "tower-service", "tracing", @@ -1404,8 +1396,11 @@ dependencies = [ "anyhow", "base", "code0-flow", - "http 0.0.0", + "http-body-util", + "hyper", + "hyper-util", "log", + "prost", "regex", "serde_json", "tokio", @@ -1691,16 +1686,6 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7fcf8323ef1faaee30a44a340193b1ac6814fd9b7b4e88e9d4519a3e4abe1cfd" -[[package]] -name = "socket2" -version = "0.5.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f5fd57c80058a56cf5c777ab8a126398ece8e442983605d280a44ce79d0edef" -dependencies = [ - "libc", - "windows-sys 0.52.0", -] - [[package]] name = "socket2" version = "0.6.0" @@ -1846,7 +1831,7 @@ dependencies = [ "mio", "pin-project-lite", "signal-hook-registry", - "socket2 0.6.0", + "socket2", "tokio-macros", "windows-sys 0.61.2", ] @@ -1907,7 +1892,7 @@ dependencies = [ "bytes", "futures-core", "futures-sink", - "http 1.3.1", + "http", "httparse", "rand", "ring", @@ -1929,7 +1914,7 @@ dependencies = [ "base64", "bytes", "h2", - "http 1.3.1", + "http", "http-body", "http-body-util", "hyper", @@ -1937,7 +1922,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "socket2 0.6.0", + "socket2", "sync_wrapper", "tokio", "tokio-stream", diff --git a/Cargo.toml b/Cargo.toml index 6b24365..a8986fe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["crates/http", "adapter/rest", "crates/base"] +members = ["adapter/rest", "crates/base"] resolver = "3" [workspace.package] @@ -23,8 +23,5 @@ prost = "0.14.0" tonic-health = "0.14.0" futures-lite = "2.6.1" -[workspace.dependencies.http] -path = "../draco/crates/http" - [workspace.dependencies.base] path = "../draco/crates/base" From 8109b516f4d4978824ff851e1236f34bfd069f48 Mon Sep 17 00:00:00 2001 From: Raphael Date: Thu, 29 Jan 2026 11:29:43 +0100 Subject: [PATCH 03/15] feat: started to work on swtiching from http crate to hyper --- adapter/rest/Cargo.toml | 5 +- adapter/rest/src/config.rs | 18 ++ adapter/rest/src/main.rs | 332 ++++++++++++++++++++++--------------- adapter/rest/src/route.rs | 85 ++++++++++ 4 files changed, 307 insertions(+), 133 deletions(-) create mode 100644 adapter/rest/src/config.rs create mode 100644 adapter/rest/src/route.rs diff --git a/adapter/rest/Cargo.toml b/adapter/rest/Cargo.toml index 9c41825..5c56b26 100644 --- a/adapter/rest/Cargo.toml +++ b/adapter/rest/Cargo.toml @@ -4,7 +4,6 @@ version = "0.1.0" edition.workspace = true [dependencies] -http = { workspace = true } code0-flow = { workspace = true } tokio = { workspace = true } tucana = { workspace = true } @@ -14,3 +13,7 @@ regex = { workspace = true } tonic = { workspace = true } base = { workspace = true } anyhow = { workspace = true } +hyper-util = "0.1.19" +hyper = "1.8.1" +http-body-util = "0.1.3" +prost = { workspace = true } diff --git a/adapter/rest/src/config.rs b/adapter/rest/src/config.rs new file mode 100644 index 0000000..08bfb0d --- /dev/null +++ b/adapter/rest/src/config.rs @@ -0,0 +1,18 @@ +use base::traits::LoadConfig; +use code0_flow::flow_config::env_with_default; + + +#[derive(Clone)] +pub struct HttpServerConfig { + pub port: u16, + pub host: String, +} + +impl LoadConfig for HttpServerConfig { + fn load() -> Self { + Self { + port: env_with_default("HTTP_SERVER_PORT", 8080), + host: env_with_default("HTTP_SERVER_HOST", String::from("127.0.0.1")), + } + } +} diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 060ec62..d94937d 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -1,24 +1,37 @@ use base::{ runner::{ServerContext, ServerRunner}, store::FlowIdentifyResult, - traits::{IdentifiableFlow, LoadConfig, Server as ServerTrait}, + traits::Server as ServerTrait, }; -use code0_flow::flow_config::env_with_default; -use http::{request::HttpRequest, response::HttpResponse, server::Server}; +use http_body_util::{BodyExt, Full}; +use hyper::{header::{HeaderName, HeaderValue}, server::conn::http1}; +use hyper::{Request, Response}; +use hyper::{ + StatusCode, + body::{Bytes, Incoming}, +}; +use hyper_util::rt::TokioIo; use std::collections::HashMap; +use std::convert::Infallible; +use std::net::SocketAddr; use std::sync::Arc; +use tokio::net::TcpListener; use tonic::async_trait; use tucana::shared::value::Kind; use tucana::shared::value::Kind::StructValue; use tucana::shared::{Struct, ValidationFlow, Value}; +mod config; +mod route; + #[tokio::main] async fn main() { - let server = HttpServer { http_server: None }; + let server = HttpServer { addr: None }; let runner = match ServerRunner::new(server).await { Ok(runner) => runner, Err(err) => panic!("Failed to create server runner: {:?}", err), }; + log::info!("Successfully created runner for http service"); match runner.serve().await { Ok(_) => (), Err(err) => panic!("Failed to start server runner: {:?}", err), @@ -26,182 +39,237 @@ async fn main() { } struct HttpServer { - http_server: Option, + addr: Option, } -struct RequestRoute { - url: String, +fn json_error(status: StatusCode, msg: &str) -> Response> { + let body = format!(r#"{{"error": "{}"}}"#, msg); + Response::builder() + .status(status) + .header("content-type", "application/json") + .body(Full::new(Bytes::from(body))) + .unwrap() } -impl IdentifiableFlow for RequestRoute { - fn identify(&self, flow: &ValidationFlow) -> bool { - let regex_str = flow - .settings - .iter() - .find(|s| s.flow_setting_id == "HTTP_URL") - .and_then(|s| s.value.as_ref()) - .and_then(|v| v.kind.as_ref()) - .and_then(|k| match k { - Kind::StringValue(s) => Some(s.as_str()), - _ => None, - }); - let Some(regex_str) = regex_str else { - return false; - }; - print!( - "Comparing regex {} with literal route: {}", - regex_str, self.url - ); - - match regex::Regex::new(regex_str) { - Ok(regex) => regex.is_match(&self.url), - Err(err) => { - log::error!("Failed to compile regex: {}", err); - false - } - } - } -} +fn build_response( + status: StatusCode, + headers: HashMap, + body: Vec, +) -> Response> { + let mut builder = Response::builder().status(status); -#[async_trait] -impl ServerTrait for HttpServer { - async fn init(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { - log::info!("Initializing http server"); - self.http_server = Some(Server::new( - ctx.server_config.host.clone(), - ctx.server_config.port, - )); - Ok(()) - } + { + let h = builder.headers_mut().unwrap(); + for (k, v) in headers { + let name = match HeaderName::from_bytes(k.as_bytes()) { + Ok(n) => n, + Err(_) => { + log::warn!("Dropping invalid header name: {}", k); + continue; + } + }; - async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { - if let Some(server) = &mut self.http_server { - log::info!("Running http server"); - server.register_async_closure({ - let store = Arc::clone(&ctx.adapter_store); - move |request: HttpRequest| { - let store = Arc::clone(&store); - async move { - //Get slug => host/slug/real_path - - let splits: Vec<_> = request.path.split("/").collect(); - let first = splits.first(); - - if let Some(slug) = first { - let pattern = format!("REST.{}.*", slug); - let route = RequestRoute { - url: request.path.clone(), - }; - - match store.get_possible_flow_match(pattern, route).await { - FlowIdentifyResult::Single(flow) => { - print!("Found flow: {}", flow.flow_id); - execute_flow(flow, request, store).await - } - _ => Some(HttpResponse::internal_server_error( - format!("No flow found for path: {}", request.path), - HashMap::new(), - )), - } - } else { - Some(HttpResponse::internal_server_error( - format!("No flow found for path: {}", request.path), - HashMap::new(), - )) - } - } + let value = match HeaderValue::from_str(&v) { + Ok(v) => v, + Err(_) => { + log::warn!("Dropping invalid header value for {}: {:?}", k, v); + continue; } - }); + }; - server.start().await; + h.insert(name, value); } - Ok(()) } - async fn shutdown(&mut self, _ctx: &ServerContext) -> anyhow::Result<()> { - if let Some(server) = &self.http_server { - server.shutdown(); - } - Ok(()) - } + builder.body(Full::new(Bytes::from(body))).unwrap() } -async fn execute_flow( + +async fn execute_flow_to_hyper_response( flow: ValidationFlow, - request: HttpRequest, + body: Vec, store: Arc, -) -> Option { - match store.validate_and_execute_flow(flow, request.body).await { +) -> Response> { + let value: Option = match prost::Message::decode(body.as_slice()) { + Ok(v) => Some(v), + Err(_) => None, + }; + + match store.validate_and_execute_flow(flow, value).await { Some(result) => { let Value { kind: Some(StructValue(Struct { fields })), } = result else { - return None; + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result was not a struct", + ); }; - let Some(headers) = fields.get("headers") else { - return None; + let Some(headers_val) = fields.get("headers") else { + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing headers", + ); }; - - let Some(status_code) = fields.get("status_code") else { - return None; + let Some(status_code_val) = fields.get("status_code") else { + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing status_code", + ); }; - - let Some(payload) = fields.get("payload") else { - return None; + let Some(payload_val) = fields.get("payload") else { + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "Flow result missing payload", + ); }; + // headers struct let Value { kind: Some(StructValue(Struct { fields: header_fields, })), - } = headers + } = headers_val else { - return None; + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "headers was not a struct", + ); }; + let http_headers: HashMap = header_fields .iter() - .filter_map(|(k, v)| { - let value = match &v.kind { - Some(Kind::StringValue(s)) if !s.is_empty() => s.clone(), - _ => return None, - }; - - Some((k.clone(), value)) + .filter_map(|(k, v)| match &v.kind { + Some(Kind::StringValue(s)) if !s.is_empty() => Some((k.clone(), s.clone())), + _ => None, }) .collect(); - let json = serde_json::to_vec_pretty(&payload).unwrap_or_else(|err| { - format!(r#"{{"error": "Serialization failed: {}"}}"#, err).into_bytes() + // status_code number + let Some(Kind::NumberValue(code)) = status_code_val.kind else { + return json_error( + StatusCode::INTERNAL_SERVER_ERROR, + "status_code was not a number", + ); + }; + + // payload -> json bytes + let json = serde_json::to_vec_pretty(payload_val).unwrap_or_else(|err| { + format!(r#"{{"error":"Serialization failed: {}"}}"#, err).into_bytes() }); - let Some(Kind::NumberValue(code)) = status_code.kind else { - return None; - }; - Some(HttpResponse::new(code as u16, http_headers.clone(), json)) + let status = + StatusCode::from_u16(code as u16).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + build_response(status, http_headers, json) } - None => Some(HttpResponse::internal_server_error( - "Flow execution failed".to_string(), - HashMap::new(), - )), + None => json_error(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed"), } } -#[derive(Clone)] -struct HttpServerConfig { - port: u16, - host: String, +pub async fn handle_request( + req: Request, + store: Arc, +) -> Result>, Infallible> { + let method = req.method().clone(); + let path = req.uri().path().to_string(); + + // Read full body + let body_bytes = match BodyExt::collect(req.into_body()).await { + Ok(collected) => collected.to_bytes().to_vec(), + Err(err) => { + log::error!("Failed to read request body: {}", err); + return Ok(json_error( + StatusCode::BAD_REQUEST, + "Failed to read request body", + )); + } + }; + + // slug matching + let Some(slug) = route::extract_slug_from_path(&path) else { + return Ok(json_error(StatusCode::BAD_REQUEST, "Missing slug in path")); + }; + + let pattern = format!("REST.{}.*", slug); + let route = route::RequestRoute { + url: path.clone(), + method, + }; + + let resp = match store.get_possible_flow_match(pattern, route).await { + FlowIdentifyResult::Single(flow) => { + execute_flow_to_hyper_response(flow, body_bytes, store).await + } + _ => json_error(StatusCode::NOT_FOUND, "No flow found for path"), + }; + + Ok(resp) } -impl LoadConfig for HttpServerConfig { - fn load() -> Self { - Self { - port: env_with_default("HTTP_SERVER_PORT", 8082), - host: env_with_default("HTTP_SERVER_HOST", String::from("0.0.0.0")), +#[async_trait] +impl ServerTrait for HttpServer { + async fn init(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { + log::info!("Initializing http server"); + let bind = format!("{}:{}", ctx.server_config.host, ctx.server_config.port); + + self.addr = Some( + bind.parse::() + .map_err(|e| anyhow::anyhow!("Invalid bind address '{}': {}", bind, e))?, + ); + + log::debug!("Initizalized with Address: {:?}", self.addr); + Ok(()) + } + + async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { + let addr = match self.addr { + Some(addr) => addr, + None => panic!("cannot start tcp listener with empty address"), + }; + + let listener = match TcpListener::bind(addr).await { + Ok(listener) => listener, + Err(err) => { + panic!("failed to register tcp listener on address: {:?}", err); + } + }; + + loop { + let (stream, _) = match listener.accept().await { + Ok(res) => res, + Err(e) => { + panic!("listener failed to accept requests: {:?}", e); + } + }; + + let io = TokioIo::new(stream); + + let store = Arc::clone(&ctx.adapter_store); + + tokio::task::spawn(async move { + let store = Arc::clone(&store); + let svc = hyper::service::service_fn(move |req| { + let store = Arc::clone(&store); + async move { handle_request(req, store).await } + }); + + if let Err(err) = http1::Builder::new().serve_connection(io, svc).await { + log::error!("Error serving connection: {:?}", err); + } + }); } } + + async fn shutdown( + &mut self, + _ctx: &ServerContext, + ) -> anyhow::Result<()> { + todo!("Implement shutdown!"); + Ok(()) + } } diff --git a/adapter/rest/src/route.rs b/adapter/rest/src/route.rs new file mode 100644 index 0000000..387e831 --- /dev/null +++ b/adapter/rest/src/route.rs @@ -0,0 +1,85 @@ +use base::traits::IdentifiableFlow; +use tucana::shared::{ValidationFlow, value::Kind}; + +pub struct RequestRoute { + pub url: String, + pub method: hyper::Method, +} + +// Checks if the Method and Url matches any of the +// Flows that matched the original slug pattern for project +// Only if both matched, it will return true +impl IdentifiableFlow for RequestRoute { + fn identify(&self, flow: &ValidationFlow) -> bool { + // Get Method of the FlowSetting + let method_str = flow + .settings + .iter() + .find(|s| s.flow_setting_id == "HTTP_METHOD") + .and_then(|s| s.value.as_ref()) + .and_then(|v| v.kind.as_ref()) + .and_then(|k| match k { + Kind::StringValue(s) => Some(s.as_str()), + _ => None, + }); + + log::debug!( + "Comparing flows method: {:?} with request route: {}", + method_str, + self.method.as_str() + ); + + if let Some(method) = method_str { + if method != self.method.as_str() { + log::debug!("Method didn't eq"); + return false; + } + } else { + log::debug!("Method didn't eq"); + return false; + } + // Get URL of the FlowSetting + let regex_str_v = flow + .settings + .iter() + .find(|s| s.flow_setting_id == "HTTP_URL"); + + log::debug!("Extacted: {:?} as HTTP_URL", ®ex_str_v); + + let regex_str = regex_str_v + .and_then(|s| s.value.as_ref()) + .and_then(|v| v.kind.as_ref()) + .and_then(|k| match k { + Kind::StringValue(s) => Some(s.as_str()), + _ => None, + }); + + let Some(regex_str) = regex_str else { + log::debug!("Regex was empty"); + return false; + }; + + log::debug!( + "Comparing regex {} with literal route: {}", + regex_str, + self.url + ); + + // Check if the request is matching + match regex::Regex::new(regex_str) { + Ok(regex) => { + log::debug!("found a match for {}", regex_str); + return regex.is_match(&self.url); + } + Err(err) => { + log::error!("Failed to compile regex: {}", err); + false + } + } + } +} + +pub fn extract_slug_from_path(path: &str) -> Option<&str> { + let trimmed = path.trim_start_matches('/'); + trimmed.split('/').next().filter(|s| !s.is_empty()) +} From c2ceddc334fef84fd06d3c4aaae0c97a4286f254 Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 30 Jan 2026 13:45:10 +0100 Subject: [PATCH 04/15] feat: made header to list --- adapter/rest/src/main.rs | 63 +++++++++++++++++++++++++++++++++------- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index d94937d..f05cd00 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -4,12 +4,15 @@ use base::{ traits::Server as ServerTrait, }; use http_body_util::{BodyExt, Full}; -use hyper::{header::{HeaderName, HeaderValue}, server::conn::http1}; use hyper::{Request, Response}; use hyper::{ StatusCode, body::{Bytes, Incoming}, }; +use hyper::{ + header::{HeaderName, HeaderValue}, + server::conn::http1, +}; use hyper_util::rt::TokioIo; use std::collections::HashMap; use std::convert::Infallible; @@ -17,8 +20,8 @@ use std::net::SocketAddr; use std::sync::Arc; use tokio::net::TcpListener; use tonic::async_trait; -use tucana::shared::value::Kind; use tucana::shared::value::Kind::StructValue; +use tucana::shared::{ListValue, value::Kind}; use tucana::shared::{Struct, ValidationFlow, Value}; mod config; @@ -51,8 +54,6 @@ fn json_error(status: StatusCode, msg: &str) -> Response> { .unwrap() } - - fn build_response( status: StatusCode, headers: HashMap, @@ -86,7 +87,6 @@ fn build_response( builder.body(Full::new(Bytes::from(body))).unwrap() } - async fn execute_flow_to_hyper_response( flow: ValidationFlow, body: Vec, @@ -99,6 +99,7 @@ async fn execute_flow_to_hyper_response( match store.validate_and_execute_flow(flow, value).await { Some(result) => { + log::debug!("Recieved Result: {:?}", result); let Value { kind: Some(StructValue(Struct { fields })), } = result @@ -131,8 +132,8 @@ async fn execute_flow_to_hyper_response( // headers struct let Value { kind: - Some(StructValue(Struct { - fields: header_fields, + Some(Kind::ListValue(ListValue { + values: header_fields, })), } = headers_val else { @@ -144,9 +145,46 @@ async fn execute_flow_to_hyper_response( let http_headers: HashMap = header_fields .iter() - .filter_map(|(k, v)| match &v.kind { - Some(Kind::StringValue(s)) if !s.is_empty() => Some((k.clone(), s.clone())), - _ => None, + .filter_map(|x| { + if let Value { + kind: Some(StructValue(Struct { fields: f })), + } = x + { + return Some(f); + } else { + return None; + } + }) + .filter_map(|f| { + let key = match f.get("key") { + Some(value) => { + if let Value { + kind: Some(Kind::StringValue(x)), + } = value + { + x + } else { + return None; + } + } + None => return None, + }; + let value = match f.get("value") { + Some(value) => { + if let Value { + kind: Some(Kind::StringValue(x)), + } = value + { + x + } else { + return None; + } + } + None => return None, + }; + + + return Some((key.clone(), value.clone())); }) .collect(); @@ -167,7 +205,10 @@ async fn execute_flow_to_hyper_response( StatusCode::from_u16(code as u16).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); build_response(status, http_headers, json) } - None => json_error(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed"), + None => { + log::error!("flow execution failed"); + return json_error(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed"); + } } } From 5a88acfe5b33ae8638a337620536a594433ea004 Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 30 Jan 2026 13:45:57 +0100 Subject: [PATCH 05/15] feat: added logs for execution --- crates/base/src/store.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index 380d85e..7a9c119 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -131,9 +131,15 @@ impl AdapterStore { } let uuid = uuid::Uuid::new_v4().to_string(); + let flow_id = flow.flow_id; let execution_flow: ExecutionFlow = Self::convert_validation_flow(flow, input_value); let bytes = execution_flow.encode_to_vec(); let topic = format!("execution.{}", uuid); + log::info!( + "Requesting the exectition of the flow: {} with an execution id {}", + flow_id, + uuid + ); let result = self.client.request(topic, bytes.into()).await; match result { From a52ac4e7cce4b97448b5f1e7e40f7da3a0751041 Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 30 Jan 2026 13:47:30 +0100 Subject: [PATCH 06/15] ref: cargo fmt --- adapter/rest/src/config.rs | 1 - adapter/rest/src/main.rs | 14 +++++--------- adapter/rest/src/route.rs | 2 +- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/adapter/rest/src/config.rs b/adapter/rest/src/config.rs index 08bfb0d..e946c27 100644 --- a/adapter/rest/src/config.rs +++ b/adapter/rest/src/config.rs @@ -1,7 +1,6 @@ use base::traits::LoadConfig; use code0_flow::flow_config::env_with_default; - #[derive(Clone)] pub struct HttpServerConfig { pub port: u16, diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index f05cd00..484b585 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -92,10 +92,7 @@ async fn execute_flow_to_hyper_response( body: Vec, store: Arc, ) -> Response> { - let value: Option = match prost::Message::decode(body.as_slice()) { - Ok(v) => Some(v), - Err(_) => None, - }; + let value: Option = prost::Message::decode(body.as_slice()).ok(); match store.validate_and_execute_flow(flow, value).await { Some(result) => { @@ -150,9 +147,9 @@ async fn execute_flow_to_hyper_response( kind: Some(StructValue(Struct { fields: f })), } = x { - return Some(f); + Some(f) } else { - return None; + None } }) .filter_map(|f| { @@ -183,8 +180,7 @@ async fn execute_flow_to_hyper_response( None => return None, }; - - return Some((key.clone(), value.clone())); + Some((key.clone(), value.clone())) }) .collect(); @@ -207,7 +203,7 @@ async fn execute_flow_to_hyper_response( } None => { log::error!("flow execution failed"); - return json_error(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed"); + json_error(StatusCode::INTERNAL_SERVER_ERROR, "Flow execution failed") } } } diff --git a/adapter/rest/src/route.rs b/adapter/rest/src/route.rs index 387e831..96dd62c 100644 --- a/adapter/rest/src/route.rs +++ b/adapter/rest/src/route.rs @@ -69,7 +69,7 @@ impl IdentifiableFlow for RequestRoute { match regex::Regex::new(regex_str) { Ok(regex) => { log::debug!("found a match for {}", regex_str); - return regex.is_match(&self.url); + regex.is_match(&self.url) } Err(err) => { log::error!("Failed to compile regex: {}", err); From f921b1ab39aaeaa56d61a94bfe4168794e391015 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 30 Jan 2026 13:58:41 +0100 Subject: [PATCH 07/15] Update adapter/rest/src/route.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- adapter/rest/src/route.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/rest/src/route.rs b/adapter/rest/src/route.rs index 96dd62c..7fe2d03 100644 --- a/adapter/rest/src/route.rs +++ b/adapter/rest/src/route.rs @@ -44,7 +44,7 @@ impl IdentifiableFlow for RequestRoute { .iter() .find(|s| s.flow_setting_id == "HTTP_URL"); - log::debug!("Extacted: {:?} as HTTP_URL", ®ex_str_v); + log::debug!("Extracted: {:?} as HTTP_URL", ®ex_str_v); let regex_str = regex_str_v .and_then(|s| s.value.as_ref()) From ed79258cf99462497715e912c2f51367c376d066 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 30 Jan 2026 13:59:41 +0100 Subject: [PATCH 08/15] Update adapter/rest/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- adapter/rest/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 484b585..413d911 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -136,7 +136,7 @@ async fn execute_flow_to_hyper_response( else { return json_error( StatusCode::INTERNAL_SERVER_ERROR, - "headers was not a struct", + "headers was not a list of header entries", ); }; From 18342df2b6026e7bb946c536281dd36076a302c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 30 Jan 2026 13:59:55 +0100 Subject: [PATCH 09/15] Update adapter/rest/src/route.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- adapter/rest/src/route.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/rest/src/route.rs b/adapter/rest/src/route.rs index 7fe2d03..b36e615 100644 --- a/adapter/rest/src/route.rs +++ b/adapter/rest/src/route.rs @@ -68,7 +68,7 @@ impl IdentifiableFlow for RequestRoute { // Check if the request is matching match regex::Regex::new(regex_str) { Ok(regex) => { - log::debug!("found a match for {}", regex_str); + log::debug!("Successfully compiled regex: {}", regex_str); regex.is_match(&self.url) } Err(err) => { From c70fc5d639e52f9df5d85b41d4ccb458f96fb1f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 30 Jan 2026 14:00:05 +0100 Subject: [PATCH 10/15] Update crates/base/src/store.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- crates/base/src/store.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/base/src/store.rs b/crates/base/src/store.rs index 7a9c119..f222d35 100644 --- a/crates/base/src/store.rs +++ b/crates/base/src/store.rs @@ -136,7 +136,7 @@ impl AdapterStore { let bytes = execution_flow.encode_to_vec(); let topic = format!("execution.{}", uuid); log::info!( - "Requesting the exectition of the flow: {} with an execution id {}", + "Requesting execution of flow {} with execution id {}", flow_id, uuid ); From 24fc6218206c762dfd96daa99dbe1f358a3a3163 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 30 Jan 2026 14:01:02 +0100 Subject: [PATCH 11/15] Update adapter/rest/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- adapter/rest/src/main.rs | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 413d911..204da35 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -92,7 +92,20 @@ async fn execute_flow_to_hyper_response( body: Vec, store: Arc, ) -> Response> { - let value: Option = prost::Message::decode(body.as_slice()).ok(); + let value: Option = if body.is_empty() { + None + } else { + match prost::Message::decode(body.as_slice()) { + Ok(v) => Some(v), + Err(e) => { + log::warn!("Failed to decode request body as protobuf Value: {}", e); + return json_error( + StatusCode::BAD_REQUEST, + "Failed to decode request body as protobuf Value", + ); + } + } + }; match store.validate_and_execute_flow(flow, value).await { Some(result) => { From a113543679e1ddc99bc5d4dc233a593fb7cdb735 Mon Sep 17 00:00:00 2001 From: Raphael Date: Fri, 30 Jan 2026 14:19:05 +0100 Subject: [PATCH 12/15] feat: implemented shutdown --- adapter/rest/src/main.rs | 77 ++++++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 23 deletions(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 204da35..18eeab4 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -29,7 +29,10 @@ mod route; #[tokio::main] async fn main() { - let server = HttpServer { addr: None }; + let server = HttpServer { + shutdown_tx: None, + addr: None, + }; let runner = match ServerRunner::new(server).await { Ok(runner) => runner, Err(err) => panic!("Failed to create server runner: {:?}", err), @@ -42,6 +45,7 @@ async fn main() { } struct HttpServer { + shutdown_tx: Option>, addr: Option, } @@ -265,6 +269,10 @@ pub async fn handle_request( impl ServerTrait for HttpServer { async fn init(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { log::info!("Initializing http server"); + + let (shutdown_tx, _) = tokio::sync::broadcast::channel(1); + self.shutdown_tx = Some(shutdown_tx); + let bind = format!("{}:{}", ctx.server_config.host, ctx.server_config.port); self.addr = Some( @@ -277,49 +285,72 @@ impl ServerTrait for HttpServer { } async fn run(&mut self, ctx: &ServerContext) -> anyhow::Result<()> { - let addr = match self.addr { - Some(addr) => addr, - None => panic!("cannot start tcp listener with empty address"), - }; - - let listener = match TcpListener::bind(addr).await { - Ok(listener) => listener, - Err(err) => { - panic!("failed to register tcp listener on address: {:?}", err); - } - }; + let addr = self + .addr + .expect("cannot start tcp listener with empty address"); + + let listener = TcpListener::bind(addr) + .await + .map_err(|e| anyhow::anyhow!("failed to bind {addr}: {e}"))?; + + // Create a receiver for this run loop + let shutdown_tx = self + .shutdown_tx + .as_ref() + .expect("shutdown_tx not initialized; init() must run first") + .clone(); + let mut shutdown_rx = shutdown_tx.subscribe(); loop { - let (stream, _) = match listener.accept().await { - Ok(res) => res, - Err(e) => { - panic!("listener failed to accept requests: {:?}", e); + let (stream, _) = tokio::select! { + _ = shutdown_rx.recv() => { + log::info!("HTTP server: shutdown received, stopping accept loop"); + break; + } + res = listener.accept() => { + res.map_err(|e| anyhow::anyhow!("accept failed: {e}"))? } }; let io = TokioIo::new(stream); - let store = Arc::clone(&ctx.adapter_store); - tokio::task::spawn(async move { - let store = Arc::clone(&store); + let mut conn_shutdown_rx = shutdown_tx.subscribe(); + + tokio::spawn(async move { let svc = hyper::service::service_fn(move |req| { let store = Arc::clone(&store); async move { handle_request(req, store).await } }); - if let Err(err) = http1::Builder::new().serve_connection(io, svc).await { - log::error!("Error serving connection: {:?}", err); + let conn = http1::Builder::new().serve_connection(io, svc); + + tokio::pin!(conn); + + tokio::select! { + res = conn.as_mut() => { + if let Err(err) = res { + log::error!("Error serving connection: {:?}", err); + } + } + _ = conn_shutdown_rx.recv() => { + conn.as_mut().graceful_shutdown(); + } } }); } - } + Ok(()) + } async fn shutdown( &mut self, _ctx: &ServerContext, ) -> anyhow::Result<()> { - todo!("Implement shutdown!"); + if let Some(ref tx) = self.shutdown_tx { + log::info!("Recieved a shutdown signal for Adapter Server"); + let _ = tx.send(()); + } + Ok(()) } } From 79caae503de6395096f4f0acaf25bae0f2d8fa9f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 30 Jan 2026 14:19:54 +0100 Subject: [PATCH 13/15] Update adapter/rest/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- adapter/rest/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index 18eeab4..fd07ee6 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -113,7 +113,7 @@ async fn execute_flow_to_hyper_response( match store.validate_and_execute_flow(flow, value).await { Some(result) => { - log::debug!("Recieved Result: {:?}", result); + log::debug!("Received Result: {:?}", result); let Value { kind: Some(StructValue(Struct { fields })), } = result From ca8d4dfad162f365c794b1b777d2294e7e727caa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 30 Jan 2026 14:20:09 +0100 Subject: [PATCH 14/15] Update adapter/rest/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- adapter/rest/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index fd07ee6..ec71519 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -280,7 +280,7 @@ impl ServerTrait for HttpServer { .map_err(|e| anyhow::anyhow!("Invalid bind address '{}': {}", bind, e))?, ); - log::debug!("Initizalized with Address: {:?}", self.addr); + log::debug!("Initialized with Address: {:?}", self.addr); Ok(()) } From 7100a856278eeaf0014c41992b44e983d0399b12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Raphael=20G=C3=B6tz?= <52959657+raphael-goetz@users.noreply.github.com> Date: Fri, 30 Jan 2026 15:09:38 +0100 Subject: [PATCH 15/15] Update adapter/rest/src/main.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Raphael Götz <52959657+raphael-goetz@users.noreply.github.com> --- adapter/rest/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/adapter/rest/src/main.rs b/adapter/rest/src/main.rs index ec71519..847a0eb 100644 --- a/adapter/rest/src/main.rs +++ b/adapter/rest/src/main.rs @@ -347,7 +347,7 @@ impl ServerTrait for HttpServer { _ctx: &ServerContext, ) -> anyhow::Result<()> { if let Some(ref tx) = self.shutdown_tx { - log::info!("Recieved a shutdown signal for Adapter Server"); + log::info!("Received a shutdown signal for Adapter Server"); let _ = tx.send(()); }