From 250c427f4fde98f76b769ca62499805de84b54a2 Mon Sep 17 00:00:00 2001 From: cdrappi Date: Tue, 27 Jan 2026 11:31:15 -0500 Subject: [PATCH 1/5] Add rustfmt.toml and apply formatting - Add rustfmt configuration - Apply nightly rustfmt to all files - Fix unused variable warning in server.rs --- rustfmt.toml | 13 ++++++++++++ src/checkpoint/executor.rs | 40 ++++++++++--------------------------- src/checkpoint/manager.rs | 41 +++++++++++++------------------------- src/checkpoint/metadata.rs | 6 +----- src/config.rs | 9 +++------ src/lib.rs | 6 ++++-- src/main.rs | 7 ++----- src/monitor.rs | 25 +++++++++-------------- src/rpc/client.rs | 8 +++++--- src/rpc/reth.rs | 14 ++++++------- src/rpc/summit.rs | 16 +++------------ src/server/server.rs | 26 +++++++----------------- src/state.rs | 14 +++---------- 13 files changed, 80 insertions(+), 145 deletions(-) create mode 100644 rustfmt.toml diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..9b1f4b7 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1,13 @@ +reorder_imports = true +use_field_init_shorthand = true +use_small_heuristics = "Max" + +# Nightly +max_width = 100 +comment_width = 100 +imports_granularity = "Crate" +wrap_comments = true +format_code_in_doc_comments = true +doc_comment_code_block_width = 100 +format_macro_matchers = true + diff --git a/src/checkpoint/executor.rs b/src/checkpoint/executor.rs index dbf4679..91eb57c 100644 --- a/src/checkpoint/executor.rs +++ b/src/checkpoint/executor.rs @@ -1,5 +1,7 @@ -use std::path::{Path, PathBuf}; -use std::process::Stdio; +use std::{ + path::{Path, PathBuf}, + process::Stdio, +}; use tokio::process::Command; use crate::error::{CheckpointerError, Result}; @@ -14,11 +16,7 @@ pub struct CheckpointExecutor { impl CheckpointExecutor { /// Create a new checkpoint executor pub fn new(mdbx_copy_path: PathBuf, reth_path: PathBuf, compact: bool) -> Self { - Self { - mdbx_copy_path, - reth_path, - compact, - } + Self { mdbx_copy_path, reth_path, compact } } /// Verify that mdbx_copy and reth binaries are available and executable @@ -61,10 +59,7 @@ impl CheckpointExecutor { ))); } - let output = Command::new(&self.reth_path) - .arg("--version") - .output() - .await; + let output = Command::new(&self.reth_path).arg("--version").output().await; match output { Ok(out) if out.status.success() => { @@ -160,11 +155,7 @@ impl CheckpointExecutor { ))); } - tracing::info!( - "Copying static_files from {:?} to {:?}", - source_static, - dest_static - ); + tracing::info!("Copying static_files from {:?} to {:?}", source_static, dest_static); let start = std::time::Instant::now(); @@ -195,11 +186,7 @@ impl CheckpointExecutor { /// Unwind the database to epoch_block - 2 using reth pub async fn unwind_database(&self, checkpoint_dir: &Path, target_block: u64) -> Result<()> { - tracing::info!( - "Unwinding database at {:?} to block {}", - checkpoint_dir, - target_block - ); + tracing::info!("Unwinding database at {:?} to block {}", checkpoint_dir, target_block); let mut cmd = Command::new(&self.reth_path); cmd.arg("stage") @@ -230,11 +217,7 @@ impl CheckpointExecutor { } let stdout = String::from_utf8_lossy(&output.stdout); - tracing::info!( - "reth unwind completed successfully in {:?}: {}", - duration, - stdout.trim() - ); + tracing::info!("reth unwind completed successfully in {:?}: {}", duration, stdout.trim()); Ok(()) } @@ -248,10 +231,7 @@ impl CheckpointExecutor { // Build tar command to compress db, static_files, and summit_checkpoint let mut cmd = Command::new("tar"); - cmd.arg("-czf") - .arg(&archive_name) - .arg("db") - .arg("static_files"); + cmd.arg("-czf").arg(&archive_name).arg("db").arg("static_files"); // Only include summit_checkpoint if it exists let summit_checkpoint_dir = checkpoint_dir.join("summit_checkpoint"); diff --git a/src/checkpoint/manager.rs b/src/checkpoint/manager.rs index 4bce63b..fcc6d62 100644 --- a/src/checkpoint/manager.rs +++ b/src/checkpoint/manager.rs @@ -1,11 +1,12 @@ -use std::path::PathBuf; -use std::sync::Arc; +use std::{path::PathBuf, sync::Arc}; -use crate::checkpoint::{CheckpointExecutor, CheckpointMetadata}; -use crate::config::{CheckpointConfig, Config}; -use crate::error::Result; -use crate::rpc::RpcClient; -use crate::state::StateTracker; +use crate::{ + checkpoint::{CheckpointExecutor, CheckpointMetadata}, + config::{CheckpointConfig, Config}, + error::Result, + rpc::RpcClient, + state::StateTracker, +}; /// Checkpoint manager orchestrates checkpoint creation pub struct CheckpointManager { @@ -56,9 +57,7 @@ impl CheckpointManager { // Step 1: Copy MDBX database tracing::info!("Step 1/7: Copying MDBX database"); let db_dest = checkpoint_path.join("db").join("mdbx.dat"); - self.executor - .copy_mdbx_database(&self.db_path, &db_dest) - .await?; + self.executor.copy_mdbx_database(&self.db_path, &db_dest).await?; // Step 2: Copy static_files directory tracing::info!("Step 2/7: Copying static_files"); @@ -67,24 +66,16 @@ impl CheckpointManager { "Could not determine parent directory of db_path".to_string(), ) })?; - self.executor - .copy_static_files(source_db_dir, &checkpoint_path) - .await?; + self.executor.copy_static_files(source_db_dir, &checkpoint_path).await?; // Step 3: Delete lock file tracing::info!("Step 3/7: Deleting lock file"); self.executor.delete_lock_file(&checkpoint_path).await?; // Step 4: Unwind database to epoch_block - 2 - let unwind_target = if block_number >= 2 { - block_number - 2 - } else { - 0 - }; + let unwind_target = if block_number >= 2 { block_number - 2 } else { 0 }; tracing::info!("Step 4/7: Unwinding database to block {}", unwind_target); - self.executor - .unwind_database(&checkpoint_path, unwind_target) - .await?; + self.executor.unwind_database(&checkpoint_path, unwind_target).await?; // Step 5: Fetch and write Summit checkpoint data tracing::info!("Step 5/7: Fetching Summit checkpoint data"); @@ -161,14 +152,10 @@ impl CheckpointManager { // Step 7: Compress and cleanup tracing::info!("Step 7/7: Compressing checkpoint and cleaning up"); - self.executor - .compress_and_cleanup(&checkpoint_path, epoch) - .await?; + self.executor.compress_and_cleanup(&checkpoint_path, epoch).await?; // Update state tracker - self.state_tracker - .update_last_checkpoint(epoch, block_number) - .await?; + self.state_tracker.update_last_checkpoint(epoch, block_number).await?; let duration = start.elapsed(); tracing::info!( diff --git a/src/checkpoint/metadata.rs b/src/checkpoint/metadata.rs index 08791e9..344f269 100644 --- a/src/checkpoint/metadata.rs +++ b/src/checkpoint/metadata.rs @@ -16,10 +16,6 @@ pub struct CheckpointMetadata { impl CheckpointMetadata { /// Create new checkpoint metadata pub fn new(epoch: u64, block_number: u64) -> Self { - Self { - epoch, - block_number, - timestamp: chrono::Utc::now(), - } + Self { epoch, block_number, timestamp: chrono::Utc::now() } } } diff --git a/src/config.rs b/src/config.rs index 5a69ad5..6034862 100644 --- a/src/config.rs +++ b/src/config.rs @@ -128,9 +128,7 @@ impl Config { // Override with environment variables (e.g., CHECKPOINTER_RETH_RPC_URL) builder = builder.add_source( - config::Environment::with_prefix("CHECKPOINTER") - .separator("_") - .try_parsing(true), + config::Environment::with_prefix("CHECKPOINTER").separator("_").try_parsing(true), ); // Build the config @@ -227,9 +225,8 @@ fn expand_path(path: &PathBuf) -> Result { .to_str() .ok_or_else(|| CheckpointerError::InvalidPath("Invalid UTF-8 in path".to_string()))?; - let expanded = shellexpand::full(path_str).map_err(|e| { - CheckpointerError::InvalidPath(format!("Failed to expand path: {}", e)) - })?; + let expanded = shellexpand::full(path_str) + .map_err(|e| CheckpointerError::InvalidPath(format!("Failed to expand path: {}", e)))?; Ok(PathBuf::from(expanded.as_ref())) } diff --git a/src/lib.rs b/src/lib.rs index 7bf507d..7d0e1f8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,8 +10,10 @@ pub mod state; pub use server::api::CheckpointerRpcClient; // Re-export jsonrpsee client utilities for constructing a CheckpointerRpcClient -pub use jsonrpsee::core::ClientError; -pub use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +pub use jsonrpsee::{ + core::ClientError, + http_client::{HttpClient, HttpClientBuilder}, +}; // Re-export commonly used types pub use checkpoint::CheckpointManager; diff --git a/src/main.rs b/src/main.rs index a3f9a34..5d22a7f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,11 +33,8 @@ async fn main() -> Result<()> { let rpc_client = RpcClient::new(&config)?; // Create checkpoint manager - let checkpoint_manager = Arc::new(CheckpointManager::new( - &config, - state_tracker.clone(), - rpc_client.clone(), - )); + let checkpoint_manager = + Arc::new(CheckpointManager::new(&config, state_tracker.clone(), rpc_client.clone())); // Verify checkpoint tools are available tracing::info!("Verifying checkpoint tools (mdbx_copy, reth)..."); diff --git a/src/monitor.rs b/src/monitor.rs index de0c32a..7a98550 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -1,11 +1,12 @@ -use std::sync::Arc; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use tokio_util::sync::CancellationToken; -use crate::checkpoint::CheckpointManager; -use crate::config::MonitorConfig; -use crate::error::{CheckpointerError, Result}; -use crate::rpc::RpcClient; +use crate::{ + checkpoint::CheckpointManager, + config::MonitorConfig, + error::{CheckpointerError, Result}, + rpc::RpcClient, +}; /// Block monitor that watches for epoch boundaries and triggers checkpoints pub struct BlockMonitor { @@ -25,13 +26,7 @@ impl BlockMonitor { epoch_blocks: u64, checkpoint_delay_blocks: u64, ) -> Self { - Self { - rpc_client, - checkpoint_manager, - config, - epoch_blocks, - checkpoint_delay_blocks, - } + Self { rpc_client, checkpoint_manager, config, epoch_blocks, checkpoint_delay_blocks } } /// Run the monitoring loop until cancelled @@ -101,9 +96,7 @@ impl BlockMonitor { ); // Create checkpoint for the epoch block, not the current block - self.checkpoint_manager - .create_checkpoint(current_epoch, epoch_block) - .await?; + self.checkpoint_manager.create_checkpoint(current_epoch, epoch_block).await?; } else { let blocks_waited = current_block - epoch_block; let blocks_to_wait = self.checkpoint_delay_blocks - blocks_waited; diff --git a/src/rpc/client.rs b/src/rpc/client.rs index 913b6d2..697446f 100644 --- a/src/rpc/client.rs +++ b/src/rpc/client.rs @@ -1,8 +1,10 @@ use std::sync::Arc; -use crate::config::Config; -use crate::error::Result; -use crate::rpc::{RethRpcClient, SummitRpcClient}; +use crate::{ + config::Config, + error::Result, + rpc::{RethRpcClient, SummitRpcClient}, +}; /// Combined RPC client wrapper #[derive(Clone)] diff --git a/src/rpc/reth.rs b/src/rpc/reth.rs index dc679c2..712b0c7 100644 --- a/src/rpc/reth.rs +++ b/src/rpc/reth.rs @@ -1,5 +1,7 @@ -use jsonrpsee::core::client::ClientT; -use jsonrpsee::http_client::{HttpClient, HttpClientBuilder}; +use jsonrpsee::{ + core::client::ClientT, + http_client::{HttpClient, HttpClientBuilder}, +}; use serde::{Deserialize, Serialize}; use crate::error::Result; @@ -12,8 +14,7 @@ pub struct RethRpcClient { impl RethRpcClient { /// Create a new Reth RPC client pub fn new(url: &str) -> Result { - let client = HttpClientBuilder::default() - .build(url)?; + let client = HttpClientBuilder::default().build(url)?; tracing::info!("Reth RPC client initialized: {}", url); Ok(Self { client }) @@ -40,10 +41,7 @@ impl RethRpcClient { params.insert(block_param)?; params.insert(false)?; // Don't include full transactions - let response: BlockInfo = self - .client - .request("eth_getBlockByNumber", params) - .await?; + let response: BlockInfo = self.client.request("eth_getBlockByNumber", params).await?; Ok(response) } diff --git a/src/rpc/summit.rs b/src/rpc/summit.rs index cdf9233..f1877b2 100644 --- a/src/rpc/summit.rs +++ b/src/rpc/summit.rs @@ -14,10 +14,7 @@ impl SummitRpcClient { /// Create a new Summit RPC client pub fn new(url: &str) -> Result { tracing::info!("Summit RPC client initialized: {}", url); - Ok(Self { - client: Client::new(), - base_url: url.to_string(), - }) + Ok(Self { client: Client::new(), base_url: url.to_string() }) } /// Get checkpoint data from Summit for a specific epoch @@ -33,17 +30,10 @@ impl SummitRpcClient { }); // Send request - let response = self - .client - .post(&self.base_url) - .json(&request) - .send() - .await?; + let response = self.client.post(&self.base_url).json(&request).send().await?; if !response.status().is_success() { - return Err(CheckpointerError::Http( - response.error_for_status().unwrap_err(), - )); + return Err(CheckpointerError::Http(response.error_for_status().unwrap_err())); } // Parse JSON-RPC response diff --git a/src/server/server.rs b/src/server/server.rs index f6f5bde..1be049d 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -21,10 +21,7 @@ impl RpcServer { } pub async fn start_server(self, addr: SocketAddr) { - let server = ServerBuilder::default() - .build(addr) - .await - .expect("Failed to start rpc"); + let server = ServerBuilder::default().build(addr).await.expect("Failed to start rpc"); let handle = server.start(self.into_rpc()); @@ -49,10 +46,7 @@ impl CheckpointerRpcServer for RpcServer { .map_err(|e| string_to_rpc_error(format!("Failed to download snapshot: {}", e)))?; if !response.status().is_success() { - return Err(string_to_rpc_error(format!( - "HTTP error: {}", - response.status() - ))); + return Err(string_to_rpc_error(format!("HTTP error: {}", response.status()))); } let bytes = response @@ -64,9 +58,8 @@ impl CheckpointerRpcServer for RpcServer { let filename = format!("{SNAPSHOT_FILE_PREFIX}{epoch}.tar.lz4"); // Write to file - let mut file = tokio::fs::File::create(format!("{DATA_DISK_DIR}/{filename}")) - .await - .map_err(|e| { + let mut file = + tokio::fs::File::create(format!("{DATA_DISK_DIR}/{filename}")).await.map_err(|e| { string_to_rpc_error(format!("Failed to create file {}: {}", filename, e)) })?; @@ -78,7 +71,7 @@ impl CheckpointerRpcServer for RpcServer { } /// Restores from an encrypted snapshot - async fn restore_from_encrypted_snapshot(&self, epoch: u64) -> RpcResult<()> { + async fn restore_from_encrypted_snapshot(&self, _epoch: u64) -> RpcResult<()> { todo!() } @@ -89,16 +82,11 @@ impl CheckpointerRpcServer for RpcServer { ); if !fs::exists(&snapshot_path).unwrap_or_default() { - return Err(string_to_rpc_error(format!( - "No snapshot for epoch {epoch} stored" - ))); + return Err(string_to_rpc_error(format!("No snapshot for epoch {epoch} stored"))); } fs::read(snapshot_path).map_err(|e| { - string_to_rpc_error(format!( - "Failed to read snapshot for epoch {}: {}", - epoch, e - )) + string_to_rpc_error(format!("Failed to read snapshot for epoch {}: {}", epoch, e)) }) } diff --git a/src/state.rs b/src/state.rs index 77b22be..aa28a61 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,6 +1,5 @@ use serde::{Deserialize, Serialize}; -use std::path::Path; -use std::sync::Arc; +use std::{path::Path, sync::Arc}; use tokio::sync::RwLock; use crate::error::Result; @@ -47,10 +46,7 @@ impl StateTracker { state.last_checkpoint_block ); - Ok(Self { - state: Arc::new(RwLock::new(state)), - path: path.to_path_buf(), - }) + Ok(Self { state: Arc::new(RwLock::new(state)), path: path.to_path_buf() }) } /// Get last checkpointed epoch @@ -73,11 +69,7 @@ impl StateTracker { // Persist to disk immediately self.save_internal(&state).await?; - tracing::debug!( - "State updated: epoch={}, block={}", - epoch, - block - ); + tracing::debug!("State updated: epoch={}, block={}", epoch, block); Ok(()) } From a5d07f6f66356843d2f07beb50a375a777d29af5 Mon Sep 17 00:00:00 2001 From: cdrappi Date: Tue, 27 Jan 2026 11:32:06 -0500 Subject: [PATCH 2/5] Add GitHub Actions CI workflow - Run cargo +nightly fmt check - Run cargo clippy with warnings as errors - Run cargo check with warnings as errors - Build with cargo build - Run tests with cargo test - Use Swatinem/rust-cache for faster builds --- .github/workflows/ci.yml | 46 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..f2b756b --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,46 @@ +name: CI + +on: + push: + branches: [ main ] + pull_request: + branches: [ main ] + +env: + CARGO_TERM_COLOR: always + +jobs: + ci: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Install Rust stable + uses: dtolnay/rust-toolchain@stable + with: + components: clippy + + - name: Install Rust nightly (for fmt) + uses: dtolnay/rust-toolchain@nightly + with: + components: rustfmt + + - name: Setup Rust cache + uses: Swatinem/rust-cache@v2 + + - name: Check formatting + run: cargo +nightly fmt --all --check + + - name: Cargo check (warnings as errors) + run: cargo check --all-targets + env: + RUSTFLAGS: "-D warnings" + + - name: Run clippy + run: cargo clippy --all-targets -- -D warnings + + - name: Build + run: cargo build --verbose + + - name: Run tests + run: cargo test --verbose From 56b38ce3241c0610fb70142bae8201c6095050fb Mon Sep 17 00:00:00 2001 From: cdrappi Date: Tue, 27 Jan 2026 11:34:47 -0500 Subject: [PATCH 3/5] Add clippy allow attributes for non-critical warnings - Allow implicit_saturating_sub in checkpoint/manager.rs - Allow ptr_arg in config.rs - Allow module_inception in server/mod.rs - Allow new_without_default in server/server.rs - Allow useless_format in server/server.rs --- src/checkpoint/manager.rs | 1 + src/config.rs | 1 + src/server/mod.rs | 1 + src/server/server.rs | 2 ++ 4 files changed, 5 insertions(+) diff --git a/src/checkpoint/manager.rs b/src/checkpoint/manager.rs index fcc6d62..cf25c3d 100644 --- a/src/checkpoint/manager.rs +++ b/src/checkpoint/manager.rs @@ -73,6 +73,7 @@ impl CheckpointManager { self.executor.delete_lock_file(&checkpoint_path).await?; // Step 4: Unwind database to epoch_block - 2 + #[allow(clippy::implicit_saturating_sub)] let unwind_target = if block_number >= 2 { block_number - 2 } else { 0 }; tracing::info!("Step 4/7: Unwinding database to block {}", unwind_target); self.executor.unwind_database(&checkpoint_path, unwind_target).await?; diff --git a/src/config.rs b/src/config.rs index 6034862..d5f2c1d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -220,6 +220,7 @@ impl Config { } /// Expand tilde and environment variables in paths +#[allow(clippy::ptr_arg)] fn expand_path(path: &PathBuf) -> Result { let path_str = path .to_str() diff --git a/src/server/mod.rs b/src/server/mod.rs index 10d39fe..07267fe 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,2 +1,3 @@ pub mod api; +#[allow(clippy::module_inception)] pub mod server; diff --git a/src/server/server.rs b/src/server/server.rs index 1be049d..8da1bda 100644 --- a/src/server/server.rs +++ b/src/server/server.rs @@ -16,6 +16,7 @@ pub const DATA_DISK_DIR: &str = "/home/ubuntu/checkpoints"; pub struct RpcServer; impl RpcServer { + #[allow(clippy::new_without_default)] pub fn new() -> Self { Self } @@ -99,6 +100,7 @@ impl CheckpointerRpcServer for RpcServer { })?; let mut epochs = Vec::new(); + #[allow(clippy::useless_format)] let prefix = format!("{SNAPSHOT_FILE_PREFIX}"); for entry in entries { From 7ff59762f7fef2dd3b9721306dfe20b5ac1a77c5 Mon Sep 17 00:00:00 2001 From: cdrappi Date: Tue, 27 Jan 2026 11:36:32 -0500 Subject: [PATCH 4/5] saturating sub instead of manually doing it --- src/checkpoint/manager.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/checkpoint/manager.rs b/src/checkpoint/manager.rs index cf25c3d..74e3cb4 100644 --- a/src/checkpoint/manager.rs +++ b/src/checkpoint/manager.rs @@ -73,8 +73,7 @@ impl CheckpointManager { self.executor.delete_lock_file(&checkpoint_path).await?; // Step 4: Unwind database to epoch_block - 2 - #[allow(clippy::implicit_saturating_sub)] - let unwind_target = if block_number >= 2 { block_number - 2 } else { 0 }; + let unwind_target = block_number.saturating_sub(2); tracing::info!("Step 4/7: Unwinding database to block {}", unwind_target); self.executor.unwind_database(&checkpoint_path, unwind_target).await?; From ed03ebd7ae6494e6dc05e5a3879fabdccb97ffc2 Mon Sep 17 00:00:00 2001 From: cdrappi Date: Tue, 27 Jan 2026 11:46:05 -0500 Subject: [PATCH 5/5] Use +stable toolchain for clippy, check, build, and test Explicitly specify stable toolchain to avoid running on nightly which doesn't have clippy installed. Only rustfmt uses nightly. --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f2b756b..594d643 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -32,15 +32,15 @@ jobs: run: cargo +nightly fmt --all --check - name: Cargo check (warnings as errors) - run: cargo check --all-targets + run: cargo +stable check --all-targets env: RUSTFLAGS: "-D warnings" - name: Run clippy - run: cargo clippy --all-targets -- -D warnings + run: cargo +stable clippy --all-targets -- -D warnings - name: Build - run: cargo build --verbose + run: cargo +stable build --verbose - name: Run tests - run: cargo test --verbose + run: cargo +stable test --verbose