diff --git a/example_config.yaml b/example_config.yaml index 51b21ff..03b4efd 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -51,6 +51,7 @@ processors: database: "default" table: "canonical_execution_transaction_structlog" # debug: false # Enable debug logging for ClickHouse queries + # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2) # Channel-based batching configuration for memory-efficient processing # bigTransactionThreshold: 500000 # Transactions with more structlogs are considered "big" (default: 500000) # chunkSize: 10000 # Number of structlogs per batch (default: 10000) @@ -69,6 +70,7 @@ processors: database: "default" table: "execution_transaction" # debug: false # Enable debug logging for ClickHouse queries + # maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2) # Application settings shutdownTimeout: 6m diff --git a/pkg/clickhouse/client.go b/pkg/clickhouse/client.go index d36e78c..798a9b9 100644 --- a/pkg/clickhouse/client.go +++ b/pkg/clickhouse/client.go @@ -437,6 +437,48 @@ func (c *Client) QueryMinMaxUInt64(ctx context.Context, query string) (minVal, m return minVal, maxVal, nil } +// QueryUInt64Slice executes a query and returns all UInt64 values from the specified column. +// Returns an empty slice if no rows are found. +func (c *Client) QueryUInt64Slice(ctx context.Context, query string, columnName string) ([]uint64, error) { + start := time.Now() + operation := "query_uint64_slice" + status := statusSuccess + + defer func() { + c.recordMetrics(operation, status, time.Since(start), query) + }() + + result := make([]uint64, 0) + col := new(proto.ColUInt64) + + err := c.doWithRetry(ctx, operation, func(attemptCtx context.Context) error { + col.Reset() + + result = result[:0] + + return c.pool.Do(attemptCtx, ch.Query{ + Body: query, + Result: proto.Results{ + {Name: columnName, Data: col}, + }, + OnResult: func(ctx context.Context, block proto.Block) error { + for i := 0; i < col.Rows(); i++ { + result = append(result, col.Row(i)) + } + + return nil + }, + }) + }) + if err != nil { + status = statusFailed + + return nil, fmt.Errorf("query failed: %w", err) + } + + return result, nil +} + // Execute runs a query without expecting results. func (c *Client) Execute(ctx context.Context, query string) error { start := time.Now() diff --git a/pkg/clickhouse/interface.go b/pkg/clickhouse/interface.go index ff506d6..b93cc2a 100644 --- a/pkg/clickhouse/interface.go +++ b/pkg/clickhouse/interface.go @@ -28,4 +28,7 @@ type ClientInterface interface { // The query must return columns named "min" and "max". // Returns nil for both values if no rows are found. QueryMinMaxUInt64(ctx context.Context, query string) (minVal, maxVal *uint64, err error) + // QueryUInt64Slice executes a query and returns all UInt64 values from the specified column. + // Returns an empty slice if no rows are found. + QueryUInt64Slice(ctx context.Context, query string, columnName string) ([]uint64, error) } diff --git a/pkg/ethereum/errors.go b/pkg/ethereum/errors.go new file mode 100644 index 0000000..c76d8b5 --- /dev/null +++ b/pkg/ethereum/errors.go @@ -0,0 +1,18 @@ +package ethereum + +import "errors" + +// Sentinel errors for Ethereum client operations. +var ( + // ErrNoHealthyNode indicates no healthy execution node is available. + ErrNoHealthyNode = errors.New("no healthy execution node available") + + // ErrBlockNotFound indicates a block was not found on the execution client. 
+ ErrBlockNotFound = errors.New("block not found") + + // ErrTransactionNotFound indicates a transaction was not found. + ErrTransactionNotFound = errors.New("transaction not found") + + // ErrUnsupportedChainID indicates an unsupported chain ID was provided. + ErrUnsupportedChainID = errors.New("unsupported chain ID") +) diff --git a/pkg/processor/common/interfaces.go b/pkg/processor/common/interfaces.go deleted file mode 100644 index 575f867..0000000 --- a/pkg/processor/common/interfaces.go +++ /dev/null @@ -1,41 +0,0 @@ -package common - -import ( - "context" - - "github.com/hibiken/asynq" -) - -// Processor defines the main interface for block processors. -type Processor interface { - Start(ctx context.Context) error - Stop(ctx context.Context) error - Name() string -} - -// BlockProcessor handles block discovery and processing. -type BlockProcessor interface { - Processor - ProcessNextBlock(ctx context.Context) error - - // Queue management - GetQueues() []QueueInfo - GetHandlers() map[string]asynq.HandlerFunc - - // Task enqueueing (for internal use) - EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error - - // Processing mode configuration - SetProcessingMode(mode string) -} - -// QueueInfo contains information about a processor queue. -type QueueInfo struct { - Name string - Priority int -} - -const ( - BACKWARDS_MODE = "backwards" - FORWARDS_MODE = "forwards" -) diff --git a/pkg/processor/config.go b/pkg/processor/config.go index d64fe57..47be50e 100644 --- a/pkg/processor/config.go +++ b/pkg/processor/config.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" "github.com/ethpandaops/execution-processor/pkg/processor/transaction/simple" "github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog" ) @@ -55,28 +55,28 @@ type WorkerConfig struct { func (c *Config) Validate() error { if c.Interval == 0 { - c.Interval = 10 * time.Second + c.Interval = DefaultInterval } if c.Mode == "" { - c.Mode = common.FORWARDS_MODE + c.Mode = tracker.FORWARDS_MODE } - if c.Mode != common.FORWARDS_MODE && c.Mode != common.BACKWARDS_MODE { - return fmt.Errorf("invalid mode %s, must be '%s' or '%s'", c.Mode, common.FORWARDS_MODE, common.BACKWARDS_MODE) + if c.Mode != tracker.FORWARDS_MODE && c.Mode != tracker.BACKWARDS_MODE { + return fmt.Errorf("invalid mode %s, must be '%s' or '%s'", c.Mode, tracker.FORWARDS_MODE, tracker.BACKWARDS_MODE) } if c.Concurrency == 0 { - c.Concurrency = 20 + c.Concurrency = DefaultConcurrency } // Queue control defaults if c.MaxProcessQueueSize == 0 { - c.MaxProcessQueueSize = 1000 + c.MaxProcessQueueSize = DefaultMaxProcessQueue } if c.BackpressureHysteresis == 0 { - c.BackpressureHysteresis = 0.8 + c.BackpressureHysteresis = DefaultBackpressureHysteresis } // Set leader election defaults @@ -86,11 +86,11 @@ func (c *Config) Validate() error { } if c.LeaderElection.TTL == 0 { - c.LeaderElection.TTL = 10 * time.Second + c.LeaderElection.TTL = DefaultLeaderTTL } if c.LeaderElection.RenewalInterval == 0 { - c.LeaderElection.RenewalInterval = 3 * time.Second + c.LeaderElection.RenewalInterval = DefaultLeaderRenewalInterval } // Validate leader election settings diff --git a/pkg/processor/defaults.go b/pkg/processor/defaults.go new file mode 100644 index 0000000..bd9e5dc --- /dev/null +++ b/pkg/processor/defaults.go @@ -0,0 +1,27 @@ +package processor + +import "time" + +// Default configuration values for 
the processor manager.
+// Processor-specific defaults are in the tracker package.
+const (
+	// DefaultInterval is the default interval between processing cycles.
+	DefaultInterval = 10 * time.Second
+
+	// DefaultConcurrency is the default number of concurrent workers for task processing.
+	DefaultConcurrency = 20
+
+	// DefaultMaxProcessQueue is the default max queue size for asynq.
+	DefaultMaxProcessQueue = 1000
+
+	// DefaultBackpressureHysteresis is the default hysteresis factor for backpressure.
+	// When backpressure is triggered, it won't be released until queue size drops
+	// to this fraction of the threshold.
+	DefaultBackpressureHysteresis = 0.8
+
+	// DefaultLeaderTTL is the default TTL for leader election locks.
+	DefaultLeaderTTL = 10 * time.Second
+
+	// DefaultLeaderRenewalInterval is the default renewal interval for leader election.
+	DefaultLeaderRenewalInterval = 3 * time.Second
+)
diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go
index cf5e2ac..01b771e 100644
--- a/pkg/processor/manager.go
+++ b/pkg/processor/manager.go
@@ -1,3 +1,26 @@
+// Package processor coordinates block processing across multiple processor types.
+//
+// # Architecture Overview
+//
+// The Manager is the central coordinator that:
+// - Discovers new blocks from execution nodes
+// - Dispatches processing tasks to registered processors
+// - Manages distributed task queues via Redis/Asynq
+// - Implements leader election for multi-instance deployments
+// - Provides backpressure control via queue monitoring
+//
+// # Processing Flow
+//
+// 1. Manager.Start() initializes processors and begins the processing loop
+// 2. processBlocks() is called on each interval (default 10s)
+// 3. Each processor's ProcessNextBlock() discovers and enqueues tasks
+// 4. Asynq workers (potentially distributed) process the tasks
+// 5. Completion tracking marks blocks done when all tasks finish
+//
+// # Leader Election
+//
+// When multiple instances run, only the leader performs block discovery.
+// All instances can process tasks as Asynq workers.
 package processor

 import (
@@ -11,7 +34,7 @@ import (
 	"github.com/ethpandaops/execution-processor/pkg/common"
 	"github.com/ethpandaops/execution-processor/pkg/ethereum"
 	"github.com/ethpandaops/execution-processor/pkg/leaderelection"
-	c "github.com/ethpandaops/execution-processor/pkg/processor/common"
+	"github.com/ethpandaops/execution-processor/pkg/processor/tracker"
 	transaction_simple "github.com/ethpandaops/execution-processor/pkg/processor/transaction/simple"
 	transaction_structlog "github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog"
 	s "github.com/ethpandaops/execution-processor/pkg/state"
@@ -20,13 +43,33 @@ import (
 	"github.com/sirupsen/logrus"
 )

+// retryDelayFunc returns an exponential backoff delay capped at 10 minutes.
+// Delays: 5s, 10s, 20s, 40s, 80s, 160s, 320s, 600s (max).
+func retryDelayFunc(n int, _ error, _ *asynq.Task) time.Duration {
+	delay := time.Duration(5<<n) * time.Second
+
+	const maxDelay = 10 * time.Minute
+
+	if delay > maxDelay {
+		return maxDelay
+	}
+
+	return delay
+}
+
 // Manager coordinates multiple processors with distributed task processing.
+// +// It manages the complete lifecycle: +// - Processor initialization and startup +// - Block discovery loop (when leader) +// - Asynq server for task processing (always) +// - Queue monitoring and backpressure +// - Graceful shutdown with goroutine cleanup type Manager struct { log logrus.FieldLogger config *Config pool *ethereum.Pool state *s.Manager - processors map[string]c.BlockProcessor + processors map[string]tracker.BlockProcessor // Redis/Asynq for distributed processing redisClient *r.Client @@ -80,10 +123,11 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta queues := make(map[string]int) asynqServer = asynq.NewServer(asynqRedisOpt, asynq.Config{ - Concurrency: config.Concurrency, - Queues: queues, - LogLevel: asynq.InfoLevel, - Logger: log, + Concurrency: config.Concurrency, + Queues: queues, + LogLevel: asynq.InfoLevel, + Logger: log, + RetryDelayFunc: retryDelayFunc, }) return &Manager{ @@ -91,7 +135,7 @@ func NewManager(log logrus.FieldLogger, config *Config, pool *ethereum.Pool, sta config: config, pool: pool, state: state, - processors: make(map[string]c.BlockProcessor), + processors: make(map[string]tracker.BlockProcessor), redisClient: redis, redisPrefix: redisPrefix, asynqClient: asynqClient, @@ -171,14 +215,26 @@ func (m *Manager) Start(ctx context.Context) error { } // Monitor leadership changes - go m.monitorLeadership(ctx) + m.wg.Add(1) + + go func() { + defer m.wg.Done() + + m.monitorLeadership(ctx) + }() m.log.Debug("Leader election started, monitoring for leadership changes") } else { // If leader election is disabled, always act as leader m.isLeader = true - go m.runBlockProcessing(ctx) + m.wg.Add(1) + + go func() { + defer m.wg.Done() + + m.runBlockProcessing(ctx) + }() m.log.Info("Leader election disabled - running as standalone processor") } @@ -302,6 +358,10 @@ func (m *Manager) Stop(ctx context.Context) error { } } + // Wait for all goroutines to complete + m.wg.Wait() + m.log.Info("All goroutines stopped") + return nil } @@ -317,6 +377,7 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { Pool: m.pool, State: m.state, AsynqClient: m.asynqClient, + RedisClient: m.redisClient, Network: m.network, RedisPrefix: m.redisPrefix, }, &m.config.TransactionStructlog) @@ -347,6 +408,7 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { Pool: m.pool, State: m.state, AsynqClient: m.asynqClient, + RedisClient: m.redisClient, Network: m.network, RedisPrefix: m.redisPrefix, }, &m.config.TransactionSimple) @@ -375,7 +437,7 @@ func (m *Manager) initializeProcessors(ctx context.Context) error { // startProcessorWithRetry starts a processor with infinite retry and capped exponential backoff. // This ensures processors can wait for their dependencies (like ClickHouse) to become available. 
-func (m *Manager) startProcessorWithRetry(ctx context.Context, processor c.BlockProcessor, name string) error { +func (m *Manager) startProcessorWithRetry(ctx context.Context, processor tracker.BlockProcessor, name string) error { const ( baseDelay = 100 * time.Millisecond maxDelay = 10 * time.Second @@ -564,7 +626,11 @@ func (m *Manager) handleLeadershipGain(ctx context.Context) { // Start block processing loop m.log.Debug("Starting runBlockProcessing goroutine") + m.wg.Add(1) + go func() { + defer m.wg.Done() + m.log.Debug("runBlockProcessing goroutine started") m.runBlockProcessing(ctx) }() @@ -718,9 +784,9 @@ func (m *Manager) monitorQueues(ctx context.Context) { shouldMonitor := false switch m.config.Mode { - case c.FORWARDS_MODE: + case tracker.FORWARDS_MODE: shouldMonitor = strings.Contains(queue.Name, "forwards") - case c.BACKWARDS_MODE: + case tracker.BACKWARDS_MODE: shouldMonitor = strings.Contains(queue.Name, "backwards") } @@ -795,7 +861,10 @@ func (m *Manager) startQueueMonitoring(ctx context.Context) { m.monitorCancel = cancel m.isMonitoring = true + m.wg.Add(1) + go func() { + defer m.wg.Done() defer func() { // Recovery from panics to prevent goroutine leaks if recovered := recover(); recovered != nil { @@ -837,9 +906,9 @@ func (m *Manager) updateAsynqQueues() error { shouldInclude := false switch m.config.Mode { - case c.FORWARDS_MODE: + case tracker.FORWARDS_MODE: shouldInclude = strings.Contains(queue.Name, "forwards") - case c.BACKWARDS_MODE: + case tracker.BACKWARDS_MODE: shouldInclude = strings.Contains(queue.Name, "backwards") } @@ -874,10 +943,11 @@ func (m *Manager) updateAsynqQueues() error { } m.asynqServer = asynq.NewServer(asynqRedisOpt, asynq.Config{ - Concurrency: m.config.Concurrency, - Queues: queues, - LogLevel: asynq.InfoLevel, - Logger: m.log, + Concurrency: m.config.Concurrency, + Queues: queues, + LogLevel: asynq.InfoLevel, + Logger: m.log, + RetryDelayFunc: retryDelayFunc, }) return nil @@ -898,6 +968,13 @@ func isWaitingForBlockError(err error) bool { // shouldSkipBlockProcessing checks if block processing should be skipped due to queue backpressure. 
func (m *Manager) shouldSkipBlockProcessing(ctx context.Context) (bool, string) { + // Check for context cancellation early + select { + case <-ctx.Done(): + return true, "context cancelled" + default: + } + // Get Redis options for Asynq Inspector redisOpt := m.redisClient.Options() asynqRedisOpt := asynq.RedisClientOpt{ @@ -918,15 +995,21 @@ func (m *Manager) shouldSkipBlockProcessing(ctx context.Context) (bool, string) shouldSkip := false for name := range m.processors { + // Check for context cancellation between processors + select { + case <-ctx.Done(): + return true, "context cancelled" + default: + } // Check all queues based on mode var queuesToCheck []string - if m.config.Mode == c.FORWARDS_MODE { + if m.config.Mode == tracker.FORWARDS_MODE { queuesToCheck = []string{ - c.PrefixedProcessForwardsQueue(name, m.redisPrefix), + tracker.PrefixedProcessForwardsQueue(name, m.redisPrefix), } } else { queuesToCheck = []string{ - c.PrefixedProcessBackwardsQueue(name, m.redisPrefix), + tracker.PrefixedProcessBackwardsQueue(name, m.redisPrefix), } } @@ -988,11 +1071,11 @@ func (m *Manager) shouldSkipBlockProcessing(ctx context.Context) (bool, string) func (m *Manager) GetQueueName() string { // For now we only have one processor processorName := "transaction-structlog" - if m.config.Mode == c.BACKWARDS_MODE { - return c.PrefixedProcessBackwardsQueue(processorName, m.redisPrefix) + if m.config.Mode == tracker.BACKWARDS_MODE { + return tracker.PrefixedProcessBackwardsQueue(processorName, m.redisPrefix) } - return c.PrefixedProcessForwardsQueue(processorName, m.redisPrefix) + return tracker.PrefixedProcessForwardsQueue(processorName, m.redisPrefix) } // QueueBlockManually allows manual queuing of a specific block for processing. @@ -1067,12 +1150,12 @@ func (m *Manager) enqueueSimpleBlockTask(ctx context.Context, p *transaction_sim var err error - if m.config.Mode == c.BACKWARDS_MODE { + if m.config.Mode == tracker.BACKWARDS_MODE { task, err = transaction_simple.NewProcessBackwardsTask(payload) - queue = c.PrefixedProcessBackwardsQueue(transaction_simple.ProcessorName, m.redisPrefix) + queue = tracker.PrefixedProcessBackwardsQueue(transaction_simple.ProcessorName, m.redisPrefix) } else { task, err = transaction_simple.NewProcessForwardsTask(payload) - queue = c.PrefixedProcessForwardsQueue(transaction_simple.ProcessorName, m.redisPrefix) + queue = tracker.PrefixedProcessForwardsQueue(transaction_simple.ProcessorName, m.redisPrefix) } if err != nil { diff --git a/pkg/processor/tracker/completion.go b/pkg/processor/tracker/completion.go new file mode 100644 index 0000000..598ccbb --- /dev/null +++ b/pkg/processor/tracker/completion.go @@ -0,0 +1,68 @@ +package tracker + +import ( + "context" + "errors" + "strings" + + "github.com/sirupsen/logrus" + + "github.com/ethpandaops/execution-processor/pkg/ethereum" +) + +// IsBlockNotFoundError checks if an error indicates a block was not found. +// Uses errors.Is for sentinel errors, with fallback to string matching for wrapped errors. 
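+//
+// A minimal usage sketch (hypothetical caller; node and num stand in for an
+// execution client and a block number from the surrounding code):
+//
+//	block, err := node.BlockByNumber(ctx, num)
+//	if tracker.IsBlockNotFoundError(err) {
+//		// The node does not have this block yet; wait and retry rather than fail.
+//	}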
+func IsBlockNotFoundError(err error) bool { + if err == nil { + return false + } + + // Check for sentinel error first + if errors.Is(err, ethereum.ErrBlockNotFound) { + return true + } + + // Fallback to string matching for errors from external clients + errStr := strings.ToLower(err.Error()) + + return strings.Contains(errStr, "not found") || + strings.Contains(errStr, "unknown block") || + strings.Contains(errStr, "block not found") || + strings.Contains(errStr, "header not found") +} + +// TrackBlockCompletion decrements the pending task count and marks the block complete when all tasks finish. +func (l *Limiter) TrackBlockCompletion(ctx context.Context, blockNumber uint64, mode string) { + remaining, err := l.pendingTracker.DecrementPending(ctx, blockNumber, l.network, l.processor, mode) + if err != nil { + l.log.WithError(err).WithFields(logrus.Fields{ + "block_number": blockNumber, + "mode": mode, + }).Warn("Failed to decrement pending count") + + return + } + + // If all tasks are complete, mark the block as complete + if remaining <= 0 { + if err := l.stateProvider.MarkBlockComplete(ctx, blockNumber, l.network, l.processor); err != nil { + l.log.WithError(err).WithFields(logrus.Fields{ + "block_number": blockNumber, + }).Error("Failed to mark block complete") + + return + } + + // Cleanup Redis tracking key + if err := l.pendingTracker.CleanupBlock(ctx, blockNumber, l.network, l.processor, mode); err != nil { + l.log.WithError(err).WithFields(logrus.Fields{ + "block_number": blockNumber, + }).Warn("Failed to cleanup block tracking") + } + + l.log.WithFields(logrus.Fields{ + "block_number": blockNumber, + "mode": mode, + }).Debug("Block marked complete - all tasks finished") + } +} diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go new file mode 100644 index 0000000..82c3f3b --- /dev/null +++ b/pkg/processor/tracker/limiter.go @@ -0,0 +1,118 @@ +package tracker + +import ( + "context" + + "github.com/ethpandaops/execution-processor/pkg/common" + "github.com/sirupsen/logrus" +) + +// StateProvider defines the state manager methods needed by Limiter. +type StateProvider interface { + GetOldestIncompleteBlock(ctx context.Context, network, processor string, minBlockNumber uint64) (*uint64, error) + GetNewestIncompleteBlock(ctx context.Context, network, processor string, maxBlockNumber uint64) (*uint64, error) + MarkBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) error +} + +// LimiterConfig holds configuration for the Limiter. +type LimiterConfig struct { + MaxPendingBlockRange int +} + +// LimiterDeps holds dependencies for the Limiter. +type LimiterDeps struct { + Log logrus.FieldLogger + StateProvider StateProvider + PendingTracker *PendingTracker + Network string + Processor string +} + +// Limiter provides shared blocking and completion functionality for processors. +type Limiter struct { + log logrus.FieldLogger + stateProvider StateProvider + pendingTracker *PendingTracker + config LimiterConfig + network string + processor string +} + +// NewLimiter creates a new Limiter. +func NewLimiter(deps *LimiterDeps, config LimiterConfig) *Limiter { + return &Limiter{ + log: deps.Log, + stateProvider: deps.StateProvider, + pendingTracker: deps.PendingTracker, + config: config, + network: deps.Network, + processor: deps.Processor, + } +} + +// IsBlockedByIncompleteBlocks checks if processing should be blocked based on distance +// from the oldest/newest incomplete block (depending on processing mode). 
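+// For example, with maxPendingBlockRange=2, a forwards worker about to process
+// block 105 is blocked while block 103 is still incomplete (distance 2 >= 2),
+// but proceeds if only block 104 is incomplete (distance 1).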
+// Returns true if blocked, false if processing can proceed.
+func (l *Limiter) IsBlockedByIncompleteBlocks(ctx context.Context, nextBlock uint64, mode string) (bool, error) {
+	// A non-positive MaxPendingBlockRange disables the distance check entirely;
+	// this guard also makes the uint64 conversion below safe.
+	if l.config.MaxPendingBlockRange <= 0 {
+		return false, nil
+	}
+
+	maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // guarded above
+
+	if mode == BACKWARDS_MODE {
+		// Backwards mode: check distance from newest incomplete block
+		// Search range: nextBlock to nextBlock + maxPendingBlockRange
+		searchMaxBlock := nextBlock + maxPendingBlockRange
+
+		newestIncomplete, err := l.stateProvider.GetNewestIncompleteBlock(
+			ctx, l.network, l.processor, searchMaxBlock,
+		)
+		if err != nil {
+			return false, err
+		}
+
+		if newestIncomplete != nil && (*newestIncomplete-nextBlock) >= maxPendingBlockRange {
+			l.log.WithFields(logrus.Fields{
+				"next_block":              nextBlock,
+				"newest_incomplete":       *newestIncomplete,
+				"distance":                *newestIncomplete - nextBlock,
+				"max_pending_block_range": maxPendingBlockRange,
+			}).Debug("Max pending block range reached (backwards), waiting for tasks to complete")
+
+			common.BlockProcessingSkipped.WithLabelValues(l.network, l.processor, "max_pending_block_range").Inc()
+
+			return true, nil
+		}
+	} else {
+		// Forwards mode: check distance from oldest incomplete block
+		// Search range: nextBlock - maxPendingBlockRange to nextBlock
+		var searchMinBlock uint64
+		if nextBlock > maxPendingBlockRange {
+			searchMinBlock = nextBlock - maxPendingBlockRange
+		}
+
+		oldestIncomplete, err := l.stateProvider.GetOldestIncompleteBlock(
+			ctx, l.network, l.processor, searchMinBlock,
+		)
+		if err != nil {
+			return false, err
+		}
+
+		if oldestIncomplete != nil && (nextBlock-*oldestIncomplete) >= maxPendingBlockRange {
+			l.log.WithFields(logrus.Fields{
+				"next_block":              nextBlock,
+				"oldest_incomplete":       *oldestIncomplete,
+				"distance":                nextBlock - *oldestIncomplete,
+				"max_pending_block_range": maxPendingBlockRange,
+			}).Debug("Max pending block range reached, waiting for tasks to complete")
+
+			common.BlockProcessingSkipped.WithLabelValues(l.network, l.processor, "max_pending_block_range").Inc()
+
+			return true, nil
+		}
+	}
+
+	return false, nil
+}
diff --git a/pkg/processor/tracker/limiter_test.go b/pkg/processor/tracker/limiter_test.go
new file mode 100644
index 0000000..299e5b7
--- /dev/null
+++ b/pkg/processor/tracker/limiter_test.go
@@ -0,0 +1,361 @@
+package tracker
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/ethpandaops/execution-processor/pkg/ethereum"
+)
+
+// uint64Ptr is a helper to create a uint64 pointer.
+func uint64Ptr(v uint64) *uint64 {
+	return &v
+}
+
+// TestDistanceBasedBlockingLogic_ForwardsMode tests the distance calculation logic
+// for forwards processing mode without requiring full processor instantiation.
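+//
+// The predicate exercised here mirrors the forwards branch of
+// Limiter.IsBlockedByIncompleteBlocks:
+//
+//	blocked = oldestIncomplete != nil && (nextBlock - *oldestIncomplete) >= maxPendingBlockRange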
+func TestDistanceBasedBlockingLogic_ForwardsMode(t *testing.T) { + tests := []struct { + name string + nextBlock uint64 + oldestIncomplete *uint64 + maxPendingBlockRange uint64 + expectBlocked bool + }{ + { + name: "no incomplete blocks - should proceed", + nextBlock: 105, + oldestIncomplete: nil, + maxPendingBlockRange: 2, + expectBlocked: false, + }, + { + name: "distance 1, max 2 - should proceed", + nextBlock: 105, + oldestIncomplete: uint64Ptr(104), + maxPendingBlockRange: 2, + expectBlocked: false, + }, + { + name: "distance equals max - should block", + nextBlock: 105, + oldestIncomplete: uint64Ptr(103), + maxPendingBlockRange: 2, + expectBlocked: true, + }, + { + name: "distance exceeds max - should block", + nextBlock: 105, + oldestIncomplete: uint64Ptr(100), + maxPendingBlockRange: 2, + expectBlocked: true, + }, + { + name: "large maxPendingBlockRange - should proceed", + nextBlock: 105, + oldestIncomplete: uint64Ptr(100), + maxPendingBlockRange: 10, + expectBlocked: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Simulate the forwards mode blocking logic + blocked := tt.oldestIncomplete != nil && (tt.nextBlock-*tt.oldestIncomplete) >= tt.maxPendingBlockRange + + assert.Equal(t, tt.expectBlocked, blocked) + }) + } +} + +// TestDistanceBasedBlockingLogic_BackwardsMode tests the distance calculation logic +// for backwards processing mode without requiring full processor instantiation. +func TestDistanceBasedBlockingLogic_BackwardsMode(t *testing.T) { + tests := []struct { + name string + nextBlock uint64 + newestIncomplete *uint64 + maxPendingBlockRange uint64 + expectBlocked bool + }{ + { + name: "no incomplete blocks - should proceed", + nextBlock: 100, + newestIncomplete: nil, + maxPendingBlockRange: 2, + expectBlocked: false, + }, + { + name: "distance 1, max 2 - should proceed", + nextBlock: 100, + newestIncomplete: uint64Ptr(101), + maxPendingBlockRange: 2, + expectBlocked: false, + }, + { + name: "distance equals max - should block", + nextBlock: 100, + newestIncomplete: uint64Ptr(102), + maxPendingBlockRange: 2, + expectBlocked: true, + }, + { + name: "distance exceeds max - should block", + nextBlock: 100, + newestIncomplete: uint64Ptr(105), + maxPendingBlockRange: 2, + expectBlocked: true, + }, + { + name: "large maxPendingBlockRange - should proceed", + nextBlock: 100, + newestIncomplete: uint64Ptr(105), + maxPendingBlockRange: 10, + expectBlocked: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Simulate the backwards mode blocking logic + blocked := tt.newestIncomplete != nil && (*tt.newestIncomplete-tt.nextBlock) >= tt.maxPendingBlockRange + + assert.Equal(t, tt.expectBlocked, blocked) + }) + } +} + +// TestSearchRangeCalculation tests that the search range is calculated correctly +// to optimize startup performance. 
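+//
+// Ranges under test (mirroring Limiter.IsBlockedByIncompleteBlocks):
+//
+//	forwards:  searchMin = nextBlock - maxPendingBlockRange (floored at 0)
+//	backwards: searchMax = nextBlock + maxPendingBlockRange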
+func TestSearchRangeCalculation(t *testing.T) { + tests := []struct { + name string + nextBlock uint64 + maxPendingBlockRange uint64 + mode string + expectSearchMin uint64 + expectSearchMax uint64 + }{ + { + name: "forwards mode - normal case", + nextBlock: 500, + maxPendingBlockRange: 2, + mode: FORWARDS_MODE, + expectSearchMin: 498, // 500 - 2 + expectSearchMax: 0, // not used in forwards + }, + { + name: "forwards mode - near zero", + nextBlock: 1, + maxPendingBlockRange: 2, + mode: FORWARDS_MODE, + expectSearchMin: 0, // max(0, 1-2) = 0 + expectSearchMax: 0, + }, + { + name: "forwards mode - exactly at maxPendingBlockRange", + nextBlock: 2, + maxPendingBlockRange: 2, + mode: FORWARDS_MODE, + expectSearchMin: 0, // 2 - 2 = 0 + expectSearchMax: 0, + }, + { + name: "backwards mode - normal case", + nextBlock: 100, + maxPendingBlockRange: 2, + mode: BACKWARDS_MODE, + expectSearchMin: 0, // not used in backwards + expectSearchMax: 102, // 100 + 2 + }, + { + name: "backwards mode - large maxPendingBlockRange", + nextBlock: 100, + maxPendingBlockRange: 10, + mode: BACKWARDS_MODE, + expectSearchMin: 0, + expectSearchMax: 110, // 100 + 10 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if tt.mode == FORWARDS_MODE { + // Calculate search min for forwards mode + var searchMin uint64 + if tt.nextBlock > tt.maxPendingBlockRange { + searchMin = tt.nextBlock - tt.maxPendingBlockRange + } + + assert.Equal(t, tt.expectSearchMin, searchMin) + } else { + // Calculate search max for backwards mode + searchMax := tt.nextBlock + tt.maxPendingBlockRange + assert.Equal(t, tt.expectSearchMax, searchMax) + } + }) + } +} + +// TestTwoModesInteraction tests scenarios where forwards and backfill might interact. +// They should be naturally isolated by their search ranges. 
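+//
+// For example, a backfill worker with block 249 still incomplete does not block
+// a forwards worker at block 500: 249 falls outside the forwards search range
+// [498, 500], so the incomplete block is never returned by the query.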
+func TestTwoModesInteraction(t *testing.T) { + tests := []struct { + name string + forwardNext uint64 + backfillIncomplete uint64 + maxPendingBlockRange uint64 + expectBlocked bool + description string + }{ + { + name: "backfill at 249, forward at 500 - forward proceeds", + forwardNext: 500, + backfillIncomplete: 249, + maxPendingBlockRange: 2, + expectBlocked: false, + description: "249 is outside search range [498, 500]", + }, + { + name: "backfill at 249 incomplete, forward at 251 - blocked", + forwardNext: 251, + backfillIncomplete: 249, + maxPendingBlockRange: 2, + expectBlocked: true, + description: "251 - 249 = 2 >= maxPendingBlockRange", + }, + { + name: "backfill at 249 incomplete, forward at 252 - not blocked", + forwardNext: 252, + backfillIncomplete: 249, + maxPendingBlockRange: 2, + expectBlocked: false, + description: "249 is outside search range [250, 252]", + }, + { + name: "backfill right at search boundary - blocked", + forwardNext: 251, + backfillIncomplete: 249, // searchMin = 251 - 2 = 249 + maxPendingBlockRange: 2, + expectBlocked: true, + description: "249 is exactly at searchMin, distance = 2", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Calculate search range for forwards mode + var searchMin uint64 + if tt.forwardNext > tt.maxPendingBlockRange { + searchMin = tt.forwardNext - tt.maxPendingBlockRange + } + + // Check if the backfill incomplete block would be found by the query + // (simulates the database query with >= minBlockNumber filter) + var foundIncomplete *uint64 + if tt.backfillIncomplete >= searchMin { + foundIncomplete = uint64Ptr(tt.backfillIncomplete) + } + + // Apply the blocking logic + blocked := foundIncomplete != nil && (tt.forwardNext-*foundIncomplete) >= tt.maxPendingBlockRange + + assert.Equal(t, tt.expectBlocked, blocked, tt.description) + }) + } +} + +// TestZeroMaxPendingBlockRange verifies that zero maxPendingBlockRange disables blocking. +func TestZeroMaxPendingBlockRange(t *testing.T) { + // When maxPendingBlockRange is 0, the check should be skipped entirely + maxPendingBlockRange := 0 + + // The implementation checks if maxPendingBlockRange <= 0 and returns false immediately + assert.LessOrEqual(t, maxPendingBlockRange, 0, "maxPendingBlockRange should be <= 0 for this test") + + // With maxPendingBlockRange = 0, we should never block + // This is verified by the early return in IsBlockedByIncompleteBlocks +} + +// TestNegativeMaxPendingBlockRange verifies that negative maxPendingBlockRange is handled. +func TestNegativeMaxPendingBlockRange(t *testing.T) { + // When maxPendingBlockRange is negative, the check should be skipped entirely + maxPendingBlockRange := -1 + + // The implementation checks if maxPendingBlockRange <= 0 and returns false immediately + assert.Less(t, maxPendingBlockRange, 0, "maxPendingBlockRange should be negative for this test") +} + +// TestIsBlockNotFoundError tests the error detection function. 
+func TestIsBlockNotFoundError(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + { + name: "nil error", + err: nil, + expected: false, + }, + { + name: "sentinel ethereum.ErrBlockNotFound", + err: ethereum.ErrBlockNotFound, + expected: true, + }, + { + name: "wrapped sentinel error", + err: errors.Join(errors.New("context"), ethereum.ErrBlockNotFound), + expected: true, + }, + { + name: "not found error", + err: &testError{msg: "block not found"}, + expected: true, + }, + { + name: "header not found error", + err: &testError{msg: "header not found for block 12345"}, + expected: true, + }, + { + name: "unknown block error", + err: &testError{msg: "unknown block requested"}, + expected: true, + }, + { + name: "generic not found", + err: &testError{msg: "resource not found"}, + expected: true, + }, + { + name: "unrelated error", + err: &testError{msg: "connection refused"}, + expected: false, + }, + { + name: "case insensitive - NOT FOUND", + err: &testError{msg: "BLOCK NOT FOUND"}, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := IsBlockNotFoundError(tt.err) + assert.Equal(t, tt.expected, result) + }) + } +} + +// testError is a simple error implementation for testing. +type testError struct { + msg string +} + +func (e *testError) Error() string { + return e.msg +} diff --git a/pkg/processor/tracker/pending.go b/pkg/processor/tracker/pending.go new file mode 100644 index 0000000..446cfa6 --- /dev/null +++ b/pkg/processor/tracker/pending.go @@ -0,0 +1,122 @@ +package tracker + +import ( + "context" + "fmt" + "strconv" + + "github.com/redis/go-redis/v9" + "github.com/sirupsen/logrus" +) + +// PendingTracker tracks pending tasks per block using Redis. +// Used for two-phase completion tracking to know when all tasks for a block are complete. +type PendingTracker struct { + redis *redis.Client + prefix string + log logrus.FieldLogger +} + +// NewPendingTracker creates a new PendingTracker. +func NewPendingTracker(redisClient *redis.Client, prefix string, log logrus.FieldLogger) *PendingTracker { + return &PendingTracker{ + redis: redisClient, + prefix: prefix, + log: log.WithField("component", "pending_tracker"), + } +} + +// blockKey returns the Redis key for tracking a block's pending task count. +// Key pattern: {prefix}:block:{network}:{processor}:{mode}:{block_number}. +func (t *PendingTracker) blockKey(blockNumber uint64, network, processor, mode string) string { + if t.prefix == "" { + return fmt.Sprintf("block:%s:%s:%s:%d", network, processor, mode, blockNumber) + } + + return fmt.Sprintf("%s:block:%s:%s:%s:%d", t.prefix, network, processor, mode, blockNumber) +} + +// InitBlock initializes tracking for a block with the given task count. +// This should be called after enqueueing all tasks for a block. +func (t *PendingTracker) InitBlock(ctx context.Context, blockNumber uint64, taskCount int, network, processor, mode string) error { + key := t.blockKey(blockNumber, network, processor, mode) + + err := t.redis.Set(ctx, key, taskCount, 0).Err() + if err != nil { + return fmt.Errorf("failed to init block tracking: %w", err) + } + + t.log.WithFields(logrus.Fields{ + "block_number": blockNumber, + "task_count": taskCount, + "network": network, + "processor": processor, + "mode": mode, + "key": key, + }).Debug("Initialized block tracking") + + return nil +} + +// DecrementPending decrements the pending task count for a block. +// Returns the remaining count after decrement. 
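+//
+// The decrement is a single Redis DECRBY, so it stays atomic when many
+// distributed workers finish tasks concurrently. A usage sketch (the literal
+// arguments are illustrative):
+//
+//	remaining, err := t.DecrementPending(ctx, 100, "mainnet", "my-processor", "forwards")
+//	if err == nil && remaining <= 0 {
+//		// Last task for block 100 finished; the block can be marked complete.
+//	}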
+func (t *PendingTracker) DecrementPending(ctx context.Context, blockNumber uint64, network, processor, mode string) (int64, error) { + key := t.blockKey(blockNumber, network, processor, mode) + + remaining, err := t.redis.DecrBy(ctx, key, 1).Result() + if err != nil { + return 0, fmt.Errorf("failed to decrement pending count: %w", err) + } + + t.log.WithFields(logrus.Fields{ + "block_number": blockNumber, + "remaining": remaining, + "network": network, + "processor": processor, + "mode": mode, + }).Debug("Decremented pending task count") + + return remaining, nil +} + +// GetPendingCount returns the current pending task count for a block. +// Returns 0 if the key doesn't exist (block not being tracked or already cleaned up). +func (t *PendingTracker) GetPendingCount(ctx context.Context, blockNumber uint64, network, processor, mode string) (int64, error) { + key := t.blockKey(blockNumber, network, processor, mode) + + val, err := t.redis.Get(ctx, key).Result() + if err == redis.Nil { + return 0, nil + } + + if err != nil { + return 0, fmt.Errorf("failed to get pending count: %w", err) + } + + count, err := strconv.ParseInt(val, 10, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse pending count: %w", err) + } + + return count, nil +} + +// CleanupBlock removes the tracking key for a block. +// Should be called after a block is marked complete. +func (t *PendingTracker) CleanupBlock(ctx context.Context, blockNumber uint64, network, processor, mode string) error { + key := t.blockKey(blockNumber, network, processor, mode) + + err := t.redis.Del(ctx, key).Err() + if err != nil { + return fmt.Errorf("failed to cleanup block tracking: %w", err) + } + + t.log.WithFields(logrus.Fields{ + "block_number": blockNumber, + "network": network, + "processor": processor, + "mode": mode, + }).Debug("Cleaned up block tracking") + + return nil +} diff --git a/pkg/processor/tracker/pending_test.go b/pkg/processor/tracker/pending_test.go new file mode 100644 index 0000000..4c81399 --- /dev/null +++ b/pkg/processor/tracker/pending_test.go @@ -0,0 +1,228 @@ +package tracker + +import ( + "context" + "testing" + + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupTestRedis(t *testing.T) (*redis.Client, func()) { + t.Helper() + + mr, err := miniredis.Run() + require.NoError(t, err) + + client := redis.NewClient(&redis.Options{ + Addr: mr.Addr(), + }) + + return client, func() { + client.Close() + mr.Close() + } +} + +func TestPendingTracker_InitBlock(t *testing.T) { + ctx := context.Background() + + redisClient, cleanup := setupTestRedis(t) + defer cleanup() + + log := logrus.New() + tracker := NewPendingTracker(redisClient, "test", log) + + err := tracker.InitBlock(ctx, 100, 5, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + + // Verify the value was set + count, err := tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(5), count) +} + +func TestPendingTracker_DecrementPending(t *testing.T) { + ctx := context.Background() + + redisClient, cleanup := setupTestRedis(t) + defer cleanup() + + log := logrus.New() + tracker := NewPendingTracker(redisClient, "test", log) + + // Initialize with 3 tasks + err := tracker.InitBlock(ctx, 100, 3, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + + // Decrement once + remaining, err := 
tracker.DecrementPending(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(2), remaining) + + // Decrement again + remaining, err = tracker.DecrementPending(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(1), remaining) + + // Decrement to zero + remaining, err = tracker.DecrementPending(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(0), remaining) +} + +func TestPendingTracker_DecrementToZero(t *testing.T) { + ctx := context.Background() + + redisClient, cleanup := setupTestRedis(t) + defer cleanup() + + log := logrus.New() + tracker := NewPendingTracker(redisClient, "test", log) + + // Initialize with 1 task + err := tracker.InitBlock(ctx, 100, 1, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + + // Decrement to zero + remaining, err := tracker.DecrementPending(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(0), remaining) +} + +func TestPendingTracker_GetPendingCount(t *testing.T) { + ctx := context.Background() + + redisClient, cleanup := setupTestRedis(t) + defer cleanup() + + log := logrus.New() + tracker := NewPendingTracker(redisClient, "test", log) + + // Key doesn't exist - should return 0 + count, err := tracker.GetPendingCount(ctx, 999, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(0), count) + + // Initialize and check + err = tracker.InitBlock(ctx, 100, 10, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + + count, err = tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(10), count) +} + +func TestPendingTracker_CleanupBlock(t *testing.T) { + ctx := context.Background() + + redisClient, cleanup := setupTestRedis(t) + defer cleanup() + + log := logrus.New() + tracker := NewPendingTracker(redisClient, "test", log) + + // Initialize + err := tracker.InitBlock(ctx, 100, 5, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + + // Verify it exists + count, err := tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(5), count) + + // Cleanup + err = tracker.CleanupBlock(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + + // Verify it's gone + count, err = tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(0), count) +} + +func TestPendingTracker_KeyPattern(t *testing.T) { + log := logrus.New() + + tests := []struct { + name string + prefix string + blockNumber uint64 + network string + processor string + mode string + expectedKey string + }{ + { + name: "with prefix", + prefix: "myapp", + blockNumber: 12345, + network: "mainnet", + processor: "test_processor", + mode: "forwards", + expectedKey: "myapp:block:mainnet:test_processor:forwards:12345", + }, + { + name: "without prefix", + prefix: "", + blockNumber: 12345, + network: "mainnet", + processor: "test_processor", + mode: "backwards", + expectedKey: "block:mainnet:test_processor:backwards:12345", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tracker := NewPendingTracker(nil, tt.prefix, log) + key := tracker.blockKey(tt.blockNumber, tt.network, tt.processor, tt.mode) + assert.Equal(t, tt.expectedKey, key) + }) + } +} + +func 
TestPendingTracker_MultipleBlocks(t *testing.T) { + ctx := context.Background() + + redisClient, cleanup := setupTestRedis(t) + defer cleanup() + + log := logrus.New() + tracker := NewPendingTracker(redisClient, "test", log) + + // Initialize multiple blocks + require.NoError(t, tracker.InitBlock(ctx, 100, 5, "mainnet", "test_processor", "forwards")) + require.NoError(t, tracker.InitBlock(ctx, 101, 3, "mainnet", "test_processor", "forwards")) + require.NoError(t, tracker.InitBlock(ctx, 102, 7, "mainnet", "test_processor", "forwards")) + + // Verify all blocks have correct counts + count, err := tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(5), count) + + count, err = tracker.GetPendingCount(ctx, 101, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(3), count) + + count, err = tracker.GetPendingCount(ctx, 102, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(7), count) + + // Decrement one block + remaining, err := tracker.DecrementPending(ctx, 101, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(2), remaining) + + // Other blocks unchanged + count, err = tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(5), count) + + count, err = tracker.GetPendingCount(ctx, 102, "mainnet", "test_processor", "forwards") + require.NoError(t, err) + assert.Equal(t, int64(7), count) +} diff --git a/pkg/processor/tracker/processor.go b/pkg/processor/tracker/processor.go new file mode 100644 index 0000000..6e86737 --- /dev/null +++ b/pkg/processor/tracker/processor.go @@ -0,0 +1,137 @@ +// Package tracker provides block processing coordination and tracking. +// +// # Processing Pipeline +// +// The execution-processor uses a multi-stage pipeline for block processing: +// +// ┌─────────────────┐ +// │ Block Source │ Execution node provides blocks +// └────────┬────────┘ +// │ +// ▼ +// ┌─────────────────┐ +// │ ProcessNextBlock│ Manager calls processor to discover next block +// └────────┬────────┘ +// │ +// ▼ +// ┌─────────────────┐ +// │ Task Creation │ Processor creates tasks for each transaction +// └────────┬────────┘ +// │ +// ▼ +// ┌─────────────────┐ +// │ Redis/Asynq │ Tasks enqueued to distributed queue +// └────────┬────────┘ +// │ +// ▼ +// ┌─────────────────┐ +// │ Worker Handler │ Asynq workers process tasks (may be distributed) +// └────────┬────────┘ +// │ +// ▼ +// ┌─────────────────┐ +// │ ClickHouse │ Data inserted using columnar protocol +// └────────┬────────┘ +// │ +// ▼ +// ┌─────────────────┐ +// │Block Completion │ Pending tracker marks block complete when all tasks finish +// └─────────────────┘ +// +// # Processing Modes +// +// - FORWARDS_MODE: Process blocks from current head forward (real-time) +// - BACKWARDS_MODE: Process blocks from start point backward (backfill) +// +// # Backpressure Control +// +// The pipeline uses MaxPendingBlockRange to control backpressure: +// - Limits concurrent incomplete blocks +// - Prevents memory exhaustion during slow ClickHouse inserts +// - Uses Redis to track pending tasks per block +package tracker + +import ( + "context" + "time" + + "github.com/hibiken/asynq" +) + +// Default configuration values for processors. +// These provide a single source of truth for default configuration. 
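+//
+// These correspond to the per-processor YAML keys shown in example_config.yaml,
+// e.g. (values here are the documented defaults, assumed to stay in sync with
+// that file):
+//
+//	# maxPendingBlockRange: 2
+//	# chunkSize: 10000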
+const ( + // DefaultMaxPendingBlockRange is the maximum distance between the oldest + // incomplete block and the current block before blocking new block processing. + DefaultMaxPendingBlockRange = 2 + + // DefaultChunkSize is the default number of rows per ClickHouse insert batch. + DefaultChunkSize = 10000 + + // DefaultProgressLogThreshold is the default threshold for logging progress + // on large transactions (structlog processor). + DefaultProgressLogThreshold = 100000 + + // DefaultClickHouseTimeout is the default timeout for ClickHouse operations. + DefaultClickHouseTimeout = 30 * time.Second + + // DefaultTraceTimeout is the default timeout for trace fetching operations. + DefaultTraceTimeout = 30 * time.Second +) + +// Processor defines the base interface for all processors. +// Processors are responsible for transforming blockchain data +// into a format suitable for storage and analysis. +type Processor interface { + // Start initializes the processor and its dependencies (e.g., ClickHouse). + Start(ctx context.Context) error + + // Stop gracefully shuts down the processor. + Stop(ctx context.Context) error + + // Name returns the unique identifier for this processor. + Name() string +} + +// BlockProcessor extends Processor with block-level processing capabilities. +// It coordinates the full pipeline from block discovery through task completion. +// +// Pipeline stages managed by BlockProcessor: +// 1. Block Discovery: ProcessNextBlock identifies the next block to process +// 2. Task Creation: Creates distributed tasks for each unit of work +// 3. Queue Management: Manages Asynq queues for forwards/backwards processing +// 4. Task Handling: Worker handlers process individual tasks +// 5. Completion Tracking: Marks blocks complete when all tasks finish +type BlockProcessor interface { + Processor + + // ProcessNextBlock discovers and enqueues tasks for the next block. + // Returns an error if no block is available or processing fails. + ProcessNextBlock(ctx context.Context) error + + // GetQueues returns the Asynq queues used by this processor. + // Queues have priorities to control processing order. + GetQueues() []QueueInfo + + // GetHandlers returns task handlers for Asynq worker registration. + // These handlers process the distributed tasks. + GetHandlers() map[string]asynq.HandlerFunc + + // EnqueueTask adds a task to the distributed queue. + // Uses infinite retries to ensure eventual processing. + EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error + + // SetProcessingMode configures forwards or backwards processing. + SetProcessingMode(mode string) +} + +// QueueInfo contains information about a processor queue. 
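+//
+// Name is typically built via PrefixedProcessForwardsQueue or
+// PrefixedProcessBackwardsQueue (see queues.go). Priority is passed to Asynq,
+// where a higher value gives the queue a larger share of worker capacity.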
+type QueueInfo struct { + Name string + Priority int +} + +const ( + BACKWARDS_MODE = "backwards" + FORWARDS_MODE = "forwards" +) diff --git a/pkg/processor/common/queues.go b/pkg/processor/tracker/queues.go similarity index 98% rename from pkg/processor/common/queues.go rename to pkg/processor/tracker/queues.go index a12ea74..933bcf6 100644 --- a/pkg/processor/common/queues.go +++ b/pkg/processor/tracker/queues.go @@ -1,4 +1,4 @@ -package common +package tracker import "fmt" diff --git a/pkg/processor/transaction/simple/block_processing.go b/pkg/processor/transaction/simple/block_processing.go index fbce92f..ac6dc35 100644 --- a/pkg/processor/transaction/simple/block_processing.go +++ b/pkg/processor/transaction/simple/block_processing.go @@ -5,13 +5,12 @@ import ( "errors" "fmt" "math/big" - "strings" "github.com/hibiken/asynq" "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/common" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" "github.com/ethpandaops/execution-processor/pkg/state" ) @@ -50,6 +49,15 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { return nil } + // Distance-based pending block range check + // Only allow processing if distance between oldest incomplete and next block < maxPendingBlockRange + blocked, err := p.IsBlockedByIncompleteBlocks(ctx, nextBlock.Uint64(), p.processingMode) + if err != nil { + p.log.WithError(err).Warn("Failed to check incomplete blocks distance, proceeding anyway") + } else if blocked { + return nil + } + p.log.WithFields(logrus.Fields{ "block_number": nextBlock.String(), "network": p.network.Name, @@ -77,7 +85,7 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { // Get block data block, err := node.BlockByNumber(ctx, nextBlock) if err != nil { - if isBlockNotFoundError(err) { + if tracker.IsBlockNotFoundError(err) { p.log.WithFields(logrus.Fields{ "block_number": nextBlock.String(), "network": p.network.Name, @@ -89,11 +97,11 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { return fmt.Errorf("failed to get block %d: %w", nextBlock.Uint64(), err) } - // Handle empty blocks + // Handle empty blocks - mark complete immediately (no task tracking needed) if len(block.Transactions()) == 0 { - p.log.WithField("block", nextBlock.Uint64()).Debug("Empty block, marking as processed") + p.log.WithField("block", nextBlock.Uint64()).Debug("Empty block, marking as complete") - return p.stateManager.MarkBlockProcessed(ctx, nextBlock.Uint64(), p.network.Name, p.Name()) + return p.stateManager.MarkBlockComplete(ctx, nextBlock.Uint64(), p.network.Name, p.Name()) } // Enqueue block processing task @@ -107,7 +115,7 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { var queue string - if p.processingMode == c.BACKWARDS_MODE { + if p.processingMode == tracker.BACKWARDS_MODE { task, err = NewProcessBackwardsTask(payload) queue = p.getProcessBackwardsQueue() } else { @@ -125,25 +133,30 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { common.TasksEnqueued.WithLabelValues(p.network.Name, ProcessorName, queue, task.Type()).Inc() - p.log.WithFields(logrus.Fields{ - "block_number": nextBlock.Uint64(), - "tx_count": len(block.Transactions()), - }).Info("Enqueued block for processing") + // Initialize block tracking in Redis (1 task per block for simple processor) + if err := p.pendingTracker.InitBlock(ctx, nextBlock.Uint64(), 1, p.network.Name, p.Name(), 
p.processingMode); err != nil { + p.log.WithError(err).WithFields(logrus.Fields{ + "network": p.network.Name, + "block_number": nextBlock, + }).Error("could not init block tracking in Redis") - // Mark block as processed (task is enqueued) - return p.stateManager.MarkBlockProcessed(ctx, nextBlock.Uint64(), p.network.Name, p.Name()) -} + return err + } -// isBlockNotFoundError checks if an error indicates a block was not found. -func isBlockNotFoundError(err error) bool { - if err == nil { - return false + // Mark block as enqueued (phase 1 of two-phase completion) + if err := p.stateManager.MarkBlockEnqueued(ctx, nextBlock.Uint64(), 1, p.network.Name, p.Name()); err != nil { + p.log.WithError(err).WithFields(logrus.Fields{ + "network": p.network.Name, + "block_number": nextBlock, + }).Error("could not mark block as enqueued") + + return err } - errStr := strings.ToLower(err.Error()) + p.log.WithFields(logrus.Fields{ + "block_number": nextBlock.Uint64(), + "tx_count": len(block.Transactions()), + }).Info("Enqueued block for processing") - return strings.Contains(errStr, "not found") || - strings.Contains(errStr, "unknown block") || - strings.Contains(errStr, "block not found") || - strings.Contains(errStr, "header not found") + return nil } diff --git a/pkg/processor/transaction/simple/config.go b/pkg/processor/transaction/simple/config.go index cdba978..6896310 100644 --- a/pkg/processor/transaction/simple/config.go +++ b/pkg/processor/transaction/simple/config.go @@ -11,6 +11,9 @@ type Config struct { clickhouse.Config `yaml:",inline"` Enabled bool `yaml:"enabled"` Table string `yaml:"table"` + + // Block completion tracking + MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2 } // Validate validates the configuration. diff --git a/pkg/processor/transaction/simple/handlers.go b/pkg/processor/transaction/simple/handlers.go index 18d4ef3..f33eef6 100644 --- a/pkg/processor/transaction/simple/handlers.go +++ b/pkg/processor/transaction/simple/handlers.go @@ -12,7 +12,7 @@ import ( "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/common" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" ) // ClickHouseDateTime is a time.Time wrapper that formats correctly for ClickHouse JSON. 
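+//
+// Per the tests in handlers_test.go, values marshal in UTC with the layout
+// "2006-01-02 15:04:05" (layout inferred from the expected outputs), e.g.
+// 2024-01-15T10:30:45Z becomes "2024-01-15 10:30:45".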
@@ -71,7 +71,7 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err common.TaskProcessingDuration.WithLabelValues( p.network.Name, ProcessorName, - c.ProcessForwardsQueue(ProcessorName), + tracker.ProcessForwardsQueue(ProcessorName), task.Type(), ).Observe(duration.Seconds()) }() @@ -81,7 +81,7 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err common.TasksErrored.WithLabelValues( p.network.Name, ProcessorName, - c.ProcessForwardsQueue(ProcessorName), + tracker.ProcessForwardsQueue(ProcessorName), task.Type(), "unmarshal_error", ).Inc() @@ -149,7 +149,7 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err common.TasksErrored.WithLabelValues( p.network.Name, ProcessorName, - c.ProcessForwardsQueue(ProcessorName), + tracker.ProcessForwardsQueue(ProcessorName), task.Type(), "insert_error", ).Inc() @@ -161,11 +161,14 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err common.TasksProcessed.WithLabelValues( p.network.Name, ProcessorName, - c.ProcessForwardsQueue(ProcessorName), + tracker.ProcessForwardsQueue(ProcessorName), task.Type(), "success", ).Inc() + // Track block completion using embedded Blocker + p.TrackBlockCompletion(ctx, blockNumber.Uint64(), payload.ProcessingMode) + p.log.WithFields(logrus.Fields{ "block_number": blockNumber.Uint64(), "tx_count": len(transactions), @@ -336,6 +339,10 @@ func (p *Processor) insertTransactions(ctx context.Context, transactions []Trans return nil } + // Add timeout for ClickHouse operation + insertCtx, cancel := context.WithTimeout(ctx, tracker.DefaultClickHouseTimeout) + defer cancel() + cols := NewColumns() for _, tx := range transactions { cols.Append(tx) @@ -343,7 +350,7 @@ func (p *Processor) insertTransactions(ctx context.Context, transactions []Trans input := cols.Input() - if err := p.clickhouse.Do(ctx, ch.Query{ + if err := p.clickhouse.Do(insertCtx, ch.Query{ Body: input.Into(p.config.Table), Input: input, }); err != nil { diff --git a/pkg/processor/transaction/simple/handlers_test.go b/pkg/processor/transaction/simple/handlers_test.go new file mode 100644 index 0000000..cb0b3fb --- /dev/null +++ b/pkg/processor/transaction/simple/handlers_test.go @@ -0,0 +1,278 @@ +package simple_test + +import ( + "math/big" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + transaction_simple "github.com/ethpandaops/execution-processor/pkg/processor/transaction/simple" +) + +func TestClickHouseDateTime_MarshalJSON(t *testing.T) { + tests := []struct { + name string + time time.Time + expected string + }{ + { + name: "UTC time", + time: time.Date(2024, 1, 15, 10, 30, 45, 0, time.UTC), + expected: `"2024-01-15 10:30:45"`, + }, + { + name: "epoch time", + time: time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC), + expected: `"1970-01-01 00:00:00"`, + }, + { + name: "local time converted to UTC", + time: time.Date(2024, 6, 15, 12, 0, 0, 0, time.FixedZone("PST", -8*3600)), + expected: `"2024-06-15 20:00:00"`, // +8 hours to UTC + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dt := transaction_simple.ClickHouseDateTime(tc.time) + result, err := dt.MarshalJSON() + + assert.NoError(t, err) + assert.Equal(t, tc.expected, string(result)) + }) + } +} + +func TestCalculateEffectiveGasPrice_Logic(t *testing.T) { + // Test the effective gas price calculation logic + // This tests the algorithm without requiring actual block/tx objects + tests := []struct { + name string + txType uint8 + txGasPrice 
*big.Int + txGasTipCap *big.Int + txGasFeeCap *big.Int + blockBaseFee *big.Int + expectedGasPrice *big.Int + }{ + { + name: "legacy transaction", + txType: 0, // LegacyTxType + txGasPrice: big.NewInt(20000000000), + expectedGasPrice: big.NewInt(20000000000), + }, + { + name: "access list transaction", + txType: 1, // AccessListTxType + txGasPrice: big.NewInt(30000000000), + expectedGasPrice: big.NewInt(30000000000), + }, + { + name: "EIP-1559 transaction - base fee + tip < max fee", + txType: 2, // DynamicFeeTxType + txGasTipCap: big.NewInt(2000000000), + txGasFeeCap: big.NewInt(100000000000), + blockBaseFee: big.NewInt(10000000000), + expectedGasPrice: big.NewInt(12000000000), // baseFee + tipCap + }, + { + name: "EIP-1559 transaction - base fee + tip > max fee (capped)", + txType: 2, + txGasTipCap: big.NewInt(50000000000), + txGasFeeCap: big.NewInt(30000000000), + blockBaseFee: big.NewInt(10000000000), + expectedGasPrice: big.NewInt(30000000000), // capped at feeCap + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + var effectiveGasPrice *big.Int + + // Simulate the calculateEffectiveGasPrice logic + switch tc.txType { + case 0, 1: // Legacy and AccessList + effectiveGasPrice = tc.txGasPrice + default: // EIP-1559 and later + if tc.blockBaseFee == nil { + effectiveGasPrice = tc.txGasPrice + } else { + effectiveGasPrice = new(big.Int).Add(tc.blockBaseFee, tc.txGasTipCap) + if effectiveGasPrice.Cmp(tc.txGasFeeCap) > 0 { + effectiveGasPrice = tc.txGasFeeCap + } + } + } + + assert.Equal(t, tc.expectedGasPrice.Int64(), effectiveGasPrice.Int64()) + }) + } +} + +func TestInputByteStats_Logic(t *testing.T) { + // Test the input byte statistics calculation logic + tests := []struct { + name string + inputData []byte + expectedSize uint32 + expectedZero uint32 + expectedNonzero uint32 + }{ + { + name: "empty input", + inputData: []byte{}, + expectedSize: 0, + expectedZero: 0, + expectedNonzero: 0, + }, + { + name: "all zeros", + inputData: []byte{0, 0, 0, 0}, + expectedSize: 4, + expectedZero: 4, + expectedNonzero: 0, + }, + { + name: "all nonzero", + inputData: []byte{1, 2, 3, 4}, + expectedSize: 4, + expectedZero: 0, + expectedNonzero: 4, + }, + { + name: "mixed", + inputData: []byte{0, 1, 0, 2, 0, 0, 3}, + expectedSize: 7, + expectedZero: 4, + expectedNonzero: 3, + }, + { + name: "typical function selector", + inputData: []byte{0xa9, 0x05, 0x9c, 0xbb, 0, 0, 0, 0}, + expectedSize: 8, + expectedZero: 4, + expectedNonzero: 4, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + callDataSize := uint32(len(tc.inputData)) + + var nZero, nNonzero uint32 + + for _, b := range tc.inputData { + if b == 0 { + nZero++ + } else { + nNonzero++ + } + } + + assert.Equal(t, tc.expectedSize, callDataSize) + assert.Equal(t, tc.expectedZero, nZero) + assert.Equal(t, tc.expectedNonzero, nNonzero) + assert.Equal(t, callDataSize, nZero+nNonzero) + }) + } +} + +func TestTransactionStruct(t *testing.T) { + // Test that Transaction struct fields have correct types and can be populated + now := time.Now() + toAddr := "0x1234567890abcdef1234567890abcdef12345678" + tipCap := "1000000000" + feeCap := "2000000000" + blobGas := uint64(131072) + blobGasFeeCap := "1000000" + + tx := transaction_simple.Transaction{ + UpdatedDateTime: transaction_simple.ClickHouseDateTime(now), + BlockNumber: 12345, + BlockHash: "0xabcdef123456", + ParentHash: "0xfedcba654321", + Position: 0, + Hash: "0xtxhash123", + From: "0xfrom1234", + To: &toAddr, + Nonce: 42, + GasPrice: "1500000000", + 
Gas: 21000, + GasTipCap: &tipCap, + GasFeeCap: &feeCap, + Value: "1000000000000000000", + Type: 2, + Size: 250, + CallDataSize: 100, + BlobGas: &blobGas, + BlobGasFeeCap: &blobGasFeeCap, + BlobHashes: []string{"0xblob1", "0xblob2"}, + Success: true, + NInputBytes: 100, + NInputZeroBytes: 40, + NInputNonzeroBytes: 60, + MetaNetworkName: "mainnet", + } + + assert.Equal(t, uint64(12345), tx.BlockNumber) + assert.NotNil(t, tx.To) + assert.Equal(t, toAddr, *tx.To) + assert.Equal(t, uint8(2), tx.Type) + assert.Len(t, tx.BlobHashes, 2) + assert.True(t, tx.Success) + assert.Equal(t, tx.NInputBytes, tx.NInputZeroBytes+tx.NInputNonzeroBytes) +} + +func TestTransactionStruct_ContractCreation(t *testing.T) { + // Test transaction with nil To address (contract creation) + tx := transaction_simple.Transaction{ + UpdatedDateTime: transaction_simple.ClickHouseDateTime(time.Now()), + BlockNumber: 12345, + BlockHash: "0xabcdef123456", + ParentHash: "0xfedcba654321", + Position: 0, + Hash: "0xtxhash123", + From: "0xfrom1234", + To: nil, // Contract creation + Nonce: 0, + GasPrice: "1000000000", + Gas: 100000, + Value: "0", + Type: 0, + Size: 500, + CallDataSize: 400, + BlobHashes: []string{}, + Success: true, + NInputBytes: 400, + NInputZeroBytes: 200, + NInputNonzeroBytes: 200, + MetaNetworkName: "mainnet", + } + + assert.Nil(t, tx.To) + assert.Len(t, tx.BlobHashes, 0) +} + +func TestBlobTransactionFields(t *testing.T) { + // Test blob transaction specific fields + blobGas := uint64(131072) + blobGasFeeCap := "1000000000" + + tx := transaction_simple.Transaction{ + UpdatedDateTime: transaction_simple.ClickHouseDateTime(time.Now()), + BlockNumber: 12345, + Type: 3, // BlobTxType + BlobGas: &blobGas, + BlobGasFeeCap: &blobGasFeeCap, + BlobHashes: []string{"0xblob1", "0xblob2", "0xblob3", "0xblob4", "0xblob5", "0xblob6"}, + Success: true, + MetaNetworkName: "mainnet", + } + + assert.Equal(t, uint8(3), tx.Type) + assert.NotNil(t, tx.BlobGas) + assert.Equal(t, uint64(131072), *tx.BlobGas) + assert.NotNil(t, tx.BlobGasFeeCap) + assert.Len(t, tx.BlobHashes, 6) +} diff --git a/pkg/processor/transaction/simple/processor.go b/pkg/processor/transaction/simple/processor.go index aa6d9bd..ebd8de5 100644 --- a/pkg/processor/transaction/simple/processor.go +++ b/pkg/processor/transaction/simple/processor.go @@ -3,19 +3,24 @@ package simple import ( "context" "fmt" + "math" "github.com/hibiken/asynq" + "github.com/redis/go-redis/v9" "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/clickhouse" "github.com/ethpandaops/execution-processor/pkg/ethereum" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" "github.com/ethpandaops/execution-processor/pkg/state" ) // ProcessorName is the name of the simple transaction processor. const ProcessorName = "transaction_simple" +// Compile-time interface compliance check. +var _ tracker.BlockProcessor = (*Processor)(nil) + // Dependencies contains the dependencies needed for the processor. 
type Dependencies struct { Log logrus.FieldLogger @@ -23,6 +28,7 @@ type Dependencies struct { Network *ethereum.Network State *state.Manager AsynqClient *asynq.Client + RedisClient *redis.Client RedisPrefix string } @@ -37,6 +43,10 @@ type Processor struct { asynqClient *asynq.Client processingMode string redisPrefix string + pendingTracker *tracker.PendingTracker + + // Embedded limiter for shared blocking/completion logic + *tracker.Limiter } // New creates a new simple transaction processor. @@ -55,16 +65,40 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { return nil, fmt.Errorf("failed to create clickhouse client: %w", err) } + // Set default for MaxPendingBlockRange + if config.MaxPendingBlockRange <= 0 { + config.MaxPendingBlockRange = tracker.DefaultMaxPendingBlockRange + } + + log := deps.Log.WithField("processor", ProcessorName) + pendingTracker := tracker.NewPendingTracker(deps.RedisClient, deps.RedisPrefix, log) + + // Create the limiter for shared functionality + limiter := tracker.NewLimiter( + &tracker.LimiterDeps{ + Log: log, + StateProvider: deps.State, + PendingTracker: pendingTracker, + Network: deps.Network.Name, + Processor: ProcessorName, + }, + tracker.LimiterConfig{ + MaxPendingBlockRange: config.MaxPendingBlockRange, + }, + ) + return &Processor{ - log: deps.Log.WithField("processor", ProcessorName), + log: log, pool: deps.Pool, stateManager: deps.State, clickhouse: clickhouseClient, config: config, network: deps.Network, asynqClient: deps.AsynqClient, - processingMode: c.FORWARDS_MODE, // Default mode + processingMode: tracker.FORWARDS_MODE, // Default mode redisPrefix: deps.RedisPrefix, + pendingTracker: pendingTracker, + Limiter: limiter, }, nil } @@ -101,22 +135,24 @@ func (p *Processor) SetProcessingMode(mode string) { p.log.WithField("mode", mode).Info("Processing mode updated") } -// EnqueueTask enqueues a task to the specified queue. +// EnqueueTask enqueues a task to the specified queue with infinite retries. func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error { + opts = append(opts, asynq.MaxRetry(math.MaxInt32)) + _, err := p.asynqClient.EnqueueContext(ctx, task, opts...) return err } // GetQueues returns the queues used by this processor. -func (p *Processor) GetQueues() []c.QueueInfo { - return []c.QueueInfo{ +func (p *Processor) GetQueues() []tracker.QueueInfo { + return []tracker.QueueInfo{ { - Name: c.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix), + Name: tracker.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix), Priority: 10, }, { - Name: c.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix), + Name: tracker.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix), Priority: 5, }, } @@ -124,10 +160,10 @@ func (p *Processor) GetQueues() []c.QueueInfo { // getProcessForwardsQueue returns the prefixed process forwards queue name. func (p *Processor) getProcessForwardsQueue() string { - return c.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix) + return tracker.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix) } // getProcessBackwardsQueue returns the prefixed process backwards queue name. 
func (p *Processor) getProcessBackwardsQueue() string { - return c.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix) + return tracker.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix) } diff --git a/pkg/processor/transaction/simple/processor_test.go b/pkg/processor/transaction/simple/processor_test.go new file mode 100644 index 0000000..a454c22 --- /dev/null +++ b/pkg/processor/transaction/simple/processor_test.go @@ -0,0 +1,254 @@ +package simple_test + +import ( + "encoding/json" + "math/big" + "testing" + + "github.com/hibiken/asynq" + "github.com/stretchr/testify/assert" + + "github.com/ethpandaops/execution-processor/pkg/clickhouse" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" + transaction_simple "github.com/ethpandaops/execution-processor/pkg/processor/transaction/simple" +) + +func TestProcessor_ConfigValidation(t *testing.T) { + testCases := []struct { + name string + config transaction_simple.Config + expectError bool + }{ + { + name: "valid config", + config: transaction_simple.Config{ + Enabled: true, + Table: "test_table", + Config: clickhouse.Config{ + Addr: "localhost:9000", + }, + }, + expectError: false, + }, + { + name: "disabled config", + config: transaction_simple.Config{ + Enabled: false, + }, + expectError: false, + }, + { + name: "missing addr", + config: transaction_simple.Config{ + Enabled: true, + Table: "test_table", + }, + expectError: true, + }, + { + name: "missing table", + config: transaction_simple.Config{ + Enabled: true, + Config: clickhouse.Config{ + Addr: "localhost:9000", + }, + }, + expectError: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.config.Validate() + if tc.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestProcessor_ConcurrentConfigValidation(t *testing.T) { + const numGoroutines = 10 + + results := make(chan error, numGoroutines) + + for i := 0; i < numGoroutines; i++ { + go func() { + cfg := transaction_simple.Config{ + Enabled: true, + Table: "test_concurrent", + Config: clickhouse.Config{ + Addr: "localhost:9000", + }, + } + results <- cfg.Validate() + }() + } + + for i := 0; i < numGoroutines; i++ { + err := <-results + assert.NoError(t, err) + } +} + +func TestProcessPayload(t *testing.T) { + payload := &transaction_simple.ProcessPayload{ + BlockNumber: *big.NewInt(12345), + NetworkName: "mainnet", + ProcessingMode: tracker.FORWARDS_MODE, + } + + // Test JSON marshaling + data, err := json.Marshal(payload) + assert.NoError(t, err) + + var unmarshaled transaction_simple.ProcessPayload + + err = json.Unmarshal(data, &unmarshaled) + assert.NoError(t, err) + + assert.Equal(t, payload.NetworkName, unmarshaled.NetworkName) + assert.Equal(t, payload.ProcessingMode, unmarshaled.ProcessingMode) + assert.Equal(t, int64(12345), unmarshaled.BlockNumber.Int64()) + + // Test binary marshaling + binData, err := payload.MarshalBinary() + assert.NoError(t, err) + + var binUnmarshaled transaction_simple.ProcessPayload + + err = binUnmarshaled.UnmarshalBinary(binData) + assert.NoError(t, err) + + assert.Equal(t, payload.NetworkName, binUnmarshaled.NetworkName) + assert.Equal(t, int64(12345), binUnmarshaled.BlockNumber.Int64()) +} + +func TestNewProcessForwardsTask(t *testing.T) { + payload := &transaction_simple.ProcessPayload{ + BlockNumber: *big.NewInt(12345), + NetworkName: "mainnet", + } + + task, err := transaction_simple.NewProcessForwardsTask(payload) + assert.NoError(t, err) + assert.NotNil(t, task) 
+ assert.Equal(t, transaction_simple.ProcessForwardsTaskType, task.Type()) + + // Verify payload can be unmarshaled from task + var unmarshaled transaction_simple.ProcessPayload + + err = json.Unmarshal(task.Payload(), &unmarshaled) + assert.NoError(t, err) + + assert.Equal(t, int64(12345), unmarshaled.BlockNumber.Int64()) + assert.Equal(t, tracker.FORWARDS_MODE, unmarshaled.ProcessingMode) +} + +func TestNewProcessBackwardsTask(t *testing.T) { + payload := &transaction_simple.ProcessPayload{ + BlockNumber: *big.NewInt(12345), + NetworkName: "mainnet", + } + + task, err := transaction_simple.NewProcessBackwardsTask(payload) + assert.NoError(t, err) + assert.NotNil(t, task) + assert.Equal(t, transaction_simple.ProcessBackwardsTaskType, task.Type()) + + // Verify payload can be unmarshaled from task + var unmarshaled transaction_simple.ProcessPayload + + err = json.Unmarshal(task.Payload(), &unmarshaled) + assert.NoError(t, err) + + assert.Equal(t, int64(12345), unmarshaled.BlockNumber.Int64()) + assert.Equal(t, tracker.BACKWARDS_MODE, unmarshaled.ProcessingMode) +} + +func TestTaskTypes(t *testing.T) { + assert.NotEmpty(t, transaction_simple.ProcessForwardsTaskType) + assert.NotEmpty(t, transaction_simple.ProcessBackwardsTaskType) + assert.NotEmpty(t, transaction_simple.ProcessorName) + + // Verify unique task types + assert.NotEqual(t, transaction_simple.ProcessForwardsTaskType, transaction_simple.ProcessBackwardsTaskType) +} + +func TestAsynqTaskCreation(t *testing.T) { + payload := map[string]interface{}{ + "block_number": "12345", + "network_name": "mainnet", + "processing_mode": tracker.FORWARDS_MODE, + } + + data, err := json.Marshal(payload) + assert.NoError(t, err) + + task := asynq.NewTask(transaction_simple.ProcessForwardsTaskType, data) + assert.NotNil(t, task) + assert.Equal(t, transaction_simple.ProcessForwardsTaskType, task.Type()) +} + +func TestProcessPayloadEdgeCases(t *testing.T) { + tests := []struct { + name string + payload transaction_simple.ProcessPayload + }{ + { + name: "zero block number", + payload: transaction_simple.ProcessPayload{ + BlockNumber: *big.NewInt(0), + NetworkName: "mainnet", + }, + }, + { + name: "large block number", + payload: transaction_simple.ProcessPayload{ + BlockNumber: *big.NewInt(9223372036854775807), // max int64 + NetworkName: "mainnet", + }, + }, + { + name: "empty network name", + payload: transaction_simple.ProcessPayload{ + BlockNumber: *big.NewInt(12345), + NetworkName: "", + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + // Test marshaling roundtrip + data, err := tc.payload.MarshalBinary() + assert.NoError(t, err) + + var unmarshaled transaction_simple.ProcessPayload + + err = unmarshaled.UnmarshalBinary(data) + assert.NoError(t, err) + + assert.Equal(t, tc.payload.BlockNumber.Int64(), unmarshaled.BlockNumber.Int64()) + assert.Equal(t, tc.payload.NetworkName, unmarshaled.NetworkName) + }) + } +} + +func TestConfigDefaults(t *testing.T) { + cfg := transaction_simple.Config{ + Enabled: true, + Table: "test_table", + Config: clickhouse.Config{ + Addr: "localhost:9000", + }, + } + + err := cfg.Validate() + assert.NoError(t, err) + + // MaxPendingBlockRange can be 0 as the processor will set the default + assert.GreaterOrEqual(t, cfg.MaxPendingBlockRange, 0) +} diff --git a/pkg/processor/transaction/simple/tasks.go b/pkg/processor/transaction/simple/tasks.go index 97c2cfd..12eac0d 100644 --- a/pkg/processor/transaction/simple/tasks.go +++ b/pkg/processor/transaction/simple/tasks.go @@ -6,7 +6,7 @@ 
import ( "github.com/hibiken/asynq" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" ) const ( @@ -37,7 +37,7 @@ func (p *ProcessPayload) UnmarshalBinary(data []byte) error { // NewProcessForwardsTask creates a new forwards process task. func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error) { - payload.ProcessingMode = c.FORWARDS_MODE + payload.ProcessingMode = tracker.FORWARDS_MODE data, err := payload.MarshalBinary() if err != nil { @@ -49,7 +49,7 @@ func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error) { // NewProcessBackwardsTask creates a new backwards process task. func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error) { - payload.ProcessingMode = c.BACKWARDS_MODE + payload.ProcessingMode = tracker.BACKWARDS_MODE data, err := payload.MarshalBinary() if err != nil { diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index e04e1ef..55141a7 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -5,14 +5,13 @@ import ( "errors" "fmt" "math/big" - "strings" "github.com/ethereum/go-ethereum/core/types" "github.com/hibiken/asynq" "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/common" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" "github.com/ethpandaops/execution-processor/pkg/state" ) @@ -55,6 +54,15 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { return nil } + // Distance-based pending block range check + // Only allow processing if distance between oldest incomplete and next block < maxPendingBlockRange + blocked, err := p.IsBlockedByIncompleteBlocks(ctx, nextBlock.Uint64(), p.processingMode) + if err != nil { + p.log.WithError(err).Warn("Failed to check incomplete blocks distance, proceeding anyway") + } else if blocked { + return nil + } + p.log.WithFields(logrus.Fields{ "block_number": nextBlock.String(), "network": p.network.Name, @@ -87,7 +95,7 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { block, err := node.BlockByNumber(ctx, nextBlock) if err != nil { // Check if this is a "not found" error indicating we're at head - if isBlockNotFoundError(err) { + if tracker.IsBlockNotFoundError(err) { // Check if we're close to chain tip to determine if this is expected if latestBlock, latestErr := node.BlockNumber(ctx); latestErr == nil && latestBlock != nil { chainTip := new(big.Int).SetUint64(*latestBlock) @@ -115,37 +123,48 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { return err } - // Handle empty blocks efficiently + // Handle empty blocks - mark complete immediately (no task tracking needed) if len(block.Transactions()) == 0 { p.log.WithFields(logrus.Fields{ "network": p.network.Name, "block_number": nextBlock, }).Debug("skipping empty block") - // Mark the block as processed - if err := p.stateManager.MarkBlockProcessed(ctx, nextBlock.Uint64(), p.network.Name, p.Name()); err != nil { - p.log.WithError(err).WithFields(logrus.Fields{ + // Mark the block as complete immediately (no tasks to track) + if markErr := p.stateManager.MarkBlockComplete(ctx, nextBlock.Uint64(), p.network.Name, p.Name()); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ "network": p.network.Name, 
"block_number": nextBlock, - }).Error("could not mark empty block as processed") + }).Error("could not mark empty block as complete") - return err + return markErr } return nil } // Enqueue tasks for each transaction - if _, err := p.EnqueueTransactionTasks(ctx, block); err != nil { + taskCount, err := p.EnqueueTransactionTasks(ctx, block) + if err != nil { return fmt.Errorf("failed to enqueue transaction tasks: %w", err) } - // Mark the block as processed - if err := p.stateManager.MarkBlockProcessed(ctx, nextBlock.Uint64(), p.network.Name, p.Name()); err != nil { + // Initialize block tracking in Redis + if err := p.pendingTracker.InitBlock(ctx, nextBlock.Uint64(), taskCount, p.network.Name, p.Name(), p.processingMode); err != nil { p.log.WithError(err).WithFields(logrus.Fields{ "network": p.network.Name, "block_number": nextBlock, - }).Error("could not mark block as processed") + }).Error("could not init block tracking in Redis") + + return err + } + + // Mark the block as enqueued (phase 1 of two-phase completion) + if err := p.stateManager.MarkBlockEnqueued(ctx, nextBlock.Uint64(), taskCount, p.network.Name, p.Name()); err != nil { + p.log.WithError(err).WithFields(logrus.Fields{ + "network": p.network.Name, + "block_number": nextBlock, + }).Error("could not mark block as enqueued") return err } @@ -154,25 +173,12 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { "network": p.network.Name, "block_number": nextBlock, "tx_count": len(block.Transactions()), - }).Info("processed block with transactions") + "task_count": taskCount, + }).Info("enqueued block for processing") return nil } -// isBlockNotFoundError checks if an error indicates a block was not found. -func isBlockNotFoundError(err error) bool { - if err == nil { - return false - } - - errStr := strings.ToLower(err.Error()) - - return strings.Contains(errStr, "not found") || - strings.Contains(errStr, "unknown block") || - strings.Contains(errStr, "block not found") || - strings.Contains(errStr, "header not found") -} - // enqueueTransactionTasks enqueues tasks for all transactions in a block. // EnqueueTransactionTasks enqueues transaction processing tasks for a given block. func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block *types.Block) (int, error) { @@ -199,7 +205,7 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block *types.Bl var err error - if p.processingMode == c.BACKWARDS_MODE { + if p.processingMode == tracker.BACKWARDS_MODE { task, err = NewProcessBackwardsTask(payload) queue = p.getProcessBackwardsQueue() taskType = ProcessBackwardsTaskType diff --git a/pkg/processor/transaction/structlog/config.go b/pkg/processor/transaction/structlog/config.go index 57d809f..ed041b4 100644 --- a/pkg/processor/transaction/structlog/config.go +++ b/pkg/processor/transaction/structlog/config.go @@ -15,6 +15,9 @@ type Config struct { // Streaming settings ChunkSize int `yaml:"chunkSize"` // Default: 10,000 rows per OnInput iteration ProgressLogThreshold int `yaml:"progressLogThreshold"` // Default: 100,000 - log progress for large txs + + // Block completion tracking + MaxPendingBlockRange int `yaml:"maxPendingBlockRange"` // Max distance between oldest incomplete and current block. Default: 2 } // Validate validates the configuration. 
diff --git a/pkg/processor/transaction/structlog/handlers.go b/pkg/processor/transaction/structlog/handlers.go index e4cecbf..c919a97 100644 --- a/pkg/processor/transaction/structlog/handlers.go +++ b/pkg/processor/transaction/structlog/handlers.go @@ -9,7 +9,7 @@ import ( "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/common" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" ) // handleProcessForwardsTask handles the forwards processing of a single transaction. @@ -18,12 +18,12 @@ func (p *Processor) handleProcessForwardsTask(ctx context.Context, task *asynq.T defer func() { duration := time.Since(start) - common.TaskProcessingDuration.WithLabelValues(p.network.Name, ProcessorName, c.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType).Observe(duration.Seconds()) + common.TaskProcessingDuration.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType).Observe(duration.Seconds()) }() var payload ProcessPayload if err := payload.UnmarshalBinary(task.Payload()); err != nil { - common.TasksErrored.WithLabelValues(p.network.Name, ProcessorName, c.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType, "unmarshal_error").Inc() + common.TasksErrored.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType, "unmarshal_error").Inc() return fmt.Errorf("failed to unmarshal process payload: %w", err) } @@ -55,13 +55,16 @@ func (p *Processor) handleProcessForwardsTask(ctx context.Context, task *asynq.T // Process transaction using ch-go streaming structlogCount, err := p.ProcessTransaction(ctx, block, int(payload.TransactionIndex), tx) if err != nil { - common.TasksErrored.WithLabelValues(p.network.Name, ProcessorName, c.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType, "processing_error").Inc() + common.TasksErrored.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType, "processing_error").Inc() return fmt.Errorf("failed to process transaction: %w", err) } // Record successful processing - common.TasksProcessed.WithLabelValues(p.network.Name, ProcessorName, c.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType, "success").Inc() + common.TasksProcessed.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType, "success").Inc() + + // Track block completion using embedded Limiter + p.TrackBlockCompletion(ctx, blockNumber.Uint64(), tracker.FORWARDS_MODE) p.log.WithFields(logrus.Fields{ "transaction_hash": payload.TransactionHash, @@ -77,12 +80,12 @@ func (p *Processor) handleProcessBackwardsTask(ctx context.Context, task *asynq.
defer func() { duration := time.Since(start) - common.TaskProcessingDuration.WithLabelValues(p.network.Name, ProcessorName, c.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType).Observe(duration.Seconds()) + common.TaskProcessingDuration.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType).Observe(duration.Seconds()) }() var payload ProcessPayload if err := payload.UnmarshalBinary(task.Payload()); err != nil { - common.TasksErrored.WithLabelValues(p.network.Name, ProcessorName, c.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType, "unmarshal_error").Inc() + common.TasksErrored.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType, "unmarshal_error").Inc() return fmt.Errorf("failed to unmarshal process payload: %w", err) } @@ -114,13 +117,16 @@ func (p *Processor) handleProcessBackwardsTask(ctx context.Context, task *asynq. // Process transaction using ch-go streaming structlogCount, err := p.ProcessTransaction(ctx, block, int(payload.TransactionIndex), tx) if err != nil { - common.TasksErrored.WithLabelValues(p.network.Name, ProcessorName, c.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType, "processing_error").Inc() + common.TasksErrored.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType, "processing_error").Inc() return fmt.Errorf("failed to process transaction: %w", err) } // Record successful processing - common.TasksProcessed.WithLabelValues(p.network.Name, ProcessorName, c.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType, "success").Inc() + common.TasksProcessed.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType, "success").Inc() + + // Track block completion using embedded Limiter + p.TrackBlockCompletion(ctx, blockNumber.Uint64(), tracker.BACKWARDS_MODE) p.log.WithFields(logrus.Fields{ "transaction_hash": payload.TransactionHash, diff --git a/pkg/processor/transaction/structlog/processor.go b/pkg/processor/transaction/structlog/processor.go index 65ba613..5a2b66c 100644 --- a/pkg/processor/transaction/structlog/processor.go +++ b/pkg/processor/transaction/structlog/processor.go @@ -3,15 +3,21 @@ package structlog import ( "context" "fmt" + "math" + + "github.com/hibiken/asynq" + "github.com/redis/go-redis/v9" + "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/clickhouse" "github.com/ethpandaops/execution-processor/pkg/ethereum" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" "github.com/ethpandaops/execution-processor/pkg/state" - "github.com/hibiken/asynq" - "github.com/sirupsen/logrus" ) +// Compile-time interface compliance check. +var _ tracker.BlockProcessor = (*Processor)(nil) + // Dependencies contains the dependencies needed for the processor. type Dependencies struct { Log logrus.FieldLogger @@ -19,6 +25,7 @@ type Dependencies struct { Network *ethereum.Network State *state.Manager AsynqClient *asynq.Client + RedisClient *redis.Client RedisPrefix string } @@ -33,6 +40,10 @@ type Processor struct { asynqClient *asynq.Client processingMode string redisPrefix string + pendingTracker *tracker.PendingTracker + + // Embedded limiter for shared blocking/completion logic + *tracker.Limiter } // New creates a new transaction structlog processor.
@@ -47,21 +58,46 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { return nil, fmt.Errorf("failed to create clickhouse client for transaction_structlog: %w", err) } + // Set default for MaxPendingBlockRange + if config.MaxPendingBlockRange <= 0 { + config.MaxPendingBlockRange = tracker.DefaultMaxPendingBlockRange + } + + log := deps.Log.WithField("processor", ProcessorName) + pendingTracker := tracker.NewPendingTracker(deps.RedisClient, deps.RedisPrefix, log) + + // Create the limiter for shared functionality + limiter := tracker.NewLimiter( + &tracker.LimiterDeps{ + Log: log, + StateProvider: deps.State, + PendingTracker: pendingTracker, + Network: deps.Network.Name, + Processor: ProcessorName, + }, + tracker.LimiterConfig{ + MaxPendingBlockRange: config.MaxPendingBlockRange, + }, + ) + processor := &Processor{ - log: deps.Log.WithField("processor", ProcessorName), + log: log, pool: deps.Pool, stateManager: deps.State, clickhouse: clickhouseClient, config: config, asynqClient: deps.AsynqClient, - processingMode: c.FORWARDS_MODE, // Default mode + processingMode: tracker.FORWARDS_MODE, // Default mode redisPrefix: deps.RedisPrefix, + pendingTracker: pendingTracker, + Limiter: limiter, } processor.network = deps.Network processor.log.WithFields(logrus.Fields{ - "network": processor.network.Name, + "network": processor.network.Name, + "max_pending_block_range": config.MaxPendingBlockRange, }).Info("Detected network") return processor, nil @@ -93,14 +129,14 @@ func (p *Processor) Name() string { } // GetQueues returns the queues used by this processor. -func (p *Processor) GetQueues() []c.QueueInfo { - return []c.QueueInfo{ +func (p *Processor) GetQueues() []tracker.QueueInfo { + return []tracker.QueueInfo{ { - Name: c.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix), + Name: tracker.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix), Priority: 10, // Highest priority for forwards processing }, { - Name: c.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix), + Name: tracker.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix), Priority: 5, // Medium priority for backwards processing }, } @@ -114,8 +150,10 @@ func (p *Processor) GetHandlers() map[string]asynq.HandlerFunc { } } -// EnqueueTask enqueues a task to the specified queue. +// EnqueueTask enqueues a task to the specified queue with infinite retries. func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error { + opts = append(opts, asynq.MaxRetry(math.MaxInt32)) + _, err := p.asynqClient.EnqueueContext(ctx, task, opts...) return err @@ -129,10 +167,10 @@ func (p *Processor) SetProcessingMode(mode string) { // getProcessForwardsQueue returns the prefixed process forwards queue name. func (p *Processor) getProcessForwardsQueue() string { - return c.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix) + return tracker.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix) } // getProcessBackwardsQueue returns the prefixed process backwards queue name. 
func (p *Processor) getProcessBackwardsQueue() string { - return c.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix) + return tracker.PrefixedProcessBackwardsQueue(ProcessorName, p.redisPrefix) } diff --git a/pkg/processor/transaction/structlog/processor_test.go b/pkg/processor/transaction/structlog/processor_test.go index add904d..571039e 100644 --- a/pkg/processor/transaction/structlog/processor_test.go +++ b/pkg/processor/transaction/structlog/processor_test.go @@ -7,11 +7,12 @@ import ( "testing" "time" - "github.com/ethpandaops/execution-processor/pkg/clickhouse" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" - transaction_structlog "github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog" "github.com/hibiken/asynq" "github.com/stretchr/testify/assert" + + "github.com/ethpandaops/execution-processor/pkg/clickhouse" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" + transaction_structlog "github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog" ) func TestProcessor_Creation(t *testing.T) { @@ -431,7 +432,7 @@ func TestProcessPayload(t *testing.T) { TransactionIndex: 5, NetworkName: "mainnet", Network: "mainnet", - ProcessingMode: c.FORWARDS_MODE, + ProcessingMode: tracker.FORWARDS_MODE, } // Test JSON marshaling @@ -503,7 +504,7 @@ func TestNewProcessForwardsTask(t *testing.T) { t.Errorf("expected transaction hash %s, got %s", payload.TransactionHash, unmarshaled.TransactionHash) } - if unmarshaled.ProcessingMode != c.FORWARDS_MODE { + if unmarshaled.ProcessingMode != tracker.FORWARDS_MODE { t.Errorf("expected processing mode 'forwards', got %s", unmarshaled.ProcessingMode) } } @@ -538,7 +539,7 @@ func TestNewProcessBackwardsTask(t *testing.T) { t.Errorf("expected transaction hash %s, got %s", payload.TransactionHash, unmarshaled.TransactionHash) } - if unmarshaled.ProcessingMode != c.BACKWARDS_MODE { + if unmarshaled.ProcessingMode != tracker.BACKWARDS_MODE { t.Errorf("expected processing mode 'backwards', got %s", unmarshaled.ProcessingMode) } } @@ -566,7 +567,7 @@ func TestAsynqTaskCreation(t *testing.T) { "transaction_index": 5, "network_id": 1, "network_name": "mainnet", - "processing_mode": c.FORWARDS_MODE, + "processing_mode": tracker.FORWARDS_MODE, } data, err := json.Marshal(payload) diff --git a/pkg/processor/transaction/structlog/tasks.go b/pkg/processor/transaction/structlog/tasks.go index 2a0ff67..22c61d7 100644 --- a/pkg/processor/transaction/structlog/tasks.go +++ b/pkg/processor/transaction/structlog/tasks.go @@ -4,8 +4,9 @@ import ( "encoding/json" "math/big" - c "github.com/ethpandaops/execution-processor/pkg/processor/common" "github.com/hibiken/asynq" + + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" ) const ( @@ -38,7 +39,7 @@ func (p *ProcessPayload) UnmarshalBinary(data []byte) error { // NewProcessForwardsTask creates a new forwards process task. func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error) { - payload.ProcessingMode = c.FORWARDS_MODE + payload.ProcessingMode = tracker.FORWARDS_MODE data, err := json.Marshal(payload) if err != nil { @@ -50,7 +51,7 @@ func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error) { // NewProcessBackwardsTask creates a new backwards process task. 
func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error) { - payload.ProcessingMode = c.BACKWARDS_MODE + payload.ProcessingMode = tracker.BACKWARDS_MODE data, err := json.Marshal(payload) if err != nil { diff --git a/pkg/processor/transaction/structlog/transaction_processing.go b/pkg/processor/transaction/structlog/transaction_processing.go index ce0f54c..6902d18 100644 --- a/pkg/processor/transaction/structlog/transaction_processing.go +++ b/pkg/processor/transaction/structlog/transaction_processing.go @@ -12,6 +12,7 @@ import ( "github.com/ethpandaops/execution-processor/pkg/common" "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" ) // ProcessTransaction processes a transaction using ch-go columnar streaming. @@ -31,7 +32,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block *types.Block, chunkSize := p.config.ChunkSize if chunkSize == 0 { - chunkSize = 10_000 + chunkSize = tracker.DefaultChunkSize } cols := NewColumns() @@ -92,7 +93,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block *types.Block, // Log progress for large transactions progressThreshold := p.config.ProgressLogThreshold if progressThreshold == 0 { - progressThreshold = 100_000 + progressThreshold = tracker.DefaultProgressLogThreshold } if totalCount > progressThreshold && end%progressThreshold < chunkSize { @@ -127,7 +128,7 @@ func (p *Processor) getTransactionTrace(ctx context.Context, tx *types.Transacti } // Process transaction with timeout - processCtx, cancel := context.WithTimeout(ctx, 30*time.Second) + processCtx, cancel := context.WithTimeout(ctx, tracker.DefaultTraceTimeout) defer cancel() // Get transaction trace diff --git a/pkg/state/manager.go b/pkg/state/manager.go index dcaa4d9..e7e99e2 100644 --- a/pkg/state/manager.go +++ b/pkg/state/manager.go @@ -8,7 +8,7 @@ import ( "time" "github.com/ethpandaops/execution-processor/pkg/clickhouse" - "github.com/ethpandaops/execution-processor/pkg/processor/common" + "github.com/ethpandaops/execution-processor/pkg/processor/tracker" "github.com/sirupsen/logrus" ) @@ -160,7 +160,7 @@ func (s *Manager) NextBlock(ctx context.Context, processor, network, mode string var err error - if mode == common.BACKWARDS_MODE { + if mode == tracker.BACKWARDS_MODE { progressiveNext, err = s.getProgressiveNextBlockBackwards(ctx, processor, network, chainHead) } else { // Default to forwards mode @@ -173,7 +173,7 @@ func (s *Manager) NextBlock(ctx context.Context, processor, network, mode string // If limiter is disabled or backwards mode, return progressive next block // Limiter only applies to forwards processing to prevent exceeding beacon chain - if !s.limiterEnabled || mode == common.BACKWARDS_MODE { + if !s.limiterEnabled || mode == tracker.BACKWARDS_MODE { return progressiveNext, nil } @@ -447,6 +447,173 @@ func (s *Manager) MarkBlockProcessed(ctx context.Context, blockNumber uint64, ne return nil } +// MarkBlockEnqueued inserts a block with complete=false to track that tasks have been enqueued. +// This is the first phase of two-phase completion tracking. 
+func (s *Manager) MarkBlockEnqueued(ctx context.Context, blockNumber uint64, taskCount int, network, processor string) error { + query := fmt.Sprintf( + "INSERT INTO %s (updated_date_time, block_number, processor, meta_network_name, complete, task_count) VALUES ('%s', %d, '%s', '%s', 0, %d)", + s.storageTable, + time.Now().Format("2006-01-02 15:04:05"), + blockNumber, + processor, + network, + taskCount, + ) + + err := s.storageClient.Execute(ctx, query) + if err != nil { + return fmt.Errorf("failed to mark block as enqueued in %s: %w", s.storageTable, err) + } + + s.log.WithFields(logrus.Fields{ + "block_number": blockNumber, + "processor": processor, + "network": network, + "task_count": taskCount, + }).Debug("Marked block as enqueued (complete=false)") + + return nil +} + +// MarkBlockComplete inserts a block with complete=true to indicate all tasks finished. +// This is the second phase of two-phase completion tracking. +// ReplacingMergeTree will keep the latest row per (processor, network, block_number). +func (s *Manager) MarkBlockComplete(ctx context.Context, blockNumber uint64, network, processor string) error { + query := fmt.Sprintf( + "INSERT INTO %s (updated_date_time, block_number, processor, meta_network_name, complete, task_count) VALUES ('%s', %d, '%s', '%s', 1, 0)", + s.storageTable, + time.Now().Format("2006-01-02 15:04:05"), + blockNumber, + processor, + network, + ) + + err := s.storageClient.Execute(ctx, query) + if err != nil { + return fmt.Errorf("failed to mark block as complete in %s: %w", s.storageTable, err) + } + + s.log.WithFields(logrus.Fields{ + "block_number": blockNumber, + "processor": processor, + "network": network, + }).Debug("Marked block as complete") + + return nil +} + +// CountIncompleteBlocks returns the count of blocks that are not yet complete. +func (s *Manager) CountIncompleteBlocks(ctx context.Context, network, processor string) (int, error) { + query := fmt.Sprintf(` + SELECT count(*) as count + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND complete = 0 + `, s.storageTable, processor, network) + + count, err := s.storageClient.QueryUInt64(ctx, query, "count") + if err != nil { + return 0, fmt.Errorf("failed to count incomplete blocks: %w", err) + } + + if count == nil { + return 0, nil + } + + // Safe conversion - count of incomplete blocks will never exceed int max + if *count > uint64(^uint(0)>>1) { + return 0, fmt.Errorf("incomplete block count exceeds int max: %d", *count) + } + + return int(*count), nil +} + +// GetOldestIncompleteBlock returns the oldest incomplete block >= minBlockNumber. +// Returns nil if no incomplete blocks exist within the range. +// The minBlockNumber parameter enables startup optimization by limiting the search range. 
+func (s *Manager) GetOldestIncompleteBlock(ctx context.Context, network, processor string, minBlockNumber uint64) (*uint64, error) { + query := fmt.Sprintf(` + SELECT block_number + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND complete = 0 + AND block_number >= %d + ORDER BY block_number ASC + LIMIT 1 + `, s.storageTable, processor, network, minBlockNumber) + + s.log.WithFields(logrus.Fields{ + "processor": processor, + "network": network, + "min_block_number": minBlockNumber, + }).Debug("Querying for oldest incomplete block") + + blockNumber, err := s.storageClient.QueryUInt64(ctx, query, "block_number") + if err != nil { + return nil, fmt.Errorf("failed to get oldest incomplete block: %w", err) + } + + return blockNumber, nil +} + +// GetNewestIncompleteBlock returns the newest incomplete block <= maxBlockNumber. +// Returns nil if no incomplete blocks exist within the range. +// The maxBlockNumber parameter enables startup optimization by limiting the search range. +// This method is used for backwards processing mode. +func (s *Manager) GetNewestIncompleteBlock(ctx context.Context, network, processor string, maxBlockNumber uint64) (*uint64, error) { + query := fmt.Sprintf(` + SELECT block_number + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND complete = 0 + AND block_number <= %d + ORDER BY block_number DESC + LIMIT 1 + `, s.storageTable, processor, network, maxBlockNumber) + + s.log.WithFields(logrus.Fields{ + "processor": processor, + "network": network, + "max_block_number": maxBlockNumber, + }).Debug("Querying for newest incomplete block") + + blockNumber, err := s.storageClient.QueryUInt64(ctx, query, "block_number") + if err != nil { + return nil, fmt.Errorf("failed to get newest incomplete block: %w", err) + } + + return blockNumber, nil +} + +// GetIncompleteBlocks returns block numbers that are not yet complete, ordered by block_number. 
+func (s *Manager) GetIncompleteBlocks(ctx context.Context, network, processor string, limit int) ([]uint64, error) { + query := fmt.Sprintf(` + SELECT block_number + FROM %s FINAL + WHERE processor = '%s' + AND meta_network_name = '%s' + AND complete = 0 + ORDER BY block_number ASC + LIMIT %d + `, s.storageTable, processor, network, limit) + + s.log.WithFields(logrus.Fields{ + "processor": processor, + "network": network, + "limit": limit, + }).Debug("Querying for incomplete blocks") + + blocks, err := s.storageClient.QueryUInt64Slice(ctx, query, "block_number") + if err != nil { + return nil, fmt.Errorf("failed to get incomplete blocks: %w", err) + } + + return blocks, nil +} + func (s *Manager) GetMinMaxStoredBlocks(ctx context.Context, network, processor string) (minBlock, maxBlock *big.Int, err error) { query := fmt.Sprintf(` SELECT min(block_number) as min, max(block_number) as max @@ -532,7 +699,7 @@ func (s *Manager) GetHeadDistance(ctx context.Context, processor, network, mode } // Determine which head to use based on limiter status and mode - if !s.limiterEnabled || mode == common.BACKWARDS_MODE { + if !s.limiterEnabled || mode == tracker.BACKWARDS_MODE { // Use execution node head if executionHead == nil { return 0, "", fmt.Errorf("execution head not available") diff --git a/pkg/state/manager_test.go b/pkg/state/manager_test.go index da33f5b..a97e6f6 100644 --- a/pkg/state/manager_test.go +++ b/pkg/state/manager_test.go @@ -44,6 +44,18 @@ func (m *MockClickHouseClient) QueryMinMaxUInt64(ctx context.Context, query stri return minResult, maxResult, args.Error(2) } +func (m *MockClickHouseClient) QueryUInt64Slice(ctx context.Context, query string, columnName string) ([]uint64, error) { + args := m.Called(ctx, query, columnName) + + if args.Get(0) == nil { + return nil, args.Error(1) + } + + result, _ := args.Get(0).([]uint64) + + return result, args.Error(1) +} + func (m *MockClickHouseClient) Execute(ctx context.Context, query string) error { args := m.Called(ctx, query) @@ -391,3 +403,149 @@ func TestGetProgressiveNextBlock_NoResultsVsBlock0(t *testing.T) { }) } } + +func TestGetOldestIncompleteBlock(t *testing.T) { + ctx := context.Background() + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + + tests := []struct { + name string + minBlockNumber uint64 + mockReturn *uint64 + expected *uint64 + expectError bool + }{ + { + name: "no incomplete blocks", + minBlockNumber: 0, + mockReturn: nil, + expected: nil, + expectError: false, + }, + { + name: "finds oldest at 101", + minBlockNumber: 0, + mockReturn: uint64Ptr(101), + expected: uint64Ptr(101), + expectError: false, + }, + { + name: "respects minBlockNumber filter", + minBlockNumber: 100, + mockReturn: uint64Ptr(105), + expected: uint64Ptr(105), + expectError: false, + }, + { + name: "no matches above minBlockNumber", + minBlockNumber: 500, + mockReturn: nil, + expected: nil, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockClient := new(MockClickHouseClient) + mockClient.On("QueryUInt64", ctx, mock.AnythingOfType("string"), "block_number").Return(tt.mockReturn, nil) + + manager := &Manager{ + log: log.WithField("test", tt.name), + storageClient: mockClient, + storageTable: "test_table", + } + + result, err := manager.GetOldestIncompleteBlock(ctx, "mainnet", "test_processor", tt.minBlockNumber) + + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + + if tt.expected == nil { + assert.Nil(t, result) + } else { + assert.NotNil(t, result) 
+ assert.Equal(t, *tt.expected, *result) + } + } + + mockClient.AssertExpectations(t) + }) + } +} + +func TestGetNewestIncompleteBlock(t *testing.T) { + ctx := context.Background() + log := logrus.New() + log.SetLevel(logrus.DebugLevel) + + tests := []struct { + name string + maxBlockNumber uint64 + mockReturn *uint64 + expected *uint64 + expectError bool + }{ + { + name: "no incomplete blocks", + maxBlockNumber: 1000, + mockReturn: nil, + expected: nil, + expectError: false, + }, + { + name: "finds newest at 250", + maxBlockNumber: 1000, + mockReturn: uint64Ptr(250), + expected: uint64Ptr(250), + expectError: false, + }, + { + name: "respects maxBlockNumber filter", + maxBlockNumber: 300, + mockReturn: uint64Ptr(250), + expected: uint64Ptr(250), + expectError: false, + }, + { + name: "no matches below maxBlockNumber", + maxBlockNumber: 100, + mockReturn: nil, + expected: nil, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockClient := new(MockClickHouseClient) + mockClient.On("QueryUInt64", ctx, mock.AnythingOfType("string"), "block_number").Return(tt.mockReturn, nil) + + manager := &Manager{ + log: log.WithField("test", tt.name), + storageClient: mockClient, + storageTable: "test_table", + } + + result, err := manager.GetNewestIncompleteBlock(ctx, "mainnet", "test_processor", tt.maxBlockNumber) + + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + + if tt.expected == nil { + assert.Nil(t, result) + } else { + assert.NotNil(t, result) + assert.Equal(t, *tt.expected, *result) + } + } + + mockClient.AssertExpectations(t) + }) + } +}
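Taken together, the new Manager methods give each block a two-row lifecycle in the ReplacingMergeTree storage table: one row with complete=0 written at enqueue time and one with complete=1 once every task has finished. A hedged end-to-end sketch of how a caller drives them (method names and signatures as defined in pkg/state/manager.go above; the Redis counter draining between the two phases is handled by the tracker package and elided here):

    package example

    import (
        "context"

        "github.com/ethpandaops/execution-processor/pkg/state"
    )

    // processBlock illustrates the two-phase completion flow for one block.
    func processBlock(ctx context.Context, sm *state.Manager, block uint64, taskCount int) error {
        // Phase 1: record the block as enqueued (complete=0, task_count=taskCount).
        if err := sm.MarkBlockEnqueued(ctx, block, taskCount, "mainnet", "transaction_structlog"); err != nil {
            return err
        }

        // ... workers run the block's tasks; the tracker counts completions ...

        // Phase 2: once every task has finished, flush the completion row
        // (complete=1). FINAL queries such as GetOldestIncompleteBlock then
        // stop reporting this block as pending.
        return sm.MarkBlockComplete(ctx, block, "mainnet", "transaction_structlog")
    }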