⚠ This page is served via a proxy. Original site: https://github.com
This service does not collect credentials or authentication data.
Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions pkg/processor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,27 @@ type Config struct {
MaxProcessQueueSize int `yaml:"maxProcessQueueSize"`
BackpressureHysteresis float64 `yaml:"backpressureHysteresis"`

// Stale block detection configuration
StaleBlockDetection StaleBlockDetectionConfig `yaml:"staleBlockDetection"`

// Processor configurations
TransactionStructlog structlog.Config `yaml:"transactionStructlog"`
TransactionSimple simple.Config `yaml:"transactionSimple"`
TransactionStructlogAgg structlog_agg.Config `yaml:"transactionStructlogAgg"`
}

// StaleBlockDetectionConfig holds configuration for stale block detection.
type StaleBlockDetectionConfig struct {
// Enabled enables stale block detection (default: true)
Enabled bool `yaml:"enabled"`

// StaleThreshold is the time after which a block is considered stale (default: 5m)
StaleThreshold time.Duration `yaml:"staleThreshold"`

// CheckInterval is how often to check for stale blocks (default: 1m)
CheckInterval time.Duration `yaml:"checkInterval"`
}

// LeaderElectionConfig holds configuration for leader election.
type LeaderElectionConfig struct {
// Enable leader election (default: true)
Expand Down Expand Up @@ -97,6 +112,20 @@ func (c *Config) Validate() error {
return fmt.Errorf("leader election renewal interval must be less than TTL")
}

// Set stale block detection defaults
// Enable by default unless explicitly disabled
if !c.StaleBlockDetection.Enabled && c.StaleBlockDetection.StaleThreshold == 0 && c.StaleBlockDetection.CheckInterval == 0 {
c.StaleBlockDetection.Enabled = true
}

if c.StaleBlockDetection.StaleThreshold == 0 {
c.StaleBlockDetection.StaleThreshold = tracker.DefaultStaleThreshold
}

if c.StaleBlockDetection.CheckInterval == 0 {
c.StaleBlockDetection.CheckInterval = DefaultStaleBlockCheckInterval
}

if c.TransactionStructlog.Enabled {
if c.TransactionStructlog.Addr == "" {
return fmt.Errorf("transaction structlog addr is required when enabled")
Expand Down
3 changes: 3 additions & 0 deletions pkg/processor/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ const (

// DefaultLeaderRenewalInterval is the default renewal interval for leader election.
DefaultLeaderRenewalInterval = 3 * time.Second

// DefaultStaleBlockCheckInterval is the default interval for checking stale blocks.
DefaultStaleBlockCheckInterval = 1 * time.Minute
)
83 changes: 79 additions & 4 deletions pkg/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,9 +700,15 @@ func (m *Manager) runBlockProcessing(ctx context.Context) {
queueMonitorTicker := time.NewTicker(30 * time.Second) // Monitor queues every 30s
defer queueMonitorTicker.Stop()

// Stale block detection ticker
staleBlockTicker := time.NewTicker(m.config.StaleBlockDetection.CheckInterval)
defer staleBlockTicker.Stop()

m.log.WithFields(logrus.Fields{
"interval": m.config.Interval,
"processors": len(m.processors),
"interval": m.config.Interval,
"processors": len(m.processors),
"stale_check_interval": m.config.StaleBlockDetection.CheckInterval,
"stale_threshold": m.config.StaleBlockDetection.StaleThreshold,
}).Info("Started block processing loop")

// Check if we have any processors
Expand Down Expand Up @@ -737,6 +743,9 @@ func (m *Manager) runBlockProcessing(ctx context.Context) {
case <-queueMonitorTicker.C:
m.log.Debug("Queue monitoring ticker fired")
m.startQueueMonitoring(ctx)
case <-staleBlockTicker.C:
m.log.Debug("Stale block detection ticker fired")
m.checkStaleBlocks(ctx)
default:
if !m.isLeader {
m.log.Warn("No longer leader but block processing still running - stopping")
Expand Down Expand Up @@ -1177,10 +1186,10 @@ func (m *Manager) enqueueSimpleBlockTask(ctx context.Context, p *transaction_sim
var err error

if m.config.Mode == tracker.BACKWARDS_MODE {
task, err = transaction_simple.NewProcessBackwardsTask(payload)
task, _, err = transaction_simple.NewProcessBackwardsTask(payload)
queue = tracker.PrefixedProcessBackwardsQueue(transaction_simple.ProcessorName, m.redisPrefix)
} else {
task, err = transaction_simple.NewProcessForwardsTask(payload)
task, _, err = transaction_simple.NewProcessForwardsTask(payload)
queue = tracker.PrefixedProcessForwardsQueue(transaction_simple.ProcessorName, m.redisPrefix)
}

Expand Down Expand Up @@ -1258,3 +1267,69 @@ func (m *Manager) updateBlocksStoredMetrics(ctx context.Context) {
}
}
}

// checkStaleBlocks checks for stale blocks across all processors and triggers retries.
func (m *Manager) checkStaleBlocks(ctx context.Context) {
if !m.config.StaleBlockDetection.Enabled {
return
}

for processorName, processor := range m.processors {
// Check for context cancellation between processors
select {
case <-ctx.Done():
return
default:
}

// Get the completion tracker based on processor type
var completionTracker *tracker.BlockCompletionTracker

switch p := processor.(type) {
case *transaction_structlog.Processor:
completionTracker = p.GetCompletionTracker()
case *transaction_simple.Processor:
completionTracker = p.GetCompletionTracker()
case *transaction_structlog_agg.Processor:
completionTracker = p.GetCompletionTracker()
default:
m.log.WithField("processor", processorName).Warn("Unknown processor type for stale block detection")

continue
}

if completionTracker == nil {
continue
}

staleBlocks, err := completionTracker.GetStaleBlocks(ctx, m.network.Name, processorName, m.config.Mode)
if err != nil {
m.log.WithError(err).WithField("processor", processorName).Warn("Failed to get stale blocks")

continue
}

for _, blockNum := range staleBlocks {
m.log.WithFields(logrus.Fields{
"processor": processorName,
"block_number": blockNum,
}).Warn("Stale block detected - clearing for re-processing")

// Clear the stale block tracking (it will be re-enqueued on next ProcessNextBlock cycle)
if clearErr := completionTracker.ClearBlock(ctx, blockNum, m.network.Name, processorName, m.config.Mode); clearErr != nil {
m.log.WithError(clearErr).WithFields(logrus.Fields{
"processor": processorName,
"block_number": blockNum,
}).Error("Failed to clear stale block")
}
}

if len(staleBlocks) > 0 {
m.log.WithFields(logrus.Fields{
"processor": processorName,
"stale_count": len(staleBlocks),
"stale_blocks": staleBlocks,
}).Info("Cleared stale blocks for re-processing")
}
}
}
Loading