⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
a978c2c
feat(structlog): add call frame tracking to identify EVM call contexts
mattevans Jan 14, 2026
3b880bc
test: add comprehensive unit tests for extractCallAddress function
mattevans Jan 14, 2026
fe336bf
test: add unit tests for CREATE/CREATE2 address extraction
mattevans Jan 14, 2026
2e59fcf
style(transaction_processing.go): move comment to line above log to m…
mattevans Jan 14, 2026
a43dac7
refactor(processor): replace receipt-based CREATE address lookup with…
mattevans Jan 14, 2026
b74db36
fix(call_tracker): align root frame depth with EVM traces (depth 1)
mattevans Jan 14, 2026
cbc6942
feat(structlog): add GasSelf field to isolate CALL/CREATE overhead fr…
mattevans Jan 15, 2026
618519b
test(structlog): fix and expand address extraction tests for CALL opc…
mattevans Jan 19, 2026
0308dbc
WIP: current changes
mattevans Jan 21, 2026
54994f3
fix(structlog): correct EOA call detection to prevent phantom synthet…
mattevans Jan 21, 2026
4c633d2
chore: accidental commit
mattevans Jan 21, 2026
33528d0
refactor: remove unused ParityTrace types and helper from execution p…
mattevans Jan 21, 2026
14d2d34
feat: add Node interface and EmbeddedNode for library embedding
Savid Jan 22, 2026
b796d44
refactor: abstract execution types to remove CGO dependency
Savid Jan 22, 2026
d78f1e1
refactor(structlog): add embedded-mode support to eliminate 99% of st…
mattevans Jan 22, 2026
f535991
perf(embedded_node): force DisableStack=true in DebugTraceTransaction…
mattevans Jan 22, 2026
4d3f78b
feat: add support for pre-computed GasUsed values from embedded tracer
mattevans Jan 22, 2026
237eba7
style(transaction_processing.go): add blank line after batch slice cr…
mattevans Jan 22, 2026
43e1b54
Merge branch 'master' into feat/call-frames
mattevans Jan 22, 2026
1ced807
Merge pull request #46 from ethpandaops/feat/embedded-node-interface-…
mattevans Jan 22, 2026
aa1b063
Merge branch 'master' into feat/embedded-node-interface
mattevans Jan 22, 2026
63f256b
refactor: change ChainID return type from int32 to int64
mattevans Jan 22, 2026
759d6eb
test(embedded_node_test.go): remove obsolete TestEmbeddedNode_Delegat…
mattevans Jan 23, 2026
c583e0a
Merge branch 'feat/call-frames' into feat/embedded-node-interface
mattevans Jan 23, 2026
0f7509b
fix(pool): register OnReady callbacks before spawning goroutines
mattevans Jan 23, 2026
23bb5a5
refactor(embedded_node): add debug logging for OnReady callback execu…
mattevans Jan 23, 2026
88b9ea4
fix(transaction_processing.go): extract pre-computed GasUsed values f…
mattevans Jan 23, 2026
1b73163
test(gas_cost_test.go): add unit tests for hasPrecomputedGasUsed func…
mattevans Jan 23, 2026
47e62b4
feat: detect pre-computed CREATE/CREATE2 addresses to skip expensive …
mattevans Jan 23, 2026
e3bc1c9
style(embedded_node.go): add blank line before callback execution for…
mattevans Jan 23, 2026
baa62fe
merge latest master
Savid Jan 23, 2026
e8d972b
lint
Savid Jan 23, 2026
d471495
refactor(structlog): remove ProgramCounter field from Structlog struct
mattevans Jan 26, 2026
bf89895
Merge branch 'feat/embedded-node-interface' into feat/trace-create
mattevans Jan 26, 2026
f73bdcd
Merge pull request #49 from ethpandaops/feat/trace-create
mattevans Jan 26, 2026
908f3d4
feat: introduce transaction call_frame processor for EVM execution an…
mattevans Jan 27, 2026
9c23ef5
refactor(call_frame): unify root frame CallType to empty string and d…
mattevans Jan 27, 2026
97302d6
fix(aggregator): skip synthetic EOA rows in opcode count
mattevans Jan 27, 2026
f698cef
refactor(processor): replace call_frame processor with structlog_agg …
mattevans Jan 27, 2026
7b3b385
fix: replace leftover "call_frame" references with "structlog_agg"
mattevans Jan 27, 2026
293932e
Merge pull request #51 from ethpandaops/feat/callframe-aggs
mattevans Jan 27, 2026
b384357
fix(aggregator): skip gas refund and intrinsic gas for failed transac…
mattevans Jan 28, 2026
624fce9
fix: prevent uint64 underflow in gas calculations and correct intrins…
mattevans Jan 28, 2026
8c54a87
Merge pull request #53 from ethpandaops/fix/overflow-gas
mattevans Jan 28, 2026
0ef9905
Merge pull request #52 from ethpandaops/fix/intrinsic-gas-calc-error-tx
mattevans Jan 28, 2026
382bb90
Merge branch 'master' into release/embed-mode-structlog-agg
mattevans Jan 28, 2026
0c33e62
Merge branch 'master' into release/embed-mode-structlog-agg
mattevans Jan 28, 2026
c292ca9
fix(geth): sanitize corrupted gasCost values from Erigon underflow bug
mattevans Jan 28, 2026
e8642fa
fix(aggregator): count REVERT as error even when opcode has no error …
mattevans Jan 28, 2026
fb619bf
Merge pull request #57 from ethpandaops/fix/revert-recorded-as-fails
mattevans Jan 29, 2026
09a176c
Merge pull request #56 from ethpandaops/fix/overflow-gas-cost
mattevans Jan 29, 2026
e7830f2
fix(state): handle nullable UInt32 column in getLimiterMaxBlock
mattevans Jan 30, 2026
5b862dc
fix(manager): start state manager in embedded mode to ensure ClickHou…
mattevans Jan 30, 2026
85b2e51
Merge pull request #58 from ethpandaops/fix/type-mismatch
mattevans Jan 30, 2026
ffcdc41
fix(state): replace toUInt64OrZero with ifNull+toUInt64 for nullable …
mattevans Jan 30, 2026
91a2a6d
fix(leaderelection): add callback-based notification for guaranteed d…
Savid Jan 30, 2026
90d2bb8
Merge pull request #59 from ethpandaops/fix/leadership-callback-notif…
mattevans Jan 30, 2026
22f22c4
fix: reorder block processing steps to prevent race condition
mattevans Jan 30, 2026
a6d6294
Merge pull request #62 from ethpandaops/fix/enqueue-order
mattevans Jan 30, 2026
681625c
refactor(leaderelection): replace hand-rolled Redis locking with reds…
Savid Jan 30, 2026
6124fe8
perf: move expensive metrics updates to background workers
mattevans Jan 30, 2026
3bb1c2a
perf(manager.go): remove FINAL keyword from queryLimiterMaxBlock query
mattevans Jan 30, 2026
89872f1
style(manager.go): remove "(optimized)" from debug log message
mattevans Jan 30, 2026
f92bc27
Merge pull request #63 from ethpandaops/refactor/next-block-hot-path
mattevans Jan 30, 2026
6f329fa
fix(pending.go): use SetNX with 30min TTL to prevent duplicate block …
mattevans Jan 30, 2026
f64e8cb
feat(processor): zero-interval processing mode as default (#65)
Savid Jan 30, 2026
5bc68cd
Add async insert settings to processors (#67)
Savid Jan 31, 2026
3b9afba
feat(rowbuffer): add row batching system for ClickHouse inserts (#68)
Savid Feb 1, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 91 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ A distributed system for processing Ethereum execution layer data with support f
### Core Components

- **Ethereum Nodes**: Configure execution node endpoints
- **Redis**: Task queue and leader election coordination
- **Redis**: Task queue and leader election coordination
- **State Manager**: Track processing progress in ClickHouse
- **Processors**: Configure structlog extraction settings

Expand All @@ -57,6 +57,93 @@ A distributed system for processing Ethereum execution layer data with support f
└─────────────────────────────────────────┘
```

## Embedded Mode (Library Usage)

The execution-processor can be embedded as a library within an execution client, providing direct data access without JSON-RPC overhead.

### Implementing DataSource

```go
import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
)

type MyDataSource struct {
client *MyExecutionClient
}

func (ds *MyDataSource) BlockNumber(ctx context.Context) (*uint64, error) {
num := ds.client.CurrentBlock()
return &num, nil
}

func (ds *MyDataSource) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
return ds.client.GetBlock(number), nil
}

func (ds *MyDataSource) BlockReceipts(ctx context.Context, number *big.Int) ([]*types.Receipt, error) {
return ds.client.GetBlockReceipts(number), nil
}

func (ds *MyDataSource) TransactionReceipt(ctx context.Context, hash string) (*types.Receipt, error) {
return ds.client.GetReceipt(hash), nil
}

func (ds *MyDataSource) DebugTraceTransaction(
ctx context.Context,
hash string,
blockNumber *big.Int,
opts execution.TraceOptions,
) (*execution.TraceTransaction, error) {
return ds.client.TraceTransaction(hash, opts), nil
}

func (ds *MyDataSource) ChainID() int64 {
return ds.client.ChainID()
}

func (ds *MyDataSource) ClientType() string {
return "my-client/1.0.0"
}

func (ds *MyDataSource) IsSynced() bool {
return ds.client.IsSynced()
}
```

### Creating an Embedded Pool

```go
import (
"github.com/ethpandaops/execution-processor/pkg/ethereum"
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
)

// Create embedded node with your data source
dataSource := &MyDataSource{client: myClient}
node := execution.NewEmbeddedNode(log, "embedded", dataSource)

// Create pool with the embedded node
pool := ethereum.NewPoolWithNodes(log, "processor", []execution.Node{node}, nil)
pool.Start(ctx)

// Mark ready when your client is synced and ready to serve data
node.MarkReady(ctx)
```

### Embedded vs RPC Mode

| Aspect | RPC Mode | Embedded Mode |
|--------|----------|---------------|
| Data Access | JSON-RPC over HTTP | Direct function calls |
| Readiness | Auto-detected via RPC health checks | Host calls MarkReady() |
| Performance | Network + serialization overhead | Zero serialization overhead |
| Use Case | External execution clients | Library integration |

## Manual Block Queue API

The execution processor provides an HTTP API for manually queuing blocks for reprocessing. This is useful for fixing data issues or reprocessing specific blocks.
Expand All @@ -80,7 +167,7 @@ curl -X POST http://localhost:8080/api/v1/queue/block/transaction_structlog/1234
"status": "queued",
"block_number": 12345,
"processor": "transaction_structlog",
"queue": "process:forwards",
"queue": "process:forwards",
"transaction_count": 150,
"tasks_created": 150
}
Expand Down Expand Up @@ -158,7 +245,7 @@ curl -X POST http://localhost:8080/api/v1/queue/blocks/transaction_structlog \
# Run tests
go test ./...

# Run with race detector
# Run with race detector
go test ./... --race

# Build
Expand All @@ -167,4 +254,4 @@ go build .

## License

See LICENSE file.
See LICENSE file.
31 changes: 18 additions & 13 deletions example_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,27 +50,32 @@ processors:
addr: "localhost:9000"
database: "default"
table: "canonical_execution_transaction_structlog"
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# Channel-based batching configuration for memory-efficient processing
# bigTransactionThreshold: 500000 # Transactions with more structlogs are considered "big" (default: 500000)
# chunkSize: 10000 # Number of structlogs per batch (default: 10000)
# channelBufferSize: 2 # Number of chunks to buffer in channel (default: 2)
# progressLogThreshold: 100000 # Log progress every N structlogs for large transactions (default: 100000)
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)

# Small transaction batching configuration
# batchInsertThreshold: 50000 # Transactions with fewer structlogs than this will be batched (default: 50000)
# batchFlushInterval: 5s # Maximum time to wait before flushing a batch (default: 5s)
# batchMaxSize: 100000 # Maximum number of structlogs to accumulate in a batch (default: 100000)
# Aggregated structlog processor (call frame level aggregation)
transactionStructlogAgg:
enabled: false
addr: "localhost:9000"
database: "default"
table: "canonical_execution_transaction_structlog_agg"
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)

# Simple transaction processor (lightweight - no debug traces)
transactionSimple:
enabled: false
addr: "localhost:9000"
database: "default"
table: "execution_transaction"
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# debug: false # Enable debug logging for ClickHouse queries
# maxPendingBlockRange: 2 # Max distance between oldest incomplete and current block (default: 2)
# bufferMaxRows: 100000 # Max rows before flush (default: 100000)
# bufferFlushInterval: "1s" # Max time before flush (default: 1s)

# Application settings
shutdownTimeout: 6m
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ require (
github.com/creasty/defaults v1.8.0
github.com/ethereum/go-ethereum v1.16.7
github.com/go-co-op/gocron v1.37.0
github.com/go-redsync/redsync/v4 v4.15.0
github.com/hibiken/asynq v0.25.1
github.com/lib/pq v1.10.9
github.com/prometheus/client_golang v1.20.5
github.com/redis/go-redis/v9 v9.14.0
github.com/redis/go-redis/v9 v9.17.2
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.10.1
github.com/stretchr/testify v1.11.1
Expand Down Expand Up @@ -60,6 +61,8 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.5 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-version v1.7.0 // indirect
github.com/holiman/uint256 v1.3.2 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down
25 changes: 23 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE=
github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.4.1 h1:PASvf36gyUpr2zdOUS/9Zqc80GbM+9BDyiJSJDDOrTI=
github.com/go-redis/redis/v7 v7.4.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-redsync/redsync/v4 v4.15.0 h1:KH/XymuxSV7vyKs6z1Cxxj+N+N18JlPxgXeP6x4JY54=
github.com/go-redsync/redsync/v4 v4.15.0/go.mod h1:qNp+lLs3vkfZbtA/aM/OjlZHfEr5YTAYhRktFPKHC7s=
github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E=
github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
Expand All @@ -132,6 +140,8 @@ github.com/golang-jwt/jwt/v4 v4.5.2 h1:YtQM7lnr8iZ+j5q71MGKkNw9Mn7AjHM68uc9g5fXe
github.com/golang-jwt/jwt/v4 v4.5.2/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs=
github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/gomodule/redigo v1.9.3 h1:dNPSXeXv6HCq2jdyWfjgmhBdqnR6PRO3m/G05nvpPC8=
github.com/gomodule/redigo v1.9.3/go.mod h1:KsU3hiK/Ay8U42qpaJk+kuNa3C+spxapWpM+ywhcgtw=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand All @@ -141,8 +151,13 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aN
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.5 h1:jP1RStw811EvUDzsUQ9oESqw2e4RqCjSAD9qIL8eMns=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.5/go.mod h1:WXNBZ64q3+ZUemCMXD9kYnr56H7CgZxDBHCVwstfl3s=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-bexpr v0.1.10 h1:9kuI5PFotCboP3dkDYFr/wi0gg0QVbSNz5oFRpxn4uE=
github.com/hashicorp/go-bexpr v0.1.10/go.mod h1:oxlubA2vC/gFVfX1A6JGp7ls7uCDlfJn732ehYYg+g0=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/hibiken/asynq v0.25.1 h1:phj028N0nm15n8O2ims+IvJ2gz4k2auvermngh9JhTw=
Expand Down Expand Up @@ -255,8 +270,12 @@ github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQP
github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8=
github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg=
github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is=
github.com/redis/go-redis/v9 v9.14.0 h1:u4tNCjXOyzfgeLN+vAZaW1xUooqWDqVEsZN0U01jfAE=
github.com/redis/go-redis/v9 v9.14.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw=
github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI=
github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370=
github.com/redis/rueidis v1.0.69 h1:WlUefRhuDekji5LsD387ys3UCJtSFeBVf0e5yI0B8b4=
github.com/redis/rueidis v1.0.69/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA=
github.com/redis/rueidis/rueidiscompat v1.0.69 h1:IWVYY9lXdjNO3do2VpJT7aDFi8zbCUuQxZB6E2Grahs=
github.com/redis/rueidis/rueidiscompat v1.0.69/go.mod h1:iC4Y8DoN0Uth0Uezg9e2trvNRC7QAgGeuP2OPLb5ccI=
github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY=
github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc=
github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
Expand Down Expand Up @@ -299,6 +318,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 h1:QVqDTf3h2WHt08YuiTGPZLls0Wq99X9bWd0Q5ZSBesM=
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203/go.mod h1:oqN97ltKNihBbwlX8dLpwxCl3+HnXKV/R0e+sRLd9C8=
github.com/supranational/blst v0.3.16 h1:bTDadT+3fK497EvLdWRQEjiGnUtzJ7jjIUMF0jqwYhE=
github.com/supranational/blst v0.3.16/go.mod h1:jZJtfjgudtNl4en1tzwPIV3KjUnQUvG3/j+w+fVonLw=
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 h1:epCh84lMvA70Z7CTTCmYQn2CKbY8j86K7/FAIr141uY=
Expand Down
28 changes: 28 additions & 0 deletions pkg/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,4 +205,32 @@ var (
Name: "execution_processor_clickhouse_pool_empty_acquire_wait_duration_seconds",
Help: "Cumulative time spent waiting for a resource when pool was empty",
}, []string{"network", "processor"})

// Row buffer metrics for batched ClickHouse inserts.
RowBufferFlushTotal = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "execution_processor_row_buffer_flush_total",
Help: "Total number of row buffer flushes",
}, []string{"network", "processor", "table", "trigger", "status"})

RowBufferFlushDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "execution_processor_row_buffer_flush_duration_seconds",
Help: "Duration of row buffer flushes",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 15),
}, []string{"network", "processor", "table"})

RowBufferFlushSize = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "execution_processor_row_buffer_flush_size_rows",
Help: "Number of rows per flush",
Buckets: prometheus.ExponentialBuckets(100, 2, 12),
}, []string{"network", "processor", "table"})

RowBufferPendingRows = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "execution_processor_row_buffer_pending_rows",
Help: "Current number of rows waiting in the buffer",
}, []string{"network", "processor", "table"})

RowBufferPendingTasks = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "execution_processor_row_buffer_pending_tasks",
Help: "Current number of tasks waiting for their rows to be flushed",
}, []string{"network", "processor", "table"})
)
84 changes: 84 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Package config provides configuration types for execution-processor.
// This package is designed to be imported without pulling in go-ethereum dependencies,
// making it suitable for embedded mode integrations.
package config

import (
"fmt"
"time"

"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
"github.com/ethpandaops/execution-processor/pkg/processor"
"github.com/ethpandaops/execution-processor/pkg/redis"
"github.com/ethpandaops/execution-processor/pkg/state"
)

// EthereumConfig is the ethereum network configuration.
// This is a copy of ethereum.Config to avoid importing pkg/ethereum
// which would pull in go-ethereum dependencies.
type EthereumConfig struct {
// Execution configuration
Execution []*execution.Config `yaml:"execution"`
// Override network name for custom networks (bypasses networkMap)
OverrideNetworkName *string `yaml:"overrideNetworkName"`
}

// Validate validates the ethereum configuration.
func (c *EthereumConfig) Validate() error {
for i, exec := range c.Execution {
if err := exec.Validate(); err != nil {
return fmt.Errorf("invalid execution configuration at index %d: %w", i, err)
}
}

return nil
}

// Config is the main configuration for execution-processor.
type Config struct {
// MetricsAddr is the address to listen on for metrics.
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
// HealthCheckAddr is the address to listen on for healthcheck.
HealthCheckAddr *string `yaml:"healthCheckAddr"`
// PProfAddr is the address to listen on for pprof.
PProfAddr *string `yaml:"pprofAddr"`
// APIAddr is the address to listen on for the API server.
APIAddr *string `yaml:"apiAddr"`
// LoggingLevel is the logging level to use.
LoggingLevel string `yaml:"logging" default:"info"`
// Ethereum is the ethereum network configuration.
Ethereum EthereumConfig `yaml:"ethereum"`
// Redis is the redis configuration.
Redis *redis.Config `yaml:"redis"`
// StateManager is the state manager configuration.
StateManager state.Config `yaml:"stateManager"`
// Processors is the processor configuration.
Processors processor.Config `yaml:"processors"`
// ShutdownTimeout is the timeout for shutting down the server.
ShutdownTimeout time.Duration `yaml:"shutdownTimeout" default:"10s"`
}

// Validate validates the configuration.
func (c *Config) Validate() error {
if c.Redis == nil {
return fmt.Errorf("redis configuration is required")
}

if err := c.Redis.Validate(); err != nil {
return fmt.Errorf("invalid redis configuration: %w", err)
}

if err := c.Ethereum.Validate(); err != nil {
return fmt.Errorf("invalid ethereum configuration: %w", err)
}

if err := c.StateManager.Validate(); err != nil {
return fmt.Errorf("invalid state manager configuration: %w", err)
}

if err := c.Processors.Validate(); err != nil {
return fmt.Errorf("invalid processor configuration: %w", err)
}

return nil
}
Loading