From 18545b99867239463c80c24284c6dc2bee498474 Mon Sep 17 00:00:00 2001 From: Pablo Mendez Date: Fri, 16 Jan 2026 12:15:42 +0100 Subject: [PATCH 01/42] Refactor to integrate SnapshotsAdapter and streamline environment management --- cmd/main.go | 28 ++-- .../apis/snapshots/snapshots_adapter.go | 136 ++++++++++++++++++ .../adapters/composite/cleaner/cleaner.go | 45 ++---- internal/adapters/composite/composite.go | 18 ++- .../adapters/composite/ensurer/ensurer.go | 45 +++--- internal/application/domain/stakers.go | 41 +++--- internal/application/ports/ports.go | 4 +- internal/application/services/test_runner.go | 8 +- 8 files changed, 216 insertions(+), 109 deletions(-) create mode 100644 internal/adapters/apis/snapshots/snapshots_adapter.go diff --git a/cmd/main.go b/cmd/main.go index 4fbcbe6..30b392d 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -7,9 +7,8 @@ import ( "clients-test/internal/adapters/apis/docker" "clients-test/internal/adapters/apis/execution" "clients-test/internal/adapters/apis/ipfs" - "clients-test/internal/adapters/apis/tropidatooor" + "clients-test/internal/adapters/apis/snapshots" "clients-test/internal/adapters/composite" - "clients-test/internal/adapters/system/mount" "clients-test/internal/application/domain" "clients-test/internal/application/services" "clients-test/internal/logger" @@ -33,12 +32,11 @@ func main() { // CLI flags ipfsGatewayUrl := flag.String("ipfs-gateway-url", "", "IPFS gateway URL (required)") - tropidatooorUrl := flag.String("tropidatooor-url", "", "Tropidatooor API URL (required)") ipfsHash := flag.String("ipfs-hash", "", "IPFS hash for the test package (required)") flag.Parse() - if *ipfsGatewayUrl == "" || *tropidatooorUrl == "" || *ipfsHash == "" { - logger.FatalWithPrefix(logPrefix, "All flags --ipfs-gateway-url, --tropidatooor-url, and --ipfs-hash are required.") + if *ipfsGatewayUrl == "" || *ipfsHash == "" { + logger.FatalWithPrefix(logPrefix, "All flags --ipfs-gateway-url and --ipfs-hash are required.") } ctx := context.Background() @@ -56,15 +54,8 @@ func main() { // print the staker config for debugging with each item on a new line printStakerConfig(logPrefix, stakerConfig) - // Get mount path - tropidatooorAdapter := tropidatooor.NewTropidatooorAdapter(*tropidatooorUrl) - mountConfig, err := tropidatooorAdapter.DataRequest(ctx, stakerConfig.DataBackendName) - if err != nil { - logger.FatalWithPrefix(logPrefix, "Failed to get mount path: %v", err) - } - // Initialize API adapters - mountAdapter := mount.NewMountAdapter() + snapshotsAdapter := snapshots.NewSnapshotsAdapter() dappManagerAdapter := dappmanager.NewDappManagerAdapter() brainAdapter := brain.NewBrainAdapter(stakerConfig.Urls.BrainURL) beaconchainAdapter := beaconchain.NewBeaconchainAdapter(stakerConfig.Urls.BeaconchainURL) @@ -78,9 +69,8 @@ func main() { compositeAdapter := composite.NewCompositeAdapter( dappManagerAdapter, brainAdapter, - tropidatooorAdapter, dockerAdapter, - mountAdapter, + snapshotsAdapter, beaconchainAdapter, executionAdapter, ipfsAdapter, @@ -90,7 +80,7 @@ func main() { go func() { sig := <-sigs logger.InfoWithPrefix(logPrefix, "Received signal: %v, running cleanup...", sig) - err := compositeAdapter.CleanEnvironment(ctx, stakerConfig, *mountConfig) + err := compositeAdapter.CleanEnvironment(ctx, stakerConfig) if err != nil { logger.ErrorWithPrefix(logPrefix, "Cleanup failed: %v", err) } else { @@ -103,7 +93,7 @@ func main() { // Initialize and run the service testRunner := services.NewTestRunner(compositeAdapter) - if err := testRunner.RunTest(ctx, *mountConfig, stakerConfig, pkg); err != nil { + if err := testRunner.RunTest(ctx, stakerConfig, pkg); err != nil { logger.FatalWithPrefix(logPrefix, "Test run failed: %v", err) } @@ -120,7 +110,7 @@ func printStakerConfig(prefix string, sc domain.StakerConfig) { MevBoostDnpName: %s Network: %s ExecutionContainerName: %s - DataBackendName: %s + ExecutionClientShortName: %s Urls: ExecutionURL: %s BrainURL: %s @@ -133,7 +123,7 @@ func printStakerConfig(prefix string, sc domain.StakerConfig) { sc.MevBoostDnpName, sc.Network, sc.ExecutionContainerName, - sc.DataBackendName, + sc.ExecutionClientShortName, sc.Urls.ExecutionURL, sc.Urls.BrainURL, sc.Urls.BeaconchainURL, diff --git a/internal/adapters/apis/snapshots/snapshots_adapter.go b/internal/adapters/apis/snapshots/snapshots_adapter.go new file mode 100644 index 0000000..af6470e --- /dev/null +++ b/internal/adapters/apis/snapshots/snapshots_adapter.go @@ -0,0 +1,136 @@ +package snapshots + +import ( + "context" + "fmt" + "io" + "net/http" + "os/exec" + "strings" + + "clients-test/internal/logger" +) + +const ( + defaultBaseURL = "https://snapshots.ethpandaops.io" + alpineImage = "alpine:latest" +) + +type SnapshotsAdapter struct { + baseURL string + logPrefix string +} + +func NewSnapshotsAdapter() *SnapshotsAdapter { + return &SnapshotsAdapter{ + baseURL: defaultBaseURL, + logPrefix: "SnapshotsAdapter", + } +} + +func NewSnapshotsAdapterWithURL(baseURL string) *SnapshotsAdapter { + return &SnapshotsAdapter{ + baseURL: baseURL, + logPrefix: "SnapshotsAdapter", + } +} + +// GetLatestBlockNumber fetches the latest available block number for a given network and client +func (s *SnapshotsAdapter) GetLatestBlockNumber(ctx context.Context, network, client string) (string, error) { + url := fmt.Sprintf("%s/%s/%s/latest", s.baseURL, network, client) + logger.DebugWithPrefix(s.logPrefix, "Fetching latest block number from %s", url) + + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return "", fmt.Errorf("failed to create request: %w", err) + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return "", fmt.Errorf("failed to fetch latest block number: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("failed to fetch latest block number: status %s", resp.Status) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", fmt.Errorf("failed to read response body: %w", err) + } + + blockNumber := strings.TrimSpace(string(body)) + logger.DebugWithPrefix(s.logPrefix, "Latest block number for %s/%s: %s", network, client, blockNumber) + return blockNumber, nil +} + +// DownloadAndExtract downloads the snapshot for the given network/client and extracts it to targetPath +// It uses a Docker container with alpine to handle the download and extraction +func (s *SnapshotsAdapter) DownloadAndExtract(ctx context.Context, network, client, targetPath string) error { + logger.InfoWithPrefix(s.logPrefix, "Starting snapshot download for %s/%s to %s", network, client, targetPath) + + // Get the latest block number first + blockNumber, err := s.GetLatestBlockNumber(ctx, network, client) + if err != nil { + return fmt.Errorf("failed to get latest block number: %w", err) + } + + snapshotURL := fmt.Sprintf("%s/%s/%s/%s/snapshot.tar.zst", s.baseURL, network, client, blockNumber) + logger.InfoWithPrefix(s.logPrefix, "Downloading snapshot from %s", snapshotURL) + + // Build the shell command to run inside the alpine container + // This installs necessary tools, downloads the snapshot, and extracts it + shellScript := fmt.Sprintf(` +set -e +apk add --no-cache wget curl tar zstd +echo "Downloading snapshot for block number: %s" +wget --tries=0 --retry-connrefused -O - %s | tar -I zstd -xvf - -C /data +echo "Snapshot extraction complete" +`, blockNumber, snapshotURL) + + // Run docker command + // docker run --rm -v :/data alpine /bin/sh -c '