Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ processors:
database: "default"
table: "canonical_execution_transaction_structlog"
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# Channel-based batching configuration for memory-efficient processing
# bigTransactionThreshold: 500000 # Transactions with more structlogs are considered "big" (default: 500000)
# chunkSize: 10000 # Number of structlogs per batch (default: 10000)
Expand All @@ -69,6 +70,7 @@ processors:
database: "default"
table: "execution_transaction"
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)

# Application settings
shutdownTimeout: 6m
42 changes: 42 additions & 0 deletions pkg/clickhouse/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,48 @@ func (c *Client) QueryMinMaxUInt64(ctx context.Context, query string) (minVal, m
return minVal, maxVal, nil
}

// QueryUInt64Slice executes a query and returns all UInt64 values from the specified column.
// Returns an empty slice if no rows are found.
func (c *Client) QueryUInt64Slice(ctx context.Context, query string, columnName string) ([]uint64, error) {
start := time.Now()
operation := "query_uint64_slice"
status := statusSuccess

defer func() {
c.recordMetrics(operation, status, time.Since(start), query)
}()

result := make([]uint64, 0)
col := new(proto.ColUInt64)

err := c.doWithRetry(ctx, operation, func(attemptCtx context.Context) error {
col.Reset()

result = result[:0]

return c.pool.Do(attemptCtx, ch.Query{
Body: query,
Result: proto.Results{
{Name: columnName, Data: col},
},
OnResult: func(ctx context.Context, block proto.Block) error {
for i := 0; i < col.Rows(); i++ {
result = append(result, col.Row(i))
}

return nil
},
})
})
if err != nil {
status = statusFailed

return nil, fmt.Errorf("query failed: %w", err)
}

return result, nil
}

// Execute runs a query without expecting results.
func (c *Client) Execute(ctx context.Context, query string) error {
start := time.Now()
Expand Down
3 changes: 3 additions & 0 deletions pkg/clickhouse/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,7 @@ type ClientInterface interface {
// The query must return columns named "min" and "max".
// Returns nil for both values if no rows are found.
QueryMinMaxUInt64(ctx context.Context, query string) (minVal, maxVal *uint64, err error)
// QueryUInt64Slice executes a query and returns all UInt64 values from the specified column.
// Returns an empty slice if no rows are found.
QueryUInt64Slice(ctx context.Context, query string, columnName string) ([]uint64, error)
}
18 changes: 18 additions & 0 deletions pkg/ethereum/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package ethereum

import "errors"

// Sentinel errors for Ethereum client operations.
var (
// ErrNoHealthyNode indicates no healthy execution node is available.
ErrNoHealthyNode = errors.New("no healthy execution node available")

// ErrBlockNotFound indicates a block was not found on the execution client.
ErrBlockNotFound = errors.New("block not found")

// ErrTransactionNotFound indicates a transaction was not found.
ErrTransactionNotFound = errors.New("transaction not found")

// ErrUnsupportedChainID indicates an unsupported chain ID was provided.
ErrUnsupportedChainID = errors.New("unsupported chain ID")
)
41 changes: 0 additions & 41 deletions pkg/processor/common/interfaces.go

This file was deleted.

20 changes: 10 additions & 10 deletions pkg/processor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"time"

"github.com/ethpandaops/execution-processor/pkg/processor/common"
"github.com/ethpandaops/execution-processor/pkg/processor/tracker"
"github.com/ethpandaops/execution-processor/pkg/processor/transaction/simple"
"github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog"
)
Expand Down Expand Up @@ -55,28 +55,28 @@ type WorkerConfig struct {

func (c *Config) Validate() error {
if c.Interval == 0 {
c.Interval = 10 * time.Second
c.Interval = DefaultInterval
}

if c.Mode == "" {
c.Mode = common.FORWARDS_MODE
c.Mode = tracker.FORWARDS_MODE
}

if c.Mode != common.FORWARDS_MODE && c.Mode != common.BACKWARDS_MODE {
return fmt.Errorf("invalid mode %s, must be '%s' or '%s'", c.Mode, common.FORWARDS_MODE, common.BACKWARDS_MODE)
if c.Mode != tracker.FORWARDS_MODE && c.Mode != tracker.BACKWARDS_MODE {
return fmt.Errorf("invalid mode %s, must be '%s' or '%s'", c.Mode, tracker.FORWARDS_MODE, tracker.BACKWARDS_MODE)
}

if c.Concurrency == 0 {
c.Concurrency = 20
c.Concurrency = DefaultConcurrency
}

// Queue control defaults
if c.MaxProcessQueueSize == 0 {
c.MaxProcessQueueSize = 1000
c.MaxProcessQueueSize = DefaultMaxProcessQueue
}

if c.BackpressureHysteresis == 0 {
c.BackpressureHysteresis = 0.8
c.BackpressureHysteresis = DefaultBackpressureHysteresis
}

// Set leader election defaults
Expand All @@ -86,11 +86,11 @@ func (c *Config) Validate() error {
}

if c.LeaderElection.TTL == 0 {
c.LeaderElection.TTL = 10 * time.Second
c.LeaderElection.TTL = DefaultLeaderTTL
}

if c.LeaderElection.RenewalInterval == 0 {
c.LeaderElection.RenewalInterval = 3 * time.Second
c.LeaderElection.RenewalInterval = DefaultLeaderRenewalInterval
}

// Validate leader election settings
Expand Down
27 changes: 27 additions & 0 deletions pkg/processor/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package processor

import "time"

// Default configuration values for the processor manager.
// Processor-specific defaults are in the tracker package.
const (
// DefaultInterval is the default interval between processing cycles.
DefaultInterval = 10 * time.Second

// DefaultConcurrency is the default number of concurrent workers for task processing.
DefaultConcurrency = 20

// DefaultMaxProcessQueue is the default max queue size for asynq.
DefaultMaxProcessQueue = 1000

// DefaultBackpressureHysteresis is the default hysteresis factor for backpressure.
// When backpressure is triggered, it won't be released until queue size drops
// to this fraction of the threshold.
DefaultBackpressureHysteresis = 0.8

// DefaultLeaderTTL is the default TTL for leader election locks.
DefaultLeaderTTL = 10 * time.Second

// DefaultLeaderRenewalInterval is the default renewal interval for leader election.
DefaultLeaderRenewalInterval = 3 * time.Second
)
Loading