Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: CI

on:
push:
branches: [ main ]
pull_request:
branches: [ main ]

env:
CARGO_TERM_COLOR: always

jobs:
ci:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4

- name: Install Rust stable
uses: dtolnay/rust-toolchain@stable
with:
components: clippy

- name: Install Rust nightly (for fmt)
uses: dtolnay/rust-toolchain@nightly
with:
components: rustfmt

- name: Setup Rust cache
uses: Swatinem/rust-cache@v2

- name: Check formatting
run: cargo +nightly fmt --all --check

- name: Cargo check (warnings as errors)
run: cargo +stable check --all-targets
env:
RUSTFLAGS: "-D warnings"

- name: Run clippy
run: cargo +stable clippy --all-targets -- -D warnings

- name: Build
run: cargo +stable build --verbose

- name: Run tests
run: cargo +stable test --verbose
13 changes: 13 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
reorder_imports = true
use_field_init_shorthand = true
use_small_heuristics = "Max"

# Nightly
max_width = 100
comment_width = 100
imports_granularity = "Crate"
wrap_comments = true
format_code_in_doc_comments = true
doc_comment_code_block_width = 100
format_macro_matchers = true

40 changes: 10 additions & 30 deletions src/checkpoint/executor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::{
path::{Path, PathBuf},
process::Stdio,
};
use tokio::process::Command;

use crate::error::{CheckpointerError, Result};
Expand All @@ -14,11 +16,7 @@ pub struct CheckpointExecutor {
impl CheckpointExecutor {
/// Create a new checkpoint executor
pub fn new(mdbx_copy_path: PathBuf, reth_path: PathBuf, compact: bool) -> Self {
Self {
mdbx_copy_path,
reth_path,
compact,
}
Self { mdbx_copy_path, reth_path, compact }
}

/// Verify that mdbx_copy and reth binaries are available and executable
Expand Down Expand Up @@ -61,10 +59,7 @@ impl CheckpointExecutor {
)));
}

let output = Command::new(&self.reth_path)
.arg("--version")
.output()
.await;
let output = Command::new(&self.reth_path).arg("--version").output().await;

match output {
Ok(out) if out.status.success() => {
Expand Down Expand Up @@ -160,11 +155,7 @@ impl CheckpointExecutor {
)));
}

tracing::info!(
"Copying static_files from {:?} to {:?}",
source_static,
dest_static
);
tracing::info!("Copying static_files from {:?} to {:?}", source_static, dest_static);

let start = std::time::Instant::now();

Expand Down Expand Up @@ -195,11 +186,7 @@ impl CheckpointExecutor {

/// Unwind the database to epoch_block - 2 using reth
pub async fn unwind_database(&self, checkpoint_dir: &Path, target_block: u64) -> Result<()> {
tracing::info!(
"Unwinding database at {:?} to block {}",
checkpoint_dir,
target_block
);
tracing::info!("Unwinding database at {:?} to block {}", checkpoint_dir, target_block);

let mut cmd = Command::new(&self.reth_path);
cmd.arg("stage")
Expand Down Expand Up @@ -230,11 +217,7 @@ impl CheckpointExecutor {
}

let stdout = String::from_utf8_lossy(&output.stdout);
tracing::info!(
"reth unwind completed successfully in {:?}: {}",
duration,
stdout.trim()
);
tracing::info!("reth unwind completed successfully in {:?}: {}", duration, stdout.trim());

Ok(())
}
Expand All @@ -248,10 +231,7 @@ impl CheckpointExecutor {

// Build tar command to compress db, static_files, and summit_checkpoint
let mut cmd = Command::new("tar");
cmd.arg("-czf")
.arg(&archive_name)
.arg("db")
.arg("static_files");
cmd.arg("-czf").arg(&archive_name).arg("db").arg("static_files");

// Only include summit_checkpoint if it exists
let summit_checkpoint_dir = checkpoint_dir.join("summit_checkpoint");
Expand Down
41 changes: 14 additions & 27 deletions src/checkpoint/manager.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::{path::PathBuf, sync::Arc};

use crate::checkpoint::{CheckpointExecutor, CheckpointMetadata};
use crate::config::{CheckpointConfig, Config};
use crate::error::Result;
use crate::rpc::RpcClient;
use crate::state::StateTracker;
use crate::{
checkpoint::{CheckpointExecutor, CheckpointMetadata},
config::{CheckpointConfig, Config},
error::Result,
rpc::RpcClient,
state::StateTracker,
};

/// Checkpoint manager orchestrates checkpoint creation
pub struct CheckpointManager {
Expand Down Expand Up @@ -56,9 +57,7 @@ impl CheckpointManager {
// Step 1: Copy MDBX database
tracing::info!("Step 1/7: Copying MDBX database");
let db_dest = checkpoint_path.join("db").join("mdbx.dat");
self.executor
.copy_mdbx_database(&self.db_path, &db_dest)
.await?;
self.executor.copy_mdbx_database(&self.db_path, &db_dest).await?;

// Step 2: Copy static_files directory
tracing::info!("Step 2/7: Copying static_files");
Expand All @@ -67,24 +66,16 @@ impl CheckpointManager {
"Could not determine parent directory of db_path".to_string(),
)
})?;
self.executor
.copy_static_files(source_db_dir, &checkpoint_path)
.await?;
self.executor.copy_static_files(source_db_dir, &checkpoint_path).await?;

// Step 3: Delete lock file
tracing::info!("Step 3/7: Deleting lock file");
self.executor.delete_lock_file(&checkpoint_path).await?;

// Step 4: Unwind database to epoch_block - 2
let unwind_target = if block_number >= 2 {
block_number - 2
} else {
0
};
let unwind_target = block_number.saturating_sub(2);
tracing::info!("Step 4/7: Unwinding database to block {}", unwind_target);
self.executor
.unwind_database(&checkpoint_path, unwind_target)
.await?;
self.executor.unwind_database(&checkpoint_path, unwind_target).await?;

// Step 5: Fetch and write Summit checkpoint data
tracing::info!("Step 5/7: Fetching Summit checkpoint data");
Expand Down Expand Up @@ -161,14 +152,10 @@ impl CheckpointManager {

// Step 7: Compress and cleanup
tracing::info!("Step 7/7: Compressing checkpoint and cleaning up");
self.executor
.compress_and_cleanup(&checkpoint_path, epoch)
.await?;
self.executor.compress_and_cleanup(&checkpoint_path, epoch).await?;

// Update state tracker
self.state_tracker
.update_last_checkpoint(epoch, block_number)
.await?;
self.state_tracker.update_last_checkpoint(epoch, block_number).await?;

let duration = start.elapsed();
tracing::info!(
Expand Down
6 changes: 1 addition & 5 deletions src/checkpoint/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@ pub struct CheckpointMetadata {
impl CheckpointMetadata {
/// Create new checkpoint metadata
pub fn new(epoch: u64, block_number: u64) -> Self {
Self {
epoch,
block_number,
timestamp: chrono::Utc::now(),
}
Self { epoch, block_number, timestamp: chrono::Utc::now() }
}
}
10 changes: 4 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ impl Config {

// Override with environment variables (e.g., CHECKPOINTER_RETH_RPC_URL)
builder = builder.add_source(
config::Environment::with_prefix("CHECKPOINTER")
.separator("_")
.try_parsing(true),
config::Environment::with_prefix("CHECKPOINTER").separator("_").try_parsing(true),
);

// Build the config
Expand Down Expand Up @@ -222,14 +220,14 @@ impl Config {
}

/// Expand tilde and environment variables in paths
#[allow(clippy::ptr_arg)]
fn expand_path(path: &PathBuf) -> Result<PathBuf> {
let path_str = path
.to_str()
.ok_or_else(|| CheckpointerError::InvalidPath("Invalid UTF-8 in path".to_string()))?;

let expanded = shellexpand::full(path_str).map_err(|e| {
CheckpointerError::InvalidPath(format!("Failed to expand path: {}", e))
})?;
let expanded = shellexpand::full(path_str)
.map_err(|e| CheckpointerError::InvalidPath(format!("Failed to expand path: {}", e)))?;

Ok(PathBuf::from(expanded.as_ref()))
}
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ pub mod state;
pub use server::api::CheckpointerRpcClient;

// Re-export jsonrpsee client utilities for constructing a CheckpointerRpcClient
pub use jsonrpsee::core::ClientError;
pub use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
pub use jsonrpsee::{
core::ClientError,
http_client::{HttpClient, HttpClientBuilder},
};

// Re-export commonly used types
pub use checkpoint::CheckpointManager;
Expand Down
7 changes: 2 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,8 @@ async fn main() -> Result<()> {
let rpc_client = RpcClient::new(&config)?;

// Create checkpoint manager
let checkpoint_manager = Arc::new(CheckpointManager::new(
&config,
state_tracker.clone(),
rpc_client.clone(),
));
let checkpoint_manager =
Arc::new(CheckpointManager::new(&config, state_tracker.clone(), rpc_client.clone()));

// Verify checkpoint tools are available
tracing::info!("Verifying checkpoint tools (mdbx_copy, reth)...");
Expand Down
25 changes: 9 additions & 16 deletions src/monitor.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::sync::Arc;
use std::time::Duration;
use std::{sync::Arc, time::Duration};
use tokio_util::sync::CancellationToken;

use crate::checkpoint::CheckpointManager;
use crate::config::MonitorConfig;
use crate::error::{CheckpointerError, Result};
use crate::rpc::RpcClient;
use crate::{
checkpoint::CheckpointManager,
config::MonitorConfig,
error::{CheckpointerError, Result},
rpc::RpcClient,
};

/// Block monitor that watches for epoch boundaries and triggers checkpoints
pub struct BlockMonitor {
Expand All @@ -25,13 +26,7 @@ impl BlockMonitor {
epoch_blocks: u64,
checkpoint_delay_blocks: u64,
) -> Self {
Self {
rpc_client,
checkpoint_manager,
config,
epoch_blocks,
checkpoint_delay_blocks,
}
Self { rpc_client, checkpoint_manager, config, epoch_blocks, checkpoint_delay_blocks }
}

/// Run the monitoring loop until cancelled
Expand Down Expand Up @@ -101,9 +96,7 @@ impl BlockMonitor {
);

// Create checkpoint for the epoch block, not the current block
self.checkpoint_manager
.create_checkpoint(current_epoch, epoch_block)
.await?;
self.checkpoint_manager.create_checkpoint(current_epoch, epoch_block).await?;
} else {
let blocks_waited = current_block - epoch_block;
let blocks_to_wait = self.checkpoint_delay_blocks - blocks_waited;
Expand Down
8 changes: 5 additions & 3 deletions src/rpc/client.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::sync::Arc;

use crate::config::Config;
use crate::error::Result;
use crate::rpc::{RethRpcClient, SummitRpcClient};
use crate::{
config::Config,
error::Result,
rpc::{RethRpcClient, SummitRpcClient},
};

/// Combined RPC client wrapper
#[derive(Clone)]
Expand Down
14 changes: 6 additions & 8 deletions src/rpc/reth.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::{HttpClient, HttpClientBuilder};
use jsonrpsee::{
core::client::ClientT,
http_client::{HttpClient, HttpClientBuilder},
};
use serde::{Deserialize, Serialize};

use crate::error::Result;
Expand All @@ -12,8 +14,7 @@ pub struct RethRpcClient {
impl RethRpcClient {
/// Create a new Reth RPC client
pub fn new(url: &str) -> Result<Self> {
let client = HttpClientBuilder::default()
.build(url)?;
let client = HttpClientBuilder::default().build(url)?;

tracing::info!("Reth RPC client initialized: {}", url);
Ok(Self { client })
Expand All @@ -40,10 +41,7 @@ impl RethRpcClient {
params.insert(block_param)?;
params.insert(false)?; // Don't include full transactions

let response: BlockInfo = self
.client
.request("eth_getBlockByNumber", params)
.await?;
let response: BlockInfo = self.client.request("eth_getBlockByNumber", params).await?;

Ok(response)
}
Expand Down
Loading