diff --git a/example_config.yaml b/example_config.yaml index a6734f4..619339f 100644 --- a/example_config.yaml +++ b/example_config.yaml @@ -37,6 +37,12 @@ processors: maxProcessQueueSize: 10000 # Stop processing new blocks if process queue exceeds this size backpressureHysteresis: 0.8 # Clear backpressure when queue drops below this fraction of max (8000 in this case) + # Stale block detection (optional, enabled by default) + staleBlockDetection: + enabled: true + staleThreshold: "5m" # Time after which a block is considered stale + checkInterval: "1m" # How often to check for stale blocks + # Leader election configuration (optional, enabled by default) leaderElection: enabled: true diff --git a/pkg/processor/config.go b/pkg/processor/config.go index b0c6ad8..95a2a55 100644 --- a/pkg/processor/config.go +++ b/pkg/processor/config.go @@ -28,12 +28,27 @@ type Config struct { MaxProcessQueueSize int `yaml:"maxProcessQueueSize"` BackpressureHysteresis float64 `yaml:"backpressureHysteresis"` + // Stale block detection configuration + StaleBlockDetection StaleBlockDetectionConfig `yaml:"staleBlockDetection"` + // Processor configurations TransactionStructlog structlog.Config `yaml:"transactionStructlog"` TransactionSimple simple.Config `yaml:"transactionSimple"` TransactionStructlogAgg structlog_agg.Config `yaml:"transactionStructlogAgg"` } +// StaleBlockDetectionConfig holds configuration for stale block detection. +type StaleBlockDetectionConfig struct { + // Enabled enables stale block detection (default: true) + Enabled bool `yaml:"enabled"` + + // StaleThreshold is the time after which a block is considered stale (default: 5m) + StaleThreshold time.Duration `yaml:"staleThreshold"` + + // CheckInterval is how often to check for stale blocks (default: 1m) + CheckInterval time.Duration `yaml:"checkInterval"` +} + // LeaderElectionConfig holds configuration for leader election. type LeaderElectionConfig struct { // Enable leader election (default: true) @@ -97,6 +112,20 @@ func (c *Config) Validate() error { return fmt.Errorf("leader election renewal interval must be less than TTL") } + // Set stale block detection defaults + // Enable by default unless explicitly disabled + if !c.StaleBlockDetection.Enabled && c.StaleBlockDetection.StaleThreshold == 0 && c.StaleBlockDetection.CheckInterval == 0 { + c.StaleBlockDetection.Enabled = true + } + + if c.StaleBlockDetection.StaleThreshold == 0 { + c.StaleBlockDetection.StaleThreshold = tracker.DefaultStaleThreshold + } + + if c.StaleBlockDetection.CheckInterval == 0 { + c.StaleBlockDetection.CheckInterval = DefaultStaleBlockCheckInterval + } + if c.TransactionStructlog.Enabled { if c.TransactionStructlog.Addr == "" { return fmt.Errorf("transaction structlog addr is required when enabled") diff --git a/pkg/processor/defaults.go b/pkg/processor/defaults.go index 29e4f05..a11308c 100644 --- a/pkg/processor/defaults.go +++ b/pkg/processor/defaults.go @@ -26,6 +26,9 @@ const ( // DefaultLeaderRenewalInterval is the default renewal interval for leader election. DefaultLeaderRenewalInterval = 3 * time.Second + // DefaultStaleBlockCheckInterval is the default interval for checking stale blocks. + DefaultStaleBlockCheckInterval = 1 * time.Minute + // DefaultBackpressureBackoffMin is the minimum backoff duration when backpressure is detected. 
DefaultBackpressureBackoffMin = 10 * time.Millisecond diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index 17d15f3..d7b4afb 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -704,9 +704,15 @@ func (m *Manager) runBlockProcessing(ctx context.Context) { queueMonitorTicker := time.NewTicker(30 * time.Second) // Monitor queues every 30s defer queueMonitorTicker.Stop() + // Stale block detection ticker + staleBlockTicker := time.NewTicker(m.config.StaleBlockDetection.CheckInterval) + defer staleBlockTicker.Stop() + m.log.WithFields(logrus.Fields{ - "interval": m.config.Interval, - "processors": len(m.processors), + "interval": m.config.Interval, + "processors": len(m.processors), + "stale_check_interval": m.config.StaleBlockDetection.CheckInterval, + "stale_threshold": m.config.StaleBlockDetection.StaleThreshold, }).Info("Started block processing loop") // Check if we have any processors @@ -741,6 +747,9 @@ func (m *Manager) runBlockProcessing(ctx context.Context) { case <-queueMonitorTicker.C: m.log.Debug("Queue monitoring ticker fired") m.startQueueMonitoring(ctx) + case <-staleBlockTicker.C: + m.log.Debug("Stale block detection ticker fired") + m.checkStaleBlocks(ctx) default: if !m.isLeader { m.log.Warn("No longer leader but block processing still running - stopping") @@ -1216,10 +1225,10 @@ func (m *Manager) enqueueSimpleBlockTask(ctx context.Context, p *transaction_sim var err error if m.config.Mode == tracker.BACKWARDS_MODE { - task, err = transaction_simple.NewProcessBackwardsTask(payload) + task, _, err = transaction_simple.NewProcessBackwardsTask(payload) queue = tracker.PrefixedProcessBackwardsQueue(transaction_simple.ProcessorName, m.redisPrefix) } else { - task, err = transaction_simple.NewProcessForwardsTask(payload) + task, _, err = transaction_simple.NewProcessForwardsTask(payload) queue = tracker.PrefixedProcessForwardsQueue(transaction_simple.ProcessorName, m.redisPrefix) } @@ -1297,3 +1306,69 @@ func (m *Manager) updateBlocksStoredMetrics(ctx context.Context) { } } } + +// checkStaleBlocks checks for stale blocks across all processors and triggers retries. 
+func (m *Manager) checkStaleBlocks(ctx context.Context) { + if !m.config.StaleBlockDetection.Enabled { + return + } + + for processorName, processor := range m.processors { + // Check for context cancellation between processors + select { + case <-ctx.Done(): + return + default: + } + + // Get the completion tracker based on processor type + var completionTracker *tracker.BlockCompletionTracker + + switch p := processor.(type) { + case *transaction_structlog.Processor: + completionTracker = p.GetCompletionTracker() + case *transaction_simple.Processor: + completionTracker = p.GetCompletionTracker() + case *transaction_structlog_agg.Processor: + completionTracker = p.GetCompletionTracker() + default: + m.log.WithField("processor", processorName).Warn("Unknown processor type for stale block detection") + + continue + } + + if completionTracker == nil { + continue + } + + staleBlocks, err := completionTracker.GetStaleBlocks(ctx, m.network.Name, processorName, m.config.Mode) + if err != nil { + m.log.WithError(err).WithField("processor", processorName).Warn("Failed to get stale blocks") + + continue + } + + for _, blockNum := range staleBlocks { + m.log.WithFields(logrus.Fields{ + "processor": processorName, + "block_number": blockNum, + }).Warn("Stale block detected - clearing for re-processing") + + // Clear the stale block tracking (it will be re-enqueued on next ProcessNextBlock cycle) + if clearErr := completionTracker.ClearBlock(ctx, blockNum, m.network.Name, processorName, m.config.Mode); clearErr != nil { + m.log.WithError(clearErr).WithFields(logrus.Fields{ + "processor": processorName, + "block_number": blockNum, + }).Error("Failed to clear stale block") + } + } + + if len(staleBlocks) > 0 { + m.log.WithFields(logrus.Fields{ + "processor": processorName, + "stale_count": len(staleBlocks), + "stale_blocks": staleBlocks, + }).Info("Cleared stale blocks for re-processing") + } + } +} diff --git a/pkg/processor/tracker/block_tracker.go b/pkg/processor/tracker/block_tracker.go new file mode 100644 index 0000000..735f89c --- /dev/null +++ b/pkg/processor/tracker/block_tracker.go @@ -0,0 +1,392 @@ +package tracker + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/redis/go-redis/v9" + "github.com/sirupsen/logrus" +) + +// Default configuration values for BlockCompletionTracker. +const ( + // DefaultBlockMetaTTL is the default TTL for block tracking keys. + DefaultBlockMetaTTL = 30 * time.Minute + + // DefaultStaleThreshold is the default time after which a block is considered stale. + DefaultStaleThreshold = 5 * time.Minute +) + +// BlockCompletionTrackerConfig holds configuration for the BlockCompletionTracker. +type BlockCompletionTrackerConfig struct { + // StaleThreshold is the time after which a block is considered stale. + StaleThreshold time.Duration + + // AutoRetryStale enables automatic retry of stale blocks. + AutoRetryStale bool +} + +// BlockCompletionTracker tracks block completion using Redis SETs for task deduplication. +// This replaces the counter-based PendingTracker with a SET-based approach that: +// - Uses asynq.TaskID() for deterministic task deduplication +// - Tracks completed taskIDs in a Redis SET (idempotent SADD) +// - Stores expected count and enqueued_at metadata +// - Supports stale block detection for auto-retry. 
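The doc comment above summarizes the SET-based completion scheme. As a rough, hedged sketch of that idea in plain go-redis calls (not part of this change): the key names and the trackCompletion helper below are illustrative placeholders, and the production implementation is the BlockCompletionTracker type the diff defines immediately below.

package sketch

import (
	"context"
	"fmt"
	"strconv"

	"github.com/redis/go-redis/v9"
)

// trackCompletion records one finished task ID in a Redis SET and reports whether the
// block is done, assuming RegisterBlock-style setup already wrote both keys.
func trackCompletion(ctx context.Context, rdb *redis.Client, completedKey, expectedKey, taskID string) (bool, error) {
	// SADD is idempotent: the same task completing twice is only counted once.
	if err := rdb.SAdd(ctx, completedKey, taskID).Err(); err != nil {
		return false, fmt.Errorf("add completed task: %w", err)
	}

	done, err := rdb.SCard(ctx, completedKey).Result()
	if err != nil {
		return false, fmt.Errorf("count completed tasks: %w", err)
	}

	expectedStr, err := rdb.Get(ctx, expectedKey).Result()
	if err != nil {
		return false, fmt.Errorf("read expected count: %w", err)
	}

	expected, err := strconv.ParseInt(expectedStr, 10, 64)
	if err != nil {
		return false, fmt.Errorf("parse expected count: %w", err)
	}

	return done >= expected, nil
}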
+type BlockCompletionTracker struct { + redis *redis.Client + prefix string + log logrus.FieldLogger + config BlockCompletionTrackerConfig + + stateProvider StateProvider // For ClickHouse writes +} + +// NewBlockCompletionTracker creates a new BlockCompletionTracker. +func NewBlockCompletionTracker( + redisClient *redis.Client, + prefix string, + log logrus.FieldLogger, + stateProvider StateProvider, + config BlockCompletionTrackerConfig, +) *BlockCompletionTracker { + // Apply defaults + if config.StaleThreshold == 0 { + config.StaleThreshold = DefaultStaleThreshold + } + + return &BlockCompletionTracker{ + redis: redisClient, + prefix: prefix, + log: log.WithField("component", "block_completion_tracker"), + config: config, + stateProvider: stateProvider, + } +} + +// Redis key patterns: +// - {prefix}:completed:{processor}:{network}:{mode}:{blockNum} -> SET of completed taskIDs +// - {prefix}:expected:{processor}:{network}:{mode}:{blockNum} -> STRING expected count +// - {prefix}:block_meta:{processor}:{network}:{mode}:{blockNum} -> HASH with enqueued_at, queue + +func (t *BlockCompletionTracker) completedKey(network, processor, mode string, blockNum uint64) string { + if t.prefix == "" { + return fmt.Sprintf("completed:%s:%s:%s:%d", processor, network, mode, blockNum) + } + + return fmt.Sprintf("%s:completed:%s:%s:%s:%d", t.prefix, processor, network, mode, blockNum) +} + +func (t *BlockCompletionTracker) expectedKey(network, processor, mode string, blockNum uint64) string { + if t.prefix == "" { + return fmt.Sprintf("expected:%s:%s:%s:%d", processor, network, mode, blockNum) + } + + return fmt.Sprintf("%s:expected:%s:%s:%s:%d", t.prefix, processor, network, mode, blockNum) +} + +func (t *BlockCompletionTracker) metaKey(network, processor, mode string, blockNum uint64) string { + if t.prefix == "" { + return fmt.Sprintf("block_meta:%s:%s:%s:%d", processor, network, mode, blockNum) + } + + return fmt.Sprintf("%s:block_meta:%s:%s:%s:%d", t.prefix, processor, network, mode, blockNum) +} + +// metaKeyPattern returns a pattern for scanning block_meta keys. +func (t *BlockCompletionTracker) metaKeyPattern(network, processor, mode string) string { + if t.prefix == "" { + return fmt.Sprintf("block_meta:%s:%s:%s:*", processor, network, mode) + } + + return fmt.Sprintf("%s:block_meta:%s:%s:%s:*", t.prefix, processor, network, mode) +} + +// RegisterBlock initializes tracking for a new block. +// Clears any existing completion data (safe for retries). +// This should be called AFTER MarkBlockEnqueued to ensure ClickHouse has the record. 
+func (t *BlockCompletionTracker) RegisterBlock( + ctx context.Context, + blockNum uint64, + expectedCount int, + network, processor, mode, queue string, +) error { + completedKey := t.completedKey(network, processor, mode, blockNum) + expectedKey := t.expectedKey(network, processor, mode, blockNum) + metaKey := t.metaKey(network, processor, mode, blockNum) + + pipe := t.redis.Pipeline() + pipe.Del(ctx, completedKey) // Clear old completions + pipe.Set(ctx, expectedKey, expectedCount, DefaultBlockMetaTTL) // Set expected count + pipe.HSet(ctx, metaKey, map[string]any{ + "enqueued_at": time.Now().Unix(), + "queue": queue, + "expected": expectedCount, + }) + pipe.Expire(ctx, metaKey, DefaultBlockMetaTTL) + + _, err := pipe.Exec(ctx) + if err != nil { + return fmt.Errorf("failed to register block: %w", err) + } + + t.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "expected_count": expectedCount, + "network": network, + "processor": processor, + "mode": mode, + "queue": queue, + }).Debug("Registered block for completion tracking") + + return nil +} + +// TrackTaskCompletion records a task completion and checks if block is done. +// Returns true if all tasks are now complete. +func (t *BlockCompletionTracker) TrackTaskCompletion( + ctx context.Context, + taskID string, + blockNum uint64, + network, processor, mode string, +) (bool, error) { + completedKey := t.completedKey(network, processor, mode, blockNum) + expectedKey := t.expectedKey(network, processor, mode, blockNum) + + // Add to completed set (idempotent - same task completing twice is fine) + // Set TTL to ensure cleanup if block never completes + pipe := t.redis.Pipeline() + pipe.SAdd(ctx, completedKey, taskID) + pipe.Expire(ctx, completedKey, DefaultBlockMetaTTL) + + if _, err := pipe.Exec(ctx); err != nil { + return false, fmt.Errorf("failed to add task to completed set: %w", err) + } + + // Get counts + completedCount, err := t.redis.SCard(ctx, completedKey).Result() + if err != nil { + return false, fmt.Errorf("failed to get completed count: %w", err) + } + + expectedStr, err := t.redis.Get(ctx, expectedKey).Result() + if err == redis.Nil { + // Block not registered - might be old task from before retry, or already cleaned up + t.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "task_id": taskID, + "network": network, + "processor": processor, + "mode": mode, + }).Debug("Block not registered for completion tracking (may be old task or already complete)") + + return false, nil + } + + if err != nil { + return false, fmt.Errorf("failed to get expected count: %w", err) + } + + expected, err := strconv.ParseInt(expectedStr, 10, 64) + if err != nil { + return false, fmt.Errorf("failed to parse expected count: %w", err) + } + + t.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "task_id": taskID, + "completed": completedCount, + "expected": expected, + "network": network, + "processor": processor, + "mode": mode, + }).Debug("Tracked task completion") + + return completedCount >= expected, nil +} + +// MarkBlockComplete writes to ClickHouse and cleans up Redis. 
+func (t *BlockCompletionTracker) MarkBlockComplete( + ctx context.Context, + blockNum uint64, + network, processor, mode string, +) error { + // Write to ClickHouse + if err := t.stateProvider.MarkBlockComplete(ctx, blockNum, network, processor); err != nil { + return fmt.Errorf("failed to mark block complete in ClickHouse: %w", err) + } + + // Cleanup Redis keys + completedKey := t.completedKey(network, processor, mode, blockNum) + expectedKey := t.expectedKey(network, processor, mode, blockNum) + metaKey := t.metaKey(network, processor, mode, blockNum) + + if err := t.redis.Del(ctx, completedKey, expectedKey, metaKey).Err(); err != nil { + // Log but don't fail - keys will expire anyway + t.log.WithError(err).WithFields(logrus.Fields{ + "block_number": blockNum, + "network": network, + "processor": processor, + "mode": mode, + }).Warn("Failed to cleanup Redis keys after block completion") + } + + t.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "network": network, + "processor": processor, + "mode": mode, + }).Debug("Block marked complete - all tasks finished") + + return nil +} + +// GetStaleBlocks returns blocks that have been processing longer than the stale threshold. +func (t *BlockCompletionTracker) GetStaleBlocks( + ctx context.Context, + network, processor, mode string, +) ([]uint64, error) { + pattern := t.metaKeyPattern(network, processor, mode) + staleBlocks := make([]uint64, 0) + + iter := t.redis.Scan(ctx, 0, pattern, 100).Iterator() + for iter.Next(ctx) { + key := iter.Val() + + enqueuedAtStr, err := t.redis.HGet(ctx, key, "enqueued_at").Result() + if err != nil { + t.log.WithError(err).WithField("key", key).Debug("Failed to get enqueued_at") + + continue + } + + enqueuedAt, err := strconv.ParseInt(enqueuedAtStr, 10, 64) + if err != nil { + t.log.WithError(err).WithField("key", key).Debug("Failed to parse enqueued_at") + + continue + } + + if time.Since(time.Unix(enqueuedAt, 0)) > t.config.StaleThreshold { + // Extract block number from key + blockNum := extractBlockNumFromKey(key) + if blockNum != 0 { + staleBlocks = append(staleBlocks, blockNum) + } + } + } + + if err := iter.Err(); err != nil { + return nil, fmt.Errorf("failed to scan for stale blocks: %w", err) + } + + return staleBlocks, nil +} + +// GetBlockStatus returns the completion status of a block. +func (t *BlockCompletionTracker) GetBlockStatus( + ctx context.Context, + blockNum uint64, + network, processor, mode string, +) (completed int64, expected int64, enqueuedAt time.Time, err error) { + completedKey := t.completedKey(network, processor, mode, blockNum) + expectedKey := t.expectedKey(network, processor, mode, blockNum) + metaKey := t.metaKey(network, processor, mode, blockNum) + + pipe := t.redis.Pipeline() + completedCmd := pipe.SCard(ctx, completedKey) + expectedCmd := pipe.Get(ctx, expectedKey) + enqueuedAtCmd := pipe.HGet(ctx, metaKey, "enqueued_at") + + _, err = pipe.Exec(ctx) + if err != nil && err != redis.Nil { + return 0, 0, time.Time{}, fmt.Errorf("failed to get block status: %w", err) + } + + completed, _ = completedCmd.Result() + + expectedStr, _ := expectedCmd.Result() + if expectedStr != "" { + expected, _ = strconv.ParseInt(expectedStr, 10, 64) + } + + enqueuedAtStr, _ := enqueuedAtCmd.Result() + if enqueuedAtStr != "" { + enqueuedAtUnix, _ := strconv.ParseInt(enqueuedAtStr, 10, 64) + enqueuedAt = time.Unix(enqueuedAtUnix, 0) + } + + return completed, expected, enqueuedAt, nil +} + +// ClearBlock removes all tracking data for a block (used when retrying). 
+func (t *BlockCompletionTracker) ClearBlock( + ctx context.Context, + blockNum uint64, + network, processor, mode string, +) error { + completedKey := t.completedKey(network, processor, mode, blockNum) + expectedKey := t.expectedKey(network, processor, mode, blockNum) + metaKey := t.metaKey(network, processor, mode, blockNum) + + if err := t.redis.Del(ctx, completedKey, expectedKey, metaKey).Err(); err != nil { + return fmt.Errorf("failed to clear block tracking: %w", err) + } + + t.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "network": network, + "processor": processor, + "mode": mode, + }).Debug("Cleared block tracking data") + + return nil +} + +// extractBlockNumFromKey extracts the block number from a Redis key. +// Key format: {prefix}:block_meta:{processor}:{network}:{mode}:{blockNum} +// or: block_meta:{processor}:{network}:{mode}:{blockNum} +func extractBlockNumFromKey(key string) uint64 { + parts := strings.Split(key, ":") + if len(parts) < 2 { + return 0 + } + + // Block number is always the last part + blockNumStr := parts[len(parts)-1] + + blockNum, err := strconv.ParseUint(blockNumStr, 10, 64) + if err != nil { + return 0 + } + + return blockNum +} + +// HasBlockTracking checks if a block has Redis tracking data. +// Returns true if block_meta key exists (block is being tracked). +// Used to detect orphaned blocks that are in ClickHouse (complete=0) but have no Redis tracking. +func (t *BlockCompletionTracker) HasBlockTracking( + ctx context.Context, + blockNum uint64, + network, processor, mode string, +) (bool, error) { + metaKey := t.metaKey(network, processor, mode, blockNum) + + exists, err := t.redis.Exists(ctx, metaKey).Result() + if err != nil { + return false, fmt.Errorf("failed to check block tracking: %w", err) + } + + return exists > 0, nil +} + +// GenerateTaskID creates a deterministic task ID for deduplication. +// Format: {processor}:{network}:{blockNum}:{identifier} +// For transaction-based processors: identifier = txHash. +// For block-based processors: identifier = "block". +func GenerateTaskID(processor, network string, blockNum uint64, identifier string) string { + return fmt.Sprintf("%s:%s:%d:%s", processor, network, blockNum, identifier) +} diff --git a/pkg/processor/tracker/block_tracker_test.go b/pkg/processor/tracker/block_tracker_test.go new file mode 100644 index 0000000..8fe2d49 --- /dev/null +++ b/pkg/processor/tracker/block_tracker_test.go @@ -0,0 +1,274 @@ +package tracker + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/alicebob/miniredis/v2" + "github.com/redis/go-redis/v9" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// mockStateProviderForTracker implements StateProvider for testing.
+type mockStateProviderForTracker struct { + oldestIncomplete *uint64 + newestIncomplete *uint64 + markCompleteErr error +} + +func (m *mockStateProviderForTracker) GetOldestIncompleteBlock( + _ context.Context, _, _ string, _ uint64, +) (*uint64, error) { + return m.oldestIncomplete, nil +} + +func (m *mockStateProviderForTracker) GetNewestIncompleteBlock( + _ context.Context, _, _ string, _ uint64, +) (*uint64, error) { + return m.newestIncomplete, nil +} + +func (m *mockStateProviderForTracker) MarkBlockComplete( + _ context.Context, _ uint64, _, _ string, +) error { + return m.markCompleteErr +} + +func TestBlockCompletionTracker_HasBlockTracking(t *testing.T) { + tests := []struct { + name string + setupRedis func(mr *miniredis.Miniredis, blockNum uint64) + blockNum uint64 + wantTracked bool + }{ + { + name: "returns true when block_meta key exists", + blockNum: 100, + setupRedis: func(mr *miniredis.Miniredis, blockNum uint64) { + mr.HSet( + fmt.Sprintf("block_meta:test_processor:test_network:forwards:%d", blockNum), + "enqueued_at", fmt.Sprintf("%d", time.Now().Unix()), + ) + }, + wantTracked: true, + }, + { + name: "returns false when block_meta key does not exist", + blockNum: 100, + setupRedis: func(_ *miniredis.Miniredis, _ uint64) {}, + wantTracked: false, + }, + { + name: "returns false for different block number", + blockNum: 100, + setupRedis: func(mr *miniredis.Miniredis, _ uint64) { + // Set up tracking for a DIFFERENT block + mr.HSet( + "block_meta:test_processor:test_network:forwards:999", + "enqueued_at", fmt.Sprintf("%d", time.Now().Unix()), + ) + }, + wantTracked: false, + }, + { + name: "returns true for block with prefix", + blockNum: 100, + setupRedis: func(mr *miniredis.Miniredis, blockNum uint64) { + // Note: prefix is empty in this test, so key format doesn't include prefix + mr.HSet( + fmt.Sprintf("block_meta:test_processor:test_network:forwards:%d", blockNum), + "enqueued_at", fmt.Sprintf("%d", time.Now().Unix()), + ) + }, + wantTracked: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{}, + ) + + tt.setupRedis(mr, tt.blockNum) + + got, err := tracker.HasBlockTracking( + context.Background(), tt.blockNum, + "test_network", "test_processor", "forwards", + ) + + require.NoError(t, err) + assert.Equal(t, tt.wantTracked, got) + }) + } +} + +func TestBlockCompletionTracker_HasBlockTracking_WithPrefix(t *testing.T) { + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + prefix := "myapp" + tracker := NewBlockCompletionTracker( + client, prefix, logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{}, + ) + + // Set up tracking with prefix + blockNum := uint64(100) + mr.HSet( + fmt.Sprintf("%s:block_meta:test_processor:test_network:forwards:%d", prefix, blockNum), + "enqueued_at", fmt.Sprintf("%d", time.Now().Unix()), + ) + + got, err := tracker.HasBlockTracking( + context.Background(), blockNum, + "test_network", "test_processor", "forwards", + ) + + require.NoError(t, err) + assert.True(t, got) +} + +func TestBlockCompletionTracker_HasBlockTracking_AfterRegisterBlock(t *testing.T) { + mr, err := miniredis.Run() 
+ require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), &mockStateProviderForTracker{}, + BlockCompletionTrackerConfig{}, + ) + + blockNum := uint64(100) + network := "test_network" + processor := "test_processor" + mode := "forwards" + + // Verify block has no tracking initially + hasTracking, err := tracker.HasBlockTracking(context.Background(), blockNum, network, processor, mode) + require.NoError(t, err) + assert.False(t, hasTracking, "block should not have tracking before RegisterBlock") + + // Register the block + err = tracker.RegisterBlock(context.Background(), blockNum, 5, network, processor, mode, "test_queue") + require.NoError(t, err) + + // Verify block now has tracking + hasTracking, err = tracker.HasBlockTracking(context.Background(), blockNum, network, processor, mode) + require.NoError(t, err) + assert.True(t, hasTracking, "block should have tracking after RegisterBlock") +} + +func TestOrphanedBlockRecoveryFlow(t *testing.T) { + // This test simulates the full flow: + // 1. Block is in ClickHouse (complete=0) - simulated by mockStateProvider + // 2. Redis tracking is missing (orphaned) + // 3. IsBlockedByIncompleteBlocks returns blocked + blocking block + // 4. HasBlockTracking returns false + // 5. RegisterBlock is called (simulating ReprocessBlock) + // 6. Block now has tracking and will complete normally + mr, err := miniredis.Run() + require.NoError(t, err) + + defer mr.Close() + + client := redis.NewClient(&redis.Options{Addr: mr.Addr()}) + defer client.Close() + + orphanedBlockNum := uint64(100) + nextBlockNum := uint64(110) + + mockState := &mockStateProviderForLimiter{ + oldestIncomplete: &orphanedBlockNum, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.New(), + StateProvider: mockState, + Network: "test", + Processor: "test", + }, LimiterConfig{MaxPendingBlockRange: 5}) + + tracker := NewBlockCompletionTracker( + client, "", logrus.New(), mockState, + BlockCompletionTrackerConfig{}, + ) + + // Step 1: Check if blocked + blocked, blockingBlock, err := limiter.IsBlockedByIncompleteBlocks( + context.Background(), nextBlockNum, FORWARDS_MODE, + ) + require.NoError(t, err) + assert.True(t, blocked, "should be blocked by incomplete block") + require.NotNil(t, blockingBlock) + assert.Equal(t, orphanedBlockNum, *blockingBlock) + + // Step 2: Check if blocking block is orphaned + hasTracking, err := tracker.HasBlockTracking( + context.Background(), *blockingBlock, + "test", "test", FORWARDS_MODE, + ) + require.NoError(t, err) + assert.False(t, hasTracking, "blocking block should be orphaned (no Redis tracking)") + + // Step 3: Simulate ReprocessBlock - register the block + err = tracker.RegisterBlock( + context.Background(), *blockingBlock, 3, + "test", "test", FORWARDS_MODE, "test_queue", + ) + require.NoError(t, err) + + // Step 4: Verify block is no longer orphaned + hasTracking, err = tracker.HasBlockTracking( + context.Background(), *blockingBlock, + "test", "test", FORWARDS_MODE, + ) + require.NoError(t, err) + assert.True(t, hasTracking, "block should have tracking after reprocessing") +} + +// mockStateProviderForLimiter implements StateProvider for limiter testing. 
+type mockStateProviderForLimiter struct { + oldestIncomplete *uint64 + newestIncomplete *uint64 +} + +func (m *mockStateProviderForLimiter) GetOldestIncompleteBlock( + _ context.Context, _, _ string, _ uint64, +) (*uint64, error) { + return m.oldestIncomplete, nil +} + +func (m *mockStateProviderForLimiter) GetNewestIncompleteBlock( + _ context.Context, _, _ string, _ uint64, +) (*uint64, error) { + return m.newestIncomplete, nil +} + +func (m *mockStateProviderForLimiter) MarkBlockComplete( + _ context.Context, _ uint64, _, _ string, +) error { + return nil +} diff --git a/pkg/processor/tracker/completion.go b/pkg/processor/tracker/completion.go deleted file mode 100644 index 598ccbb..0000000 --- a/pkg/processor/tracker/completion.go +++ /dev/null @@ -1,68 +0,0 @@ -package tracker - -import ( - "context" - "errors" - "strings" - - "github.com/sirupsen/logrus" - - "github.com/ethpandaops/execution-processor/pkg/ethereum" -) - -// IsBlockNotFoundError checks if an error indicates a block was not found. -// Uses errors.Is for sentinel errors, with fallback to string matching for wrapped errors. -func IsBlockNotFoundError(err error) bool { - if err == nil { - return false - } - - // Check for sentinel error first - if errors.Is(err, ethereum.ErrBlockNotFound) { - return true - } - - // Fallback to string matching for errors from external clients - errStr := strings.ToLower(err.Error()) - - return strings.Contains(errStr, "not found") || - strings.Contains(errStr, "unknown block") || - strings.Contains(errStr, "block not found") || - strings.Contains(errStr, "header not found") -} - -// TrackBlockCompletion decrements the pending task count and marks the block complete when all tasks finish. -func (l *Limiter) TrackBlockCompletion(ctx context.Context, blockNumber uint64, mode string) { - remaining, err := l.pendingTracker.DecrementPending(ctx, blockNumber, l.network, l.processor, mode) - if err != nil { - l.log.WithError(err).WithFields(logrus.Fields{ - "block_number": blockNumber, - "mode": mode, - }).Warn("Failed to decrement pending count") - - return - } - - // If all tasks are complete, mark the block as complete - if remaining <= 0 { - if err := l.stateProvider.MarkBlockComplete(ctx, blockNumber, l.network, l.processor); err != nil { - l.log.WithError(err).WithFields(logrus.Fields{ - "block_number": blockNumber, - }).Error("Failed to mark block complete") - - return - } - - // Cleanup Redis tracking key - if err := l.pendingTracker.CleanupBlock(ctx, blockNumber, l.network, l.processor, mode); err != nil { - l.log.WithError(err).WithFields(logrus.Fields{ - "block_number": blockNumber, - }).Warn("Failed to cleanup block tracking") - } - - l.log.WithFields(logrus.Fields{ - "block_number": blockNumber, - "mode": mode, - }).Debug("Block marked complete - all tasks finished") - } -} diff --git a/pkg/processor/tracker/errors.go b/pkg/processor/tracker/errors.go new file mode 100644 index 0000000..557b753 --- /dev/null +++ b/pkg/processor/tracker/errors.go @@ -0,0 +1,29 @@ +package tracker + +import ( + "errors" + "strings" + + "github.com/ethpandaops/execution-processor/pkg/ethereum" +) + +// IsBlockNotFoundError checks if an error indicates a block was not found. +// Uses errors.Is for sentinel errors, with fallback to string matching for wrapped errors. 
+func IsBlockNotFoundError(err error) bool { + if err == nil { + return false + } + + // Check for sentinel error first + if errors.Is(err, ethereum.ErrBlockNotFound) { + return true + } + + // Fallback to string matching for errors from external clients + errStr := strings.ToLower(err.Error()) + + return strings.Contains(errStr, "not found") || + strings.Contains(errStr, "unknown block") || + strings.Contains(errStr, "block not found") || + strings.Contains(errStr, "header not found") +} diff --git a/pkg/processor/tracker/limiter.go b/pkg/processor/tracker/limiter.go index cbfb01d..8ef4466 100644 --- a/pkg/processor/tracker/limiter.go +++ b/pkg/processor/tracker/limiter.go @@ -22,42 +22,44 @@ type LimiterConfig struct { // LimiterDeps holds dependencies for the Limiter. type LimiterDeps struct { - Log logrus.FieldLogger - StateProvider StateProvider - PendingTracker *PendingTracker - Network string - Processor string + Log logrus.FieldLogger + StateProvider StateProvider + Network string + Processor string } // Limiter provides shared blocking and completion functionality for processors. type Limiter struct { - log logrus.FieldLogger - stateProvider StateProvider - pendingTracker *PendingTracker - config LimiterConfig - network string - processor string + log logrus.FieldLogger + stateProvider StateProvider + config LimiterConfig + network string + processor string } // NewLimiter creates a new Limiter. func NewLimiter(deps *LimiterDeps, config LimiterConfig) *Limiter { return &Limiter{ - log: deps.Log, - stateProvider: deps.StateProvider, - pendingTracker: deps.PendingTracker, - config: config, - network: deps.Network, - processor: deps.Processor, + log: deps.Log, + stateProvider: deps.StateProvider, + config: config, + network: deps.Network, + processor: deps.Processor, } } // IsBlockedByIncompleteBlocks checks if processing should be blocked based on distance // from the oldest/newest incomplete block (depending on processing mode). -// Returns true if blocked, false if processing can proceed. -func (l *Limiter) IsBlockedByIncompleteBlocks(ctx context.Context, nextBlock uint64, mode string) (bool, error) { +// Returns: blocked status, blocking block number (if blocked), error. +// The blocking block number can be used to check if the block is orphaned (no Redis tracking). 
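In caller terms, that orphan check is roughly the hedged sketch below; the small local interfaces stand in for *tracker.Limiter, *tracker.BlockCompletionTracker and a concrete processor, and recoverIfOrphaned is a hypothetical helper name. The updated method itself continues below, and the real caller-side wiring lands in ProcessNextBlock in pkg/processor/transaction/simple/block_processing.go later in this diff.

package sketch

import "context"

// Minimal stand-ins for the real types; only the methods used here are declared.
type limiter interface {
	IsBlockedByIncompleteBlocks(ctx context.Context, nextBlock uint64, mode string) (bool, *uint64, error)
}

type completionTracker interface {
	HasBlockTracking(ctx context.Context, blockNum uint64, network, processor, mode string) (bool, error)
}

type blockProcessor interface {
	ReprocessBlock(ctx context.Context, blockNum uint64) error
}

// recoverIfOrphaned checks whether progress is blocked by an incomplete block that has
// no Redis tracking (orphaned) and, if so, re-enqueues it. Returns true when a
// reprocess was triggered.
func recoverIfOrphaned(
	ctx context.Context,
	lim limiter,
	ct completionTracker,
	bp blockProcessor,
	nextBlock uint64,
	network, processor, mode string,
) (bool, error) {
	blocked, blockingBlock, err := lim.IsBlockedByIncompleteBlocks(ctx, nextBlock, mode)
	if err != nil || !blocked || blockingBlock == nil {
		return false, err
	}

	hasTracking, err := ct.HasBlockTracking(ctx, *blockingBlock, network, processor, mode)
	if err != nil || hasTracking {
		// Still tracked in Redis: stale-block detection will handle it if needed.
		return false, err
	}

	// Orphaned: in ClickHouse with complete=0 but no Redis state, so re-enqueue its tasks.
	return true, bp.ReprocessBlock(ctx, *blockingBlock)
}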
+func (l *Limiter) IsBlockedByIncompleteBlocks( + ctx context.Context, + nextBlock uint64, + mode string, +) (bool, *uint64, error) { // Safe conversion: MaxPendingBlockRange is validated to be > 0 during config validation if l.config.MaxPendingBlockRange <= 0 { - return false, nil + return false, nil, nil } maxPendingBlockRange := uint64(l.config.MaxPendingBlockRange) //nolint:gosec // validated above @@ -71,7 +73,7 @@ func (l *Limiter) IsBlockedByIncompleteBlocks(ctx context.Context, nextBlock uin ctx, l.network, l.processor, searchMaxBlock, ) if err != nil { - return false, err + return false, nil, err } if newestIncomplete != nil && (*newestIncomplete-nextBlock) >= maxPendingBlockRange { @@ -84,7 +86,7 @@ func (l *Limiter) IsBlockedByIncompleteBlocks(ctx context.Context, nextBlock uin common.BlockProcessingSkipped.WithLabelValues(l.network, l.processor, "max_pending_block_range").Inc() - return true, nil + return true, newestIncomplete, nil } } else { // Forwards mode: check distance from oldest incomplete block @@ -98,7 +100,7 @@ func (l *Limiter) IsBlockedByIncompleteBlocks(ctx context.Context, nextBlock uin ctx, l.network, l.processor, searchMinBlock, ) if err != nil { - return false, err + return false, nil, err } if oldestIncomplete != nil && (nextBlock-*oldestIncomplete) >= maxPendingBlockRange { @@ -111,11 +113,11 @@ func (l *Limiter) IsBlockedByIncompleteBlocks(ctx context.Context, nextBlock uin common.BlockProcessingSkipped.WithLabelValues(l.network, l.processor, "max_pending_block_range").Inc() - return true, nil + return true, oldestIncomplete, nil } } - return false, nil + return false, nil, nil } // GetAvailableCapacity returns how many more blocks can be enqueued before hitting diff --git a/pkg/processor/tracker/limiter_test.go b/pkg/processor/tracker/limiter_test.go index 299e5b7..fa61bd8 100644 --- a/pkg/processor/tracker/limiter_test.go +++ b/pkg/processor/tracker/limiter_test.go @@ -1,9 +1,11 @@ package tracker import ( + "context" "errors" "testing" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/ethpandaops/execution-processor/pkg/ethereum" @@ -359,3 +361,125 @@ type testError struct { func (e *testError) Error() string { return e.msg } + +// mockStateProvider implements StateProvider for testing IsBlockedByIncompleteBlocks. 
+type mockStateProvider struct { + oldestIncomplete *uint64 + newestIncomplete *uint64 +} + +func (m *mockStateProvider) GetOldestIncompleteBlock( + _ context.Context, _, _ string, _ uint64, +) (*uint64, error) { + return m.oldestIncomplete, nil +} + +func (m *mockStateProvider) GetNewestIncompleteBlock( + _ context.Context, _, _ string, _ uint64, +) (*uint64, error) { + return m.newestIncomplete, nil +} + +func (m *mockStateProvider) MarkBlockComplete( + _ context.Context, _ uint64, _, _ string, +) error { + return nil +} + +func TestLimiter_IsBlockedByIncompleteBlocks_ReturnsBlockingBlock(t *testing.T) { + tests := []struct { + name string + mode string + nextBlock uint64 + maxPendingBlockRange int + oldestIncomplete *uint64 + newestIncomplete *uint64 + wantBlocked bool + wantBlockingBlock *uint64 + }{ + { + name: "forwards mode - returns oldest incomplete when blocked", + mode: FORWARDS_MODE, + nextBlock: 110, + maxPendingBlockRange: 5, + oldestIncomplete: uint64Ptr(100), // distance = 10 >= 5 + wantBlocked: true, + wantBlockingBlock: uint64Ptr(100), + }, + { + name: "forwards mode - returns nil when not blocked", + mode: FORWARDS_MODE, + nextBlock: 103, + maxPendingBlockRange: 5, + oldestIncomplete: uint64Ptr(100), // distance = 3 < 5 + wantBlocked: false, + wantBlockingBlock: nil, + }, + { + name: "backwards mode - returns newest incomplete when blocked", + mode: BACKWARDS_MODE, + nextBlock: 100, + maxPendingBlockRange: 5, + newestIncomplete: uint64Ptr(110), // distance = 10 >= 5 + wantBlocked: true, + wantBlockingBlock: uint64Ptr(110), + }, + { + name: "backwards mode - returns nil when not blocked", + mode: BACKWARDS_MODE, + nextBlock: 100, + maxPendingBlockRange: 5, + newestIncomplete: uint64Ptr(103), // distance = 3 < 5 + wantBlocked: false, + wantBlockingBlock: nil, + }, + { + name: "no incomplete blocks - returns nil", + mode: FORWARDS_MODE, + nextBlock: 100, + maxPendingBlockRange: 5, + oldestIncomplete: nil, + wantBlocked: false, + wantBlockingBlock: nil, + }, + { + name: "zero maxPendingBlockRange - returns not blocked", + mode: FORWARDS_MODE, + nextBlock: 110, + maxPendingBlockRange: 0, + oldestIncomplete: uint64Ptr(100), + wantBlocked: false, + wantBlockingBlock: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockState := &mockStateProvider{ + oldestIncomplete: tt.oldestIncomplete, + newestIncomplete: tt.newestIncomplete, + } + + limiter := NewLimiter(&LimiterDeps{ + Log: logrus.New(), + StateProvider: mockState, + Network: "test", + Processor: "test", + }, LimiterConfig{MaxPendingBlockRange: tt.maxPendingBlockRange}) + + blocked, blockingBlock, err := limiter.IsBlockedByIncompleteBlocks( + context.Background(), tt.nextBlock, tt.mode, + ) + + assert.NoError(t, err) + assert.Equal(t, tt.wantBlocked, blocked) + + if tt.wantBlockingBlock == nil { + assert.Nil(t, blockingBlock) + } else { + assert.NotNil(t, blockingBlock) + assert.Equal(t, *tt.wantBlockingBlock, *blockingBlock) + } + }) + } +} diff --git a/pkg/processor/tracker/pending.go b/pkg/processor/tracker/pending.go deleted file mode 100644 index 48a7afe..0000000 --- a/pkg/processor/tracker/pending.go +++ /dev/null @@ -1,215 +0,0 @@ -package tracker - -import ( - "context" - "fmt" - "strconv" - "time" - - "github.com/redis/go-redis/v9" - "github.com/sirupsen/logrus" -) - -// PendingTracker tracks pending tasks per block using Redis. -// Used for two-phase completion tracking to know when all tasks for a block are complete. 
-type PendingTracker struct { - redis *redis.Client - prefix string - log logrus.FieldLogger -} - -// NewPendingTracker creates a new PendingTracker. -func NewPendingTracker(redisClient *redis.Client, prefix string, log logrus.FieldLogger) *PendingTracker { - return &PendingTracker{ - redis: redisClient, - prefix: prefix, - log: log.WithField("component", "pending_tracker"), - } -} - -// blockKey returns the Redis key for tracking a block's pending task count. -// Key pattern: {prefix}:block:{network}:{processor}:{mode}:{block_number}. -func (t *PendingTracker) blockKey(blockNumber uint64, network, processor, mode string) string { - if t.prefix == "" { - return fmt.Sprintf("block:%s:%s:%s:%d", network, processor, mode, blockNumber) - } - - return fmt.Sprintf("%s:block:%s:%s:%s:%d", t.prefix, network, processor, mode, blockNumber) -} - -// ErrBlockAlreadyBeingProcessed is returned when attempting to initialize a block that is already being processed. -var ErrBlockAlreadyBeingProcessed = fmt.Errorf("block is already being processed") - -// InitBlock initializes tracking for a block with the given task count. -// Uses SetNX to ensure only one processor can claim a block at a time. -// Returns ErrBlockAlreadyBeingProcessed if the block is already being tracked. -// This should be called BEFORE MarkBlockEnqueued to prevent race conditions. -func (t *PendingTracker) InitBlock(ctx context.Context, blockNumber uint64, taskCount int, network, processor, mode string) error { - key := t.blockKey(blockNumber, network, processor, mode) - - // Use SetNX to atomically check-and-set - only succeeds if key doesn't exist - // TTL of 30 minutes prevents orphaned keys if processor crashes - wasSet, err := t.redis.SetNX(ctx, key, taskCount, 30*time.Minute).Result() - if err != nil { - return fmt.Errorf("failed to init block tracking: %w", err) - } - - if !wasSet { - t.log.WithFields(logrus.Fields{ - "block_number": blockNumber, - "network": network, - "processor": processor, - "mode": mode, - "key": key, - }).Debug("Block already being processed by another worker") - - return ErrBlockAlreadyBeingProcessed - } - - t.log.WithFields(logrus.Fields{ - "block_number": blockNumber, - "task_count": taskCount, - "network": network, - "processor": processor, - "mode": mode, - "key": key, - }).Debug("Initialized block tracking") - - return nil -} - -// DecrementPending decrements the pending task count for a block. -// Returns the remaining count after decrement. -func (t *PendingTracker) DecrementPending(ctx context.Context, blockNumber uint64, network, processor, mode string) (int64, error) { - key := t.blockKey(blockNumber, network, processor, mode) - - remaining, err := t.redis.DecrBy(ctx, key, 1).Result() - if err != nil { - return 0, fmt.Errorf("failed to decrement pending count: %w", err) - } - - t.log.WithFields(logrus.Fields{ - "block_number": blockNumber, - "remaining": remaining, - "network": network, - "processor": processor, - "mode": mode, - }).Trace("Decremented pending task count") - - return remaining, nil -} - -// GetPendingCount returns the current pending task count for a block. -// Returns 0 if the key doesn't exist (block not being tracked or already cleaned up). 
-func (t *PendingTracker) GetPendingCount(ctx context.Context, blockNumber uint64, network, processor, mode string) (int64, error) { - key := t.blockKey(blockNumber, network, processor, mode) - - val, err := t.redis.Get(ctx, key).Result() - if err == redis.Nil { - return 0, nil - } - - if err != nil { - return 0, fmt.Errorf("failed to get pending count: %w", err) - } - - count, err := strconv.ParseInt(val, 10, 64) - if err != nil { - return 0, fmt.Errorf("failed to parse pending count: %w", err) - } - - return count, nil -} - -// CleanupBlock removes the tracking key for a block. -// Should be called after a block is marked complete. -func (t *PendingTracker) CleanupBlock(ctx context.Context, blockNumber uint64, network, processor, mode string) error { - key := t.blockKey(blockNumber, network, processor, mode) - - err := t.redis.Del(ctx, key).Err() - if err != nil { - return fmt.Errorf("failed to cleanup block tracking: %w", err) - } - - t.log.WithFields(logrus.Fields{ - "block_number": blockNumber, - "network": network, - "processor": processor, - "mode": mode, - }).Debug("Cleaned up block tracking") - - return nil -} - -// BlockInit contains the information needed to initialize a block for tracking. -type BlockInit struct { - Number uint64 - TaskCount int -} - -// InitBlocks initializes tracking for multiple blocks atomically via Redis pipeline. -// Uses SetNX to ensure only one processor can claim each block at a time. -// Returns the block numbers that were successfully initialized (those not already being processed). -func (t *PendingTracker) InitBlocks( - ctx context.Context, - blocks []BlockInit, - network, processor, mode string, -) ([]uint64, error) { - if len(blocks) == 0 { - return []uint64{}, nil - } - - // Use pipeline for atomic batch operation - pipe := t.redis.Pipeline() - cmds := make([]*redis.BoolCmd, len(blocks)) - - for i, block := range blocks { - key := t.blockKey(block.Number, network, processor, mode) - // SetNX with 30 minute TTL to prevent orphaned keys - cmds[i] = pipe.SetNX(ctx, key, block.TaskCount, 30*time.Minute) - } - - // Execute pipeline - _, err := pipe.Exec(ctx) - if err != nil && err != redis.Nil { - return nil, fmt.Errorf("failed to execute init blocks pipeline: %w", err) - } - - // Collect successfully initialized block numbers - initialized := make([]uint64, 0, len(blocks)) - - for i, cmd := range cmds { - wasSet, cmdErr := cmd.Result() - if cmdErr != nil && cmdErr != redis.Nil { - t.log.WithError(cmdErr).WithFields(logrus.Fields{ - "block_number": blocks[i].Number, - "network": network, - "processor": processor, - "mode": mode, - }).Warn("Failed to check SetNX result for block") - - continue - } - - if wasSet { - initialized = append(initialized, blocks[i].Number) - - t.log.WithFields(logrus.Fields{ - "block_number": blocks[i].Number, - "task_count": blocks[i].TaskCount, - "network": network, - "processor": processor, - "mode": mode, - }).Debug("Initialized block tracking") - } else { - t.log.WithFields(logrus.Fields{ - "block_number": blocks[i].Number, - "network": network, - "processor": processor, - "mode": mode, - }).Debug("Block already being processed by another worker") - } - } - - return initialized, nil -} diff --git a/pkg/processor/tracker/pending_test.go b/pkg/processor/tracker/pending_test.go deleted file mode 100644 index 4c81399..0000000 --- a/pkg/processor/tracker/pending_test.go +++ /dev/null @@ -1,228 +0,0 @@ -package tracker - -import ( - "context" - "testing" - - "github.com/alicebob/miniredis/v2" - "github.com/redis/go-redis/v9" 
- "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -func setupTestRedis(t *testing.T) (*redis.Client, func()) { - t.Helper() - - mr, err := miniredis.Run() - require.NoError(t, err) - - client := redis.NewClient(&redis.Options{ - Addr: mr.Addr(), - }) - - return client, func() { - client.Close() - mr.Close() - } -} - -func TestPendingTracker_InitBlock(t *testing.T) { - ctx := context.Background() - - redisClient, cleanup := setupTestRedis(t) - defer cleanup() - - log := logrus.New() - tracker := NewPendingTracker(redisClient, "test", log) - - err := tracker.InitBlock(ctx, 100, 5, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - - // Verify the value was set - count, err := tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(5), count) -} - -func TestPendingTracker_DecrementPending(t *testing.T) { - ctx := context.Background() - - redisClient, cleanup := setupTestRedis(t) - defer cleanup() - - log := logrus.New() - tracker := NewPendingTracker(redisClient, "test", log) - - // Initialize with 3 tasks - err := tracker.InitBlock(ctx, 100, 3, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - - // Decrement once - remaining, err := tracker.DecrementPending(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(2), remaining) - - // Decrement again - remaining, err = tracker.DecrementPending(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(1), remaining) - - // Decrement to zero - remaining, err = tracker.DecrementPending(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(0), remaining) -} - -func TestPendingTracker_DecrementToZero(t *testing.T) { - ctx := context.Background() - - redisClient, cleanup := setupTestRedis(t) - defer cleanup() - - log := logrus.New() - tracker := NewPendingTracker(redisClient, "test", log) - - // Initialize with 1 task - err := tracker.InitBlock(ctx, 100, 1, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - - // Decrement to zero - remaining, err := tracker.DecrementPending(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(0), remaining) -} - -func TestPendingTracker_GetPendingCount(t *testing.T) { - ctx := context.Background() - - redisClient, cleanup := setupTestRedis(t) - defer cleanup() - - log := logrus.New() - tracker := NewPendingTracker(redisClient, "test", log) - - // Key doesn't exist - should return 0 - count, err := tracker.GetPendingCount(ctx, 999, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(0), count) - - // Initialize and check - err = tracker.InitBlock(ctx, 100, 10, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - - count, err = tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(10), count) -} - -func TestPendingTracker_CleanupBlock(t *testing.T) { - ctx := context.Background() - - redisClient, cleanup := setupTestRedis(t) - defer cleanup() - - log := logrus.New() - tracker := NewPendingTracker(redisClient, "test", log) - - // Initialize - err := tracker.InitBlock(ctx, 100, 5, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - - // Verify it exists - count, err := tracker.GetPendingCount(ctx, 100, "mainnet", 
"test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(5), count) - - // Cleanup - err = tracker.CleanupBlock(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - - // Verify it's gone - count, err = tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(0), count) -} - -func TestPendingTracker_KeyPattern(t *testing.T) { - log := logrus.New() - - tests := []struct { - name string - prefix string - blockNumber uint64 - network string - processor string - mode string - expectedKey string - }{ - { - name: "with prefix", - prefix: "myapp", - blockNumber: 12345, - network: "mainnet", - processor: "test_processor", - mode: "forwards", - expectedKey: "myapp:block:mainnet:test_processor:forwards:12345", - }, - { - name: "without prefix", - prefix: "", - blockNumber: 12345, - network: "mainnet", - processor: "test_processor", - mode: "backwards", - expectedKey: "block:mainnet:test_processor:backwards:12345", - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tracker := NewPendingTracker(nil, tt.prefix, log) - key := tracker.blockKey(tt.blockNumber, tt.network, tt.processor, tt.mode) - assert.Equal(t, tt.expectedKey, key) - }) - } -} - -func TestPendingTracker_MultipleBlocks(t *testing.T) { - ctx := context.Background() - - redisClient, cleanup := setupTestRedis(t) - defer cleanup() - - log := logrus.New() - tracker := NewPendingTracker(redisClient, "test", log) - - // Initialize multiple blocks - require.NoError(t, tracker.InitBlock(ctx, 100, 5, "mainnet", "test_processor", "forwards")) - require.NoError(t, tracker.InitBlock(ctx, 101, 3, "mainnet", "test_processor", "forwards")) - require.NoError(t, tracker.InitBlock(ctx, 102, 7, "mainnet", "test_processor", "forwards")) - - // Verify all blocks have correct counts - count, err := tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(5), count) - - count, err = tracker.GetPendingCount(ctx, 101, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(3), count) - - count, err = tracker.GetPendingCount(ctx, 102, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(7), count) - - // Decrement one block - remaining, err := tracker.DecrementPending(ctx, 101, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(2), remaining) - - // Other blocks unchanged - count, err = tracker.GetPendingCount(ctx, 100, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(5), count) - - count, err = tracker.GetPendingCount(ctx, 102, "mainnet", "test_processor", "forwards") - require.NoError(t, err) - assert.Equal(t, int64(7), count) -} diff --git a/pkg/processor/tracker/processor.go b/pkg/processor/tracker/processor.go index b199c51..da183ce 100644 --- a/pkg/processor/tracker/processor.go +++ b/pkg/processor/tracker/processor.go @@ -116,6 +116,11 @@ type BlockProcessor interface { // SetProcessingMode configures forwards or backwards processing. SetProcessingMode(mode string) + + // ReprocessBlock re-enqueues tasks for an orphaned block. + // Used when a block is in ClickHouse (complete=0) but has no Redis tracking. + // This can happen due to Redis TTL expiry, Redis restart, or crashes. + ReprocessBlock(ctx context.Context, blockNum uint64) error } // QueueInfo contains information about a processor queue. 
diff --git a/pkg/processor/transaction/simple/block_processing.go b/pkg/processor/transaction/simple/block_processing.go index b406c3d..c30c2d7 100644 --- a/pkg/processor/transaction/simple/block_processing.go +++ b/pkg/processor/transaction/simple/block_processing.go @@ -15,6 +15,9 @@ import ( "github.com/ethpandaops/execution-processor/pkg/state" ) +// ErrTaskIDConflict is returned when a task with the same ID already exists. +var ErrTaskIDConflict = asynq.ErrTaskIDConflict + // ProcessNextBlock processes the next available block(s). // In zero-interval mode, this attempts to fetch and process multiple blocks // up to the available capacity for improved throughput. @@ -52,6 +55,33 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { return nil } + // Distance-based pending block range check with orphan detection + blocked, blockingBlock, err := p.IsBlockedByIncompleteBlocks(ctx, nextBlock.Uint64(), p.processingMode) + if err != nil { + p.log.WithError(err).Warn("Failed to check incomplete blocks distance, proceeding anyway") + } else if blocked { + // Check if the blocking block is orphaned (no Redis tracking) + if blockingBlock != nil { + hasTracking, trackErr := p.completionTracker.HasBlockTracking( + ctx, *blockingBlock, p.network.Name, p.Name(), p.processingMode) + if trackErr != nil { + p.log.WithError(trackErr).Warn("Failed to check block tracking") + } else if !hasTracking { + // Orphaned block - reprocess it + p.log.WithFields(logrus.Fields{ + "blocking_block": *blockingBlock, + "next_block": nextBlock, + }).Warn("Detected orphaned block blocking progress, reprocessing") + + if reprocessErr := p.ReprocessBlock(ctx, *blockingBlock); reprocessErr != nil { + p.log.WithError(reprocessErr).Error("Failed to reprocess orphaned block") + } + } + } + + return nil + } + // Get available capacity for batch processing capacity, err := p.GetAvailableCapacity(ctx, nextBlock.Uint64(), p.processingMode) if err != nil { @@ -161,7 +191,33 @@ func (p *Processor) processBlock(ctx context.Context, block execution.Block) err return p.stateManager.MarkBlockComplete(ctx, blockNumber.Uint64(), p.network.Name, p.Name()) } - // Enqueue block processing task + // 1. Mark block as enqueued in ClickHouse FIRST (complete=0) + // This records that we started processing this block (1 task per block for simple processor) + if markErr := p.stateManager.MarkBlockEnqueued(ctx, blockNumber.Uint64(), 1, p.network.Name, p.Name()); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ + "network": p.network.Name, + "block_number": blockNumber, + }).Error("could not mark block as enqueued") + + return markErr + } + + // 2. Register block for completion tracking in Redis (clears any old state) + queue := p.getProcessForwardsQueue() + if p.processingMode == tracker.BACKWARDS_MODE { + queue = p.getProcessBackwardsQueue() + } + + if regErr := p.completionTracker.RegisterBlock(ctx, blockNumber.Uint64(), 1, p.network.Name, p.Name(), p.processingMode, queue); regErr != nil { + p.log.WithError(regErr).WithFields(logrus.Fields{ + "network": p.network.Name, + "block_number": blockNumber, + }).Error("could not register block for completion tracking") + + return regErr + } + + // 3. 
Create and enqueue block processing task with TaskID deduplication payload := &ProcessPayload{ BlockNumber: *blockNumber, NetworkName: p.network.Name, @@ -170,50 +226,128 @@ func (p *Processor) processBlock(ctx context.Context, block execution.Block) err var task *asynq.Task - var queue string + var taskID string if p.processingMode == tracker.BACKWARDS_MODE { - task, err = NewProcessBackwardsTask(payload) - queue = p.getProcessBackwardsQueue() + task, taskID, err = NewProcessBackwardsTask(payload) } else { - task, err = NewProcessForwardsTask(payload) - queue = p.getProcessForwardsQueue() + task, taskID, err = NewProcessForwardsTask(payload) } if err != nil { return fmt.Errorf("failed to create task: %w", err) } - if err := p.EnqueueTask(ctx, task, asynq.Queue(queue)); err != nil { + // Enqueue with TaskID for deduplication + err = p.EnqueueTask(ctx, task, + asynq.Queue(queue), + asynq.TaskID(taskID), + ) + + if errors.Is(err, ErrTaskIDConflict) { + // Task already exists - this is fine, it will still complete and be tracked + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "block_number": blockNumber.Uint64(), + }).Debug("Task already exists (TaskID conflict), skipping") + } else if err != nil { return fmt.Errorf("failed to enqueue task: %w", err) + } else { + common.TasksEnqueued.WithLabelValues(p.network.Name, ProcessorName, queue, task.Type()).Inc() } - common.TasksEnqueued.WithLabelValues(p.network.Name, ProcessorName, queue, task.Type()).Inc() + p.log.WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + "tx_count": len(block.Transactions()), + }).Info("Enqueued block for processing") + + return nil +} - // Initialize block tracking in Redis (1 task per block for simple processor) - if err := p.pendingTracker.InitBlock(ctx, blockNumber.Uint64(), 1, p.network.Name, p.Name(), p.processingMode); err != nil { - p.log.WithError(err).WithFields(logrus.Fields{ - "network": p.network.Name, - "block_number": blockNumber, - }).Error("could not init block tracking in Redis") +// ReprocessBlock re-enqueues tasks for an orphaned block. +// Used when a block is in ClickHouse (complete=0) but has no Redis tracking. +// TaskID deduplication ensures no duplicate tasks are created. 
+func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { + p.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "network": p.network.Name, + }).Info("Reprocessing orphaned block") - return err + // Get execution node + node := p.pool.GetHealthyExecutionNode() + if node == nil { + return fmt.Errorf("no healthy execution node available") } - // Mark block as enqueued (phase 1 of two-phase completion) - if err := p.stateManager.MarkBlockEnqueued(ctx, blockNumber.Uint64(), 1, p.network.Name, p.Name()); err != nil { - p.log.WithError(err).WithFields(logrus.Fields{ - "network": p.network.Name, - "block_number": blockNumber, - }).Error("could not mark block as enqueued") + // Fetch block + block, err := node.BlockByNumber(ctx, new(big.Int).SetUint64(blockNum)) + if err != nil { + return fmt.Errorf("failed to fetch block %d: %w", blockNum, err) + } - return err + // Handle empty blocks - mark complete immediately + if len(block.Transactions()) == 0 { + p.log.WithField("block", blockNum).Debug("Empty orphaned block, marking as complete") + + return p.stateManager.MarkBlockComplete(ctx, blockNum, p.network.Name, p.Name()) + } + + // Determine queue based on processing mode + queue := p.getProcessForwardsQueue() + if p.processingMode == tracker.BACKWARDS_MODE { + queue = p.getProcessBackwardsQueue() + } + + // Register in Redis (clears any partial state) + if regErr := p.completionTracker.RegisterBlock( + ctx, blockNum, 1, p.network.Name, p.Name(), p.processingMode, queue, + ); regErr != nil { + return fmt.Errorf("failed to register block: %w", regErr) + } + + // Create and enqueue block processing task with TaskID deduplication + payload := &ProcessPayload{ + BlockNumber: *new(big.Int).SetUint64(blockNum), + NetworkName: p.network.Name, + ProcessingMode: p.processingMode, + } + + var task *asynq.Task + + var taskID string + + if p.processingMode == tracker.BACKWARDS_MODE { + task, taskID, err = NewProcessBackwardsTask(payload) + } else { + task, taskID, err = NewProcessForwardsTask(payload) + } + + if err != nil { + return fmt.Errorf("failed to create task: %w", err) + } + + // Enqueue with TaskID for deduplication + err = p.EnqueueTask(ctx, task, + asynq.Queue(queue), + asynq.TaskID(taskID), + ) + + if errors.Is(err, ErrTaskIDConflict) { + // Task already exists - this is fine, it will still complete and be tracked + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "block_number": blockNum, + }).Debug("Task already exists (TaskID conflict) during reprocess") + } else if err != nil { + return fmt.Errorf("failed to enqueue task: %w", err) + } else { + common.TasksEnqueued.WithLabelValues(p.network.Name, ProcessorName, queue, task.Type()).Inc() } p.log.WithFields(logrus.Fields{ - "block_number": blockNumber.Uint64(), + "block_number": blockNum, "tx_count": len(block.Transactions()), - }).Info("Enqueued block for processing") + }).Info("Reprocessed orphaned block") return nil } diff --git a/pkg/processor/transaction/simple/handlers.go b/pkg/processor/transaction/simple/handlers.go index 0e21b7b..84c8131 100644 --- a/pkg/processor/transaction/simple/handlers.go +++ b/pkg/processor/transaction/simple/handlers.go @@ -166,8 +166,25 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err "success", ).Inc() - // Track block completion using embedded Blocker - p.TrackBlockCompletion(ctx, blockNumber.Uint64(), payload.ProcessingMode) + // Track task completion using SET-based tracker + taskID := GenerateTaskID(p.network.Name, 
blockNumber.Uint64()) + + allComplete, trackErr := p.completionTracker.TrackTaskCompletion(ctx, taskID, blockNumber.Uint64(), p.network.Name, p.Name(), payload.ProcessingMode) + if trackErr != nil { + p.log.WithError(trackErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + "task_id": taskID, + }).Warn("Failed to track task completion") + // Non-fatal - stale detection will catch it + } + + if allComplete { + if markErr := p.completionTracker.MarkBlockComplete(ctx, blockNumber.Uint64(), p.network.Name, p.Name(), payload.ProcessingMode); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + }).Error("Failed to mark block complete") + } + } p.log.WithFields(logrus.Fields{ "block_number": blockNumber.Uint64(), diff --git a/pkg/processor/transaction/simple/processor.go b/pkg/processor/transaction/simple/processor.go index 145279d..95701b7 100644 --- a/pkg/processor/transaction/simple/processor.go +++ b/pkg/processor/transaction/simple/processor.go @@ -44,13 +44,15 @@ type Processor struct { asynqClient *asynq.Client processingMode string redisPrefix string - pendingTracker *tracker.PendingTracker // Row buffer for batched ClickHouse inserts rowBuffer *rowbuffer.Buffer[Transaction] // Embedded limiter for shared blocking/completion logic *tracker.Limiter + + // Block completion tracker for SET-based task deduplication + completionTracker *tracker.BlockCompletionTracker } // New creates a new simple transaction processor. @@ -84,34 +86,44 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { } log := deps.Log.WithField("processor", ProcessorName) - pendingTracker := tracker.NewPendingTracker(deps.RedisClient, deps.RedisPrefix, log) // Create the limiter for shared functionality limiter := tracker.NewLimiter( &tracker.LimiterDeps{ - Log: log, - StateProvider: deps.State, - PendingTracker: pendingTracker, - Network: deps.Network.Name, - Processor: ProcessorName, + Log: log, + StateProvider: deps.State, + Network: deps.Network.Name, + Processor: ProcessorName, }, tracker.LimiterConfig{ MaxPendingBlockRange: config.MaxPendingBlockRange, }, ) + // Create the block completion tracker for SET-based task deduplication + completionTracker := tracker.NewBlockCompletionTracker( + deps.RedisClient, + deps.RedisPrefix, + log, + deps.State, + tracker.BlockCompletionTrackerConfig{ + StaleThreshold: tracker.DefaultStaleThreshold, + AutoRetryStale: true, + }, + ) + processor := &Processor{ - log: log, - pool: deps.Pool, - stateManager: deps.State, - clickhouse: clickhouseClient, - config: config, - network: deps.Network, - asynqClient: deps.AsynqClient, - processingMode: tracker.FORWARDS_MODE, // Default mode - redisPrefix: deps.RedisPrefix, - pendingTracker: pendingTracker, - Limiter: limiter, + log: log, + pool: deps.Pool, + stateManager: deps.State, + clickhouse: clickhouseClient, + config: config, + network: deps.Network, + asynqClient: deps.AsynqClient, + processingMode: tracker.FORWARDS_MODE, // Default mode + redisPrefix: deps.RedisPrefix, + Limiter: limiter, + completionTracker: completionTracker, } // Create the row buffer with the flush function @@ -180,6 +192,11 @@ func (p *Processor) SetProcessingMode(mode string) { p.log.WithField("mode", mode).Info("Processing mode updated") } +// GetCompletionTracker returns the block completion tracker. 
+func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker { + return p.completionTracker +} + // EnqueueTask enqueues a task to the specified queue with infinite retries. func (p *Processor) EnqueueTask(ctx context.Context, task *asynq.Task, opts ...asynq.Option) error { opts = append(opts, asynq.MaxRetry(math.MaxInt32)) diff --git a/pkg/processor/transaction/simple/processor_test.go b/pkg/processor/transaction/simple/processor_test.go index a454c22..7a6e7e9 100644 --- a/pkg/processor/transaction/simple/processor_test.go +++ b/pkg/processor/transaction/simple/processor_test.go @@ -132,11 +132,15 @@ func TestNewProcessForwardsTask(t *testing.T) { NetworkName: "mainnet", } - task, err := transaction_simple.NewProcessForwardsTask(payload) + task, taskID, err := transaction_simple.NewProcessForwardsTask(payload) assert.NoError(t, err) assert.NotNil(t, task) assert.Equal(t, transaction_simple.ProcessForwardsTaskType, task.Type()) + // Verify taskID is generated correctly + expectedTaskID := transaction_simple.GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64()) + assert.Equal(t, expectedTaskID, taskID) + // Verify payload can be unmarshaled from task var unmarshaled transaction_simple.ProcessPayload @@ -153,11 +157,15 @@ func TestNewProcessBackwardsTask(t *testing.T) { NetworkName: "mainnet", } - task, err := transaction_simple.NewProcessBackwardsTask(payload) + task, taskID, err := transaction_simple.NewProcessBackwardsTask(payload) assert.NoError(t, err) assert.NotNil(t, task) assert.Equal(t, transaction_simple.ProcessBackwardsTaskType, task.Type()) + // Verify taskID is generated correctly + expectedTaskID := transaction_simple.GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64()) + assert.Equal(t, expectedTaskID, taskID) + // Verify payload can be unmarshaled from task var unmarshaled transaction_simple.ProcessPayload diff --git a/pkg/processor/transaction/simple/tasks.go b/pkg/processor/transaction/simple/tasks.go index 12eac0d..d7df3f6 100644 --- a/pkg/processor/transaction/simple/tasks.go +++ b/pkg/processor/transaction/simple/tasks.go @@ -2,6 +2,7 @@ package simple import ( "encoding/json" + "fmt" "math/big" "github.com/hibiken/asynq" @@ -35,26 +36,39 @@ func (p *ProcessPayload) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, p) } +// GenerateTaskID creates a deterministic task ID for deduplication. +// Format: {processor}:{network}:{blockNum}:block. +// For block-based processors, we use "block" as the identifier since there's one task per block. +func GenerateTaskID(network string, blockNumber uint64) string { + return fmt.Sprintf("%s:%s:%d:block", ProcessorName, network, blockNumber) +} + // NewProcessForwardsTask creates a new forwards process task. -func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error) { +// Returns the task, taskID for deduplication, and any error. +func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error) { payload.ProcessingMode = tracker.FORWARDS_MODE data, err := payload.MarshalBinary() if err != nil { - return nil, err + return nil, "", err } - return asynq.NewTask(ProcessForwardsTaskType, data), nil + taskID := GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64()) + + return asynq.NewTask(ProcessForwardsTaskType, data), taskID, nil } // NewProcessBackwardsTask creates a new backwards process task. -func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error) { +// Returns the task, taskID for deduplication, and any error. 
+func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error) { payload.ProcessingMode = tracker.BACKWARDS_MODE data, err := payload.MarshalBinary() if err != nil { - return nil, err + return nil, "", err } - return asynq.NewTask(ProcessBackwardsTaskType, data), nil + taskID := GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64()) + + return asynq.NewTask(ProcessBackwardsTaskType, data), taskID, nil } diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 8873d36..3f58424 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -15,6 +15,9 @@ import ( "github.com/ethpandaops/execution-processor/pkg/state" ) +// ErrTaskIDConflict is returned when a task with the same ID already exists. +var ErrTaskIDConflict = asynq.ErrTaskIDConflict + // ProcessNextBlock processes the next available block(s). // In zero-interval mode, this attempts to fetch and process multiple blocks // up to the available capacity for improved throughput. @@ -55,6 +58,33 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { return nil } + // Distance-based pending block range check with orphan detection + blocked, blockingBlock, err := p.IsBlockedByIncompleteBlocks(ctx, nextBlock.Uint64(), p.processingMode) + if err != nil { + p.log.WithError(err).Warn("Failed to check incomplete blocks distance, proceeding anyway") + } else if blocked { + // Check if the blocking block is orphaned (no Redis tracking) + if blockingBlock != nil { + hasTracking, trackErr := p.completionTracker.HasBlockTracking( + ctx, *blockingBlock, p.network.Name, p.Name(), p.processingMode) + if trackErr != nil { + p.log.WithError(trackErr).Warn("Failed to check block tracking") + } else if !hasTracking { + // Orphaned block - reprocess it + p.log.WithFields(logrus.Fields{ + "blocking_block": *blockingBlock, + "next_block": nextBlock, + }).Warn("Detected orphaned block blocking progress, reprocessing") + + if reprocessErr := p.ReprocessBlock(ctx, *blockingBlock); reprocessErr != nil { + p.log.WithError(reprocessErr).Error("Failed to reprocess orphaned block") + } + } + } + + return nil + } + // Get available capacity for batch processing capacity, err := p.GetAvailableCapacity(ctx, nextBlock.Uint64(), p.processingMode) if err != nil { @@ -188,42 +218,35 @@ func (p *Processor) processBlock(ctx context.Context, block execution.Block) err // Calculate expected task count before enqueueing expectedTaskCount := len(block.Transactions()) - // Acquire exclusive lock on this block via Redis FIRST - if initErr := p.pendingTracker.InitBlock(ctx, blockNumber.Uint64(), expectedTaskCount, p.network.Name, p.Name(), p.processingMode); initErr != nil { - // If block is already being processed by another worker, skip gracefully - if errors.Is(initErr, tracker.ErrBlockAlreadyBeingProcessed) { - p.log.WithFields(logrus.Fields{ - "network": p.network.Name, - "block_number": blockNumber, - }).Debug("Block already being processed by another worker, skipping") - - common.BlockProcessingSkipped.WithLabelValues(p.network.Name, p.Name(), "already_processing").Inc() - - return nil - } - - p.log.WithError(initErr).WithFields(logrus.Fields{ + // 1. 
Mark block as enqueued in ClickHouse FIRST (complete=0) + // This records that we started processing this block + if markErr := p.stateManager.MarkBlockEnqueued(ctx, blockNumber.Uint64(), expectedTaskCount, p.network.Name, p.Name()); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ "network": p.network.Name, "block_number": blockNumber, - }).Error("could not init block tracking in Redis") + }).Error("could not mark block as enqueued") - return initErr + return markErr } - // Mark the block as enqueued AFTER acquiring Redis lock (phase 1 of two-phase completion) - if markErr := p.stateManager.MarkBlockEnqueued(ctx, blockNumber.Uint64(), expectedTaskCount, p.network.Name, p.Name()); markErr != nil { - p.log.WithError(markErr).WithFields(logrus.Fields{ + // 2. Register block for completion tracking in Redis (clears any old state) + // This uses SET-based tracking instead of counter-based + queue := p.getProcessForwardsQueue() + if p.processingMode == tracker.BACKWARDS_MODE { + queue = p.getProcessBackwardsQueue() + } + + if regErr := p.completionTracker.RegisterBlock(ctx, blockNumber.Uint64(), expectedTaskCount, p.network.Name, p.Name(), p.processingMode, queue); regErr != nil { + p.log.WithError(regErr).WithFields(logrus.Fields{ "network": p.network.Name, "block_number": blockNumber, - }).Error("could not mark block as enqueued") + }).Error("could not register block for completion tracking") - // Clean up Redis lock since we failed to mark in ClickHouse - _ = p.pendingTracker.CleanupBlock(ctx, blockNumber.Uint64(), p.network.Name, p.Name(), p.processingMode) - - return markErr + return regErr } - // Enqueue tasks for each transaction LAST + // 3. Enqueue tasks with TaskID deduplication + // Tasks may start processing immediately, but the block is already registered taskCount, err := p.EnqueueTransactionTasks(ctx, block) if err != nil { return fmt.Errorf("failed to enqueue transaction tasks: %w", err) @@ -250,9 +273,12 @@ func (p *Processor) processBlock(ctx context.Context, block execution.Block) err } // EnqueueTransactionTasks enqueues transaction processing tasks for a given block. +// Uses TaskID for deduplication - tasks with conflicting IDs are skipped but still count. 
func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution.Block) (int, error) { var enqueuedCount int + var skippedCount int + var errs []error for index, tx := range block.Transactions() { @@ -268,6 +294,8 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution // Create the task based on processing mode var task *asynq.Task + var taskID string + var queue string var taskType string @@ -275,11 +303,11 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution var err error if p.processingMode == tracker.BACKWARDS_MODE { - task, err = NewProcessBackwardsTask(payload) + task, taskID, err = NewProcessBackwardsTask(payload) queue = p.getProcessBackwardsQueue() taskType = ProcessBackwardsTaskType } else { - task, err = NewProcessForwardsTask(payload) + task, taskID, err = NewProcessForwardsTask(payload) queue = p.getProcessForwardsQueue() taskType = ProcessForwardsTaskType } @@ -290,8 +318,25 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution continue } - // Enqueue the task - if err := p.EnqueueTask(ctx, task, asynq.Queue(queue)); err != nil { + // Enqueue the task with TaskID for deduplication + err = p.EnqueueTask(ctx, task, + asynq.Queue(queue), + asynq.TaskID(taskID), + ) + + if errors.Is(err, ErrTaskIDConflict) { + // Task already exists - this is fine, it will still complete and be tracked + skippedCount++ + + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "tx_hash": tx.Hash().String(), + }).Debug("Task already exists (TaskID conflict), skipping") + + continue + } + + if err != nil { errs = append(errs, fmt.Errorf("failed to enqueue task for tx %s: %w", tx.Hash().String(), err)) continue @@ -306,12 +351,72 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution "block_number": block.Number(), "total_txs": len(block.Transactions()), "enqueued_count": enqueuedCount, + "skipped_count": skippedCount, "error_count": len(errs), }).Info("Enqueued transaction processing tasks") if len(errs) > 0 { - return enqueuedCount, fmt.Errorf("failed to enqueue %d tasks: %v", len(errs), errs[0]) + return enqueuedCount + skippedCount, fmt.Errorf("failed to enqueue %d tasks: %v", len(errs), errs[0]) + } + + // Return total count including skipped (already existing) tasks + return enqueuedCount + skippedCount, nil +} + +// ReprocessBlock re-enqueues tasks for an orphaned block. +// Used when a block is in ClickHouse (complete=0) but has no Redis tracking. +// TaskID deduplication ensures no duplicate tasks are created. 
+func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { + p.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "network": p.network.Name, + }).Info("Reprocessing orphaned block") + + // Get execution node + node := p.pool.GetHealthyExecutionNode() + if node == nil { + return fmt.Errorf("no healthy execution node available") } - return enqueuedCount, nil + // Fetch block + block, err := node.BlockByNumber(ctx, new(big.Int).SetUint64(blockNum)) + if err != nil { + return fmt.Errorf("failed to fetch block %d: %w", blockNum, err) + } + + // Handle empty blocks - mark complete immediately + if len(block.Transactions()) == 0 { + p.log.WithField("block", blockNum).Debug("Empty orphaned block, marking as complete") + + return p.stateManager.MarkBlockComplete(ctx, blockNum, p.network.Name, p.Name()) + } + + expectedCount := len(block.Transactions()) + + // Determine queue based on processing mode + queue := p.getProcessForwardsQueue() + if p.processingMode == tracker.BACKWARDS_MODE { + queue = p.getProcessBackwardsQueue() + } + + // Register in Redis (clears any partial state) + if regErr := p.completionTracker.RegisterBlock( + ctx, blockNum, expectedCount, p.network.Name, p.Name(), p.processingMode, queue, + ); regErr != nil { + return fmt.Errorf("failed to register block: %w", regErr) + } + + // Enqueue tasks (TaskID handles dedup - existing tasks return ErrTaskIDConflict) + enqueuedCount, err := p.EnqueueTransactionTasks(ctx, block) + if err != nil { + return fmt.Errorf("failed to enqueue tasks: %w", err) + } + + p.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "expected_count": expectedCount, + "enqueued_count": enqueuedCount, + }).Info("Reprocessed orphaned block") + + return nil } diff --git a/pkg/processor/transaction/structlog/handlers.go b/pkg/processor/transaction/structlog/handlers.go index c919a97..96fd820 100644 --- a/pkg/processor/transaction/structlog/handlers.go +++ b/pkg/processor/transaction/structlog/handlers.go @@ -63,8 +63,25 @@ func (p *Processor) handleProcessForwardsTask(ctx context.Context, task *asynq.T // Record successful processing common.TasksProcessed.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType, "success").Inc() - // Track block completion using embedded Blocker - p.TrackBlockCompletion(ctx, blockNumber.Uint64(), tracker.FORWARDS_MODE) + // Track task completion using SET-based tracker + taskID := GenerateTaskID(p.network.Name, blockNumber.Uint64(), payload.TransactionHash) + + allComplete, trackErr := p.completionTracker.TrackTaskCompletion(ctx, taskID, blockNumber.Uint64(), p.network.Name, p.Name(), tracker.FORWARDS_MODE) + if trackErr != nil { + p.log.WithError(trackErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + "task_id": taskID, + }).Warn("Failed to track task completion") + // Non-fatal - stale detection will catch it + } + + if allComplete { + if markErr := p.completionTracker.MarkBlockComplete(ctx, blockNumber.Uint64(), p.network.Name, p.Name(), tracker.FORWARDS_MODE); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + }).Error("Failed to mark block complete") + } + } p.log.WithFields(logrus.Fields{ "transaction_hash": payload.TransactionHash, @@ -125,8 +142,25 @@ func (p *Processor) handleProcessBackwardsTask(ctx context.Context, task *asynq. 
// Record successful processing common.TasksProcessed.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType, "success").Inc() - // Track block completion using embedded Blocker - p.TrackBlockCompletion(ctx, blockNumber.Uint64(), tracker.BACKWARDS_MODE) + // Track task completion using SET-based tracker + taskID := GenerateTaskID(p.network.Name, blockNumber.Uint64(), payload.TransactionHash) + + allComplete, trackErr := p.completionTracker.TrackTaskCompletion(ctx, taskID, blockNumber.Uint64(), p.network.Name, p.Name(), tracker.BACKWARDS_MODE) + if trackErr != nil { + p.log.WithError(trackErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + "task_id": taskID, + }).Warn("Failed to track task completion") + // Non-fatal - stale detection will catch it + } + + if allComplete { + if markErr := p.completionTracker.MarkBlockComplete(ctx, blockNumber.Uint64(), p.network.Name, p.Name(), tracker.BACKWARDS_MODE); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + }).Error("Failed to mark block complete") + } + } p.log.WithFields(logrus.Fields{ "transaction_hash": payload.TransactionHash, diff --git a/pkg/processor/transaction/structlog/processor.go b/pkg/processor/transaction/structlog/processor.go index 8abcf9b..4570550 100644 --- a/pkg/processor/transaction/structlog/processor.go +++ b/pkg/processor/transaction/structlog/processor.go @@ -50,7 +50,6 @@ type Processor struct { asynqClient *asynq.Client processingMode string redisPrefix string - pendingTracker *tracker.PendingTracker // Row buffer for batched ClickHouse inserts rowBuffer *rowbuffer.Buffer[Structlog] @@ -58,6 +57,9 @@ type Processor struct { // Embedded limiter for shared blocking/completion logic *tracker.Limiter + // Block completion tracker for SET-based task deduplication + completionTracker *tracker.BlockCompletionTracker + // Background metrics worker fields metricsStop chan struct{} metricsWg sync.WaitGroup @@ -92,33 +94,43 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { } log := deps.Log.WithField("processor", ProcessorName) - pendingTracker := tracker.NewPendingTracker(deps.RedisClient, deps.RedisPrefix, log) // Create the limiter for shared functionality limiter := tracker.NewLimiter( &tracker.LimiterDeps{ - Log: log, - StateProvider: deps.State, - PendingTracker: pendingTracker, - Network: deps.Network.Name, - Processor: ProcessorName, + Log: log, + StateProvider: deps.State, + Network: deps.Network.Name, + Processor: ProcessorName, }, tracker.LimiterConfig{ MaxPendingBlockRange: config.MaxPendingBlockRange, }, ) + // Create the block completion tracker for SET-based task deduplication + completionTracker := tracker.NewBlockCompletionTracker( + deps.RedisClient, + deps.RedisPrefix, + log, + deps.State, + tracker.BlockCompletionTrackerConfig{ + StaleThreshold: tracker.DefaultStaleThreshold, + AutoRetryStale: true, + }, + ) + processor := &Processor{ - log: log, - pool: deps.Pool, - stateManager: deps.State, - clickhouse: clickhouseClient, - config: config, - asynqClient: deps.AsynqClient, - processingMode: tracker.FORWARDS_MODE, // Default mode - redisPrefix: deps.RedisPrefix, - pendingTracker: pendingTracker, - Limiter: limiter, + log: log, + pool: deps.Pool, + stateManager: deps.State, + clickhouse: clickhouseClient, + config: config, + asynqClient: deps.AsynqClient, + processingMode: tracker.FORWARDS_MODE, // Default mode + redisPrefix: deps.RedisPrefix, + 
Limiter: limiter, + completionTracker: completionTracker, } processor.network = deps.Network @@ -224,6 +236,11 @@ func (p *Processor) SetProcessingMode(mode string) { p.log.WithField("mode", mode).Info("Processing mode updated") } +// GetCompletionTracker returns the block completion tracker. +func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker { + return p.completionTracker +} + // getProcessForwardsQueue returns the prefixed process forwards queue name. func (p *Processor) getProcessForwardsQueue() string { return tracker.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix) diff --git a/pkg/processor/transaction/structlog/processor_test.go b/pkg/processor/transaction/structlog/processor_test.go index 1137a0c..6e394a2 100644 --- a/pkg/processor/transaction/structlog/processor_test.go +++ b/pkg/processor/transaction/structlog/processor_test.go @@ -375,7 +375,7 @@ func TestNewProcessForwardsTask(t *testing.T) { Network: "mainnet", } - task, err := transaction_structlog.NewProcessForwardsTask(payload) + task, taskID, err := transaction_structlog.NewProcessForwardsTask(payload) if err != nil { t.Fatalf("failed to create task: %v", err) } @@ -384,6 +384,12 @@ func TestNewProcessForwardsTask(t *testing.T) { t.Errorf("expected task type %s, got %s", transaction_structlog.ProcessForwardsTaskType, task.Type()) } + // Verify taskID is generated correctly + expectedTaskID := transaction_structlog.GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64(), payload.TransactionHash) + if taskID != expectedTaskID { + t.Errorf("expected taskID %s, got %s", expectedTaskID, taskID) + } + // Verify payload can be unmarshaled from task var unmarshaled transaction_structlog.ProcessPayload @@ -410,7 +416,7 @@ func TestNewProcessBackwardsTask(t *testing.T) { Network: "mainnet", } - task, err := transaction_structlog.NewProcessBackwardsTask(payload) + task, taskID, err := transaction_structlog.NewProcessBackwardsTask(payload) if err != nil { t.Fatalf("failed to create task: %v", err) } @@ -419,6 +425,12 @@ func TestNewProcessBackwardsTask(t *testing.T) { t.Errorf("expected task type %s, got %s", transaction_structlog.ProcessBackwardsTaskType, task.Type()) } + // Verify taskID is generated correctly + expectedTaskID := transaction_structlog.GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64(), payload.TransactionHash) + if taskID != expectedTaskID { + t.Errorf("expected taskID %s, got %s", expectedTaskID, taskID) + } + // Verify payload can be unmarshaled from task var unmarshaled transaction_structlog.ProcessPayload diff --git a/pkg/processor/transaction/structlog/tasks.go b/pkg/processor/transaction/structlog/tasks.go index 22c61d7..0475739 100644 --- a/pkg/processor/transaction/structlog/tasks.go +++ b/pkg/processor/transaction/structlog/tasks.go @@ -2,6 +2,7 @@ package structlog import ( "encoding/json" + "fmt" "math/big" "github.com/hibiken/asynq" @@ -37,26 +38,38 @@ func (p *ProcessPayload) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, p) } +// GenerateTaskID creates a deterministic task ID for deduplication. +// Format: {processor}:{network}:{blockNum}:{txHash}. +func GenerateTaskID(network string, blockNumber uint64, txHash string) string { + return fmt.Sprintf("%s:%s:%d:%s", ProcessorName, network, blockNumber, txHash) +} + // NewProcessForwardsTask creates a new forwards process task. -func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error) { +// Returns the task, taskID for deduplication, and any error. 
+func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error) { payload.ProcessingMode = tracker.FORWARDS_MODE data, err := json.Marshal(payload) if err != nil { - return nil, err + return nil, "", err } - return asynq.NewTask(ProcessForwardsTaskType, data), nil + taskID := GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64(), payload.TransactionHash) + + return asynq.NewTask(ProcessForwardsTaskType, data), taskID, nil } // NewProcessBackwardsTask creates a new backwards process task. -func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error) { +// Returns the task, taskID for deduplication, and any error. +func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error) { payload.ProcessingMode = tracker.BACKWARDS_MODE data, err := json.Marshal(payload) if err != nil { - return nil, err + return nil, "", err } - return asynq.NewTask(ProcessBackwardsTaskType, data), nil + taskID := GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64(), payload.TransactionHash) + + return asynq.NewTask(ProcessBackwardsTaskType, data), taskID, nil } diff --git a/pkg/processor/transaction/structlog_agg/block_processing.go b/pkg/processor/transaction/structlog_agg/block_processing.go index 05c7773..f8f5d1c 100644 --- a/pkg/processor/transaction/structlog_agg/block_processing.go +++ b/pkg/processor/transaction/structlog_agg/block_processing.go @@ -15,6 +15,9 @@ import ( "github.com/ethpandaops/execution-processor/pkg/state" ) +// ErrTaskIDConflict is returned when a task with the same ID already exists. +var ErrTaskIDConflict = asynq.ErrTaskIDConflict + // ProcessNextBlock processes the next available block(s). // In zero-interval mode, this attempts to fetch and process multiple blocks // up to the available capacity for improved throughput. 
@@ -55,6 +58,33 @@ func (p *Processor) ProcessNextBlock(ctx context.Context) error { return nil } + // Distance-based pending block range check with orphan detection + blocked, blockingBlock, err := p.IsBlockedByIncompleteBlocks(ctx, nextBlock.Uint64(), p.processingMode) + if err != nil { + p.log.WithError(err).Warn("Failed to check incomplete blocks distance, proceeding anyway") + } else if blocked { + // Check if the blocking block is orphaned (no Redis tracking) + if blockingBlock != nil { + hasTracking, trackErr := p.completionTracker.HasBlockTracking( + ctx, *blockingBlock, p.network.Name, p.Name(), p.processingMode) + if trackErr != nil { + p.log.WithError(trackErr).Warn("Failed to check block tracking") + } else if !hasTracking { + // Orphaned block - reprocess it + p.log.WithFields(logrus.Fields{ + "blocking_block": *blockingBlock, + "next_block": nextBlock, + }).Warn("Detected orphaned block blocking progress, reprocessing") + + if reprocessErr := p.ReprocessBlock(ctx, *blockingBlock); reprocessErr != nil { + p.log.WithError(reprocessErr).Error("Failed to reprocess orphaned block") + } + } + } + + return nil + } + // Get available capacity for batch processing capacity, err := p.GetAvailableCapacity(ctx, nextBlock.Uint64(), p.processingMode) if err != nil { @@ -188,42 +218,35 @@ func (p *Processor) processBlock(ctx context.Context, block execution.Block) err // Calculate expected task count before enqueueing expectedTaskCount := len(block.Transactions()) - // Acquire exclusive lock on this block via Redis FIRST - if initErr := p.pendingTracker.InitBlock(ctx, blockNumber.Uint64(), expectedTaskCount, p.network.Name, p.Name(), p.processingMode); initErr != nil { - // If block is already being processed by another worker, skip gracefully - if errors.Is(initErr, tracker.ErrBlockAlreadyBeingProcessed) { - p.log.WithFields(logrus.Fields{ - "network": p.network.Name, - "block_number": blockNumber, - }).Debug("Block already being processed by another worker, skipping") - - common.BlockProcessingSkipped.WithLabelValues(p.network.Name, p.Name(), "already_processing").Inc() - - return nil - } - - p.log.WithError(initErr).WithFields(logrus.Fields{ + // 1. Mark block as enqueued in ClickHouse FIRST (complete=0) + // This records that we started processing this block + if markErr := p.stateManager.MarkBlockEnqueued(ctx, blockNumber.Uint64(), expectedTaskCount, p.network.Name, p.Name()); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ "network": p.network.Name, "block_number": blockNumber, - }).Error("could not init block tracking in Redis") + }).Error("could not mark block as enqueued") - return initErr + return markErr } - // Mark the block as enqueued AFTER acquiring Redis lock (phase 1 of two-phase completion) - if markErr := p.stateManager.MarkBlockEnqueued(ctx, blockNumber.Uint64(), expectedTaskCount, p.network.Name, p.Name()); markErr != nil { - p.log.WithError(markErr).WithFields(logrus.Fields{ + // 2. 
Register block for completion tracking in Redis (clears any old state) + // This uses SET-based tracking instead of counter-based + queue := p.getProcessForwardsQueue() + if p.processingMode == tracker.BACKWARDS_MODE { + queue = p.getProcessBackwardsQueue() + } + + if regErr := p.completionTracker.RegisterBlock(ctx, blockNumber.Uint64(), expectedTaskCount, p.network.Name, p.Name(), p.processingMode, queue); regErr != nil { + p.log.WithError(regErr).WithFields(logrus.Fields{ "network": p.network.Name, "block_number": blockNumber, - }).Error("could not mark block as enqueued") + }).Error("could not register block for completion tracking") - // Clean up Redis lock since we failed to mark in ClickHouse - _ = p.pendingTracker.CleanupBlock(ctx, blockNumber.Uint64(), p.network.Name, p.Name(), p.processingMode) - - return markErr + return regErr } - // Enqueue tasks for each transaction LAST + // 3. Enqueue tasks with TaskID deduplication + // Tasks may start processing immediately, but the block is already registered taskCount, err := p.EnqueueTransactionTasks(ctx, block) if err != nil { return fmt.Errorf("failed to enqueue transaction tasks: %w", err) @@ -250,9 +273,12 @@ func (p *Processor) processBlock(ctx context.Context, block execution.Block) err } // EnqueueTransactionTasks enqueues transaction processing tasks for a given block. +// Uses TaskID for deduplication - tasks with conflicting IDs are skipped but still count. func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution.Block) (int, error) { var enqueuedCount int + var skippedCount int + var errs []error for index, tx := range block.Transactions() { @@ -268,6 +294,8 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution // Create the task based on processing mode var task *asynq.Task + var taskID string + var queue string var taskType string @@ -275,11 +303,11 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution var err error if p.processingMode == tracker.BACKWARDS_MODE { - task, err = NewProcessBackwardsTask(payload) + task, taskID, err = NewProcessBackwardsTask(payload) queue = p.getProcessBackwardsQueue() taskType = ProcessBackwardsTaskType } else { - task, err = NewProcessForwardsTask(payload) + task, taskID, err = NewProcessForwardsTask(payload) queue = p.getProcessForwardsQueue() taskType = ProcessForwardsTaskType } @@ -290,8 +318,25 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution continue } - // Enqueue the task - if err := p.EnqueueTask(ctx, task, asynq.Queue(queue)); err != nil { + // Enqueue the task with TaskID for deduplication + err = p.EnqueueTask(ctx, task, + asynq.Queue(queue), + asynq.TaskID(taskID), + ) + + if errors.Is(err, ErrTaskIDConflict) { + // Task already exists - this is fine, it will still complete and be tracked + skippedCount++ + + p.log.WithFields(logrus.Fields{ + "task_id": taskID, + "tx_hash": tx.Hash().String(), + }).Debug("Task already exists (TaskID conflict), skipping") + + continue + } + + if err != nil { errs = append(errs, fmt.Errorf("failed to enqueue task for tx %s: %w", tx.Hash().String(), err)) continue @@ -306,12 +351,72 @@ func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution "block_number": block.Number(), "total_txs": len(block.Transactions()), "enqueued_count": enqueuedCount, + "skipped_count": skippedCount, "error_count": len(errs), }).Info("Enqueued transaction processing tasks") if len(errs) > 0 { - return enqueuedCount, 
fmt.Errorf("failed to enqueue %d tasks: %v", len(errs), errs[0]) + return enqueuedCount + skippedCount, fmt.Errorf("failed to enqueue %d tasks: %v", len(errs), errs[0]) + } + + // Return total count including skipped (already existing) tasks + return enqueuedCount + skippedCount, nil +} + +// ReprocessBlock re-enqueues tasks for an orphaned block. +// Used when a block is in ClickHouse (complete=0) but has no Redis tracking. +// TaskID deduplication ensures no duplicate tasks are created. +func (p *Processor) ReprocessBlock(ctx context.Context, blockNum uint64) error { + p.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "network": p.network.Name, + }).Info("Reprocessing orphaned block") + + // Get execution node + node := p.pool.GetHealthyExecutionNode() + if node == nil { + return fmt.Errorf("no healthy execution node available") } - return enqueuedCount, nil + // Fetch block + block, err := node.BlockByNumber(ctx, new(big.Int).SetUint64(blockNum)) + if err != nil { + return fmt.Errorf("failed to fetch block %d: %w", blockNum, err) + } + + // Handle empty blocks - mark complete immediately + if len(block.Transactions()) == 0 { + p.log.WithField("block", blockNum).Debug("Empty orphaned block, marking as complete") + + return p.stateManager.MarkBlockComplete(ctx, blockNum, p.network.Name, p.Name()) + } + + expectedCount := len(block.Transactions()) + + // Determine queue based on processing mode + queue := p.getProcessForwardsQueue() + if p.processingMode == tracker.BACKWARDS_MODE { + queue = p.getProcessBackwardsQueue() + } + + // Register in Redis (clears any partial state) + if regErr := p.completionTracker.RegisterBlock( + ctx, blockNum, expectedCount, p.network.Name, p.Name(), p.processingMode, queue, + ); regErr != nil { + return fmt.Errorf("failed to register block: %w", regErr) + } + + // Enqueue tasks (TaskID handles dedup - existing tasks return ErrTaskIDConflict) + enqueuedCount, err := p.EnqueueTransactionTasks(ctx, block) + if err != nil { + return fmt.Errorf("failed to enqueue tasks: %w", err) + } + + p.log.WithFields(logrus.Fields{ + "block_number": blockNum, + "expected_count": expectedCount, + "enqueued_count": enqueuedCount, + }).Info("Reprocessed orphaned block") + + return nil } diff --git a/pkg/processor/transaction/structlog_agg/handlers.go b/pkg/processor/transaction/structlog_agg/handlers.go index e6bb887..a6d110f 100644 --- a/pkg/processor/transaction/structlog_agg/handlers.go +++ b/pkg/processor/transaction/structlog_agg/handlers.go @@ -63,8 +63,25 @@ func (p *Processor) handleProcessForwardsTask(ctx context.Context, task *asynq.T // Record successful processing common.TasksProcessed.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessForwardsQueue(ProcessorName), ProcessForwardsTaskType, "success").Inc() - // Track block completion using embedded Limiter - p.TrackBlockCompletion(ctx, blockNumber.Uint64(), tracker.FORWARDS_MODE) + // Track task completion using SET-based tracker + taskID := GenerateTaskID(p.network.Name, blockNumber.Uint64(), payload.TransactionHash) + + allComplete, trackErr := p.completionTracker.TrackTaskCompletion(ctx, taskID, blockNumber.Uint64(), p.network.Name, p.Name(), tracker.FORWARDS_MODE) + if trackErr != nil { + p.log.WithError(trackErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + "task_id": taskID, + }).Warn("Failed to track task completion") + // Non-fatal - stale detection will catch it + } + + if allComplete { + if markErr := p.completionTracker.MarkBlockComplete(ctx, 
blockNumber.Uint64(), p.network.Name, p.Name(), tracker.FORWARDS_MODE); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + }).Error("Failed to mark block complete") + } + } p.log.WithFields(logrus.Fields{ "transaction_hash": payload.TransactionHash, @@ -125,8 +142,25 @@ func (p *Processor) handleProcessBackwardsTask(ctx context.Context, task *asynq. // Record successful processing common.TasksProcessed.WithLabelValues(p.network.Name, ProcessorName, tracker.ProcessBackwardsQueue(ProcessorName), ProcessBackwardsTaskType, "success").Inc() - // Track block completion using embedded Limiter - p.TrackBlockCompletion(ctx, blockNumber.Uint64(), tracker.BACKWARDS_MODE) + // Track task completion using SET-based tracker + taskID := GenerateTaskID(p.network.Name, blockNumber.Uint64(), payload.TransactionHash) + + allComplete, trackErr := p.completionTracker.TrackTaskCompletion(ctx, taskID, blockNumber.Uint64(), p.network.Name, p.Name(), tracker.BACKWARDS_MODE) + if trackErr != nil { + p.log.WithError(trackErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + "task_id": taskID, + }).Warn("Failed to track task completion") + // Non-fatal - stale detection will catch it + } + + if allComplete { + if markErr := p.completionTracker.MarkBlockComplete(ctx, blockNumber.Uint64(), p.network.Name, p.Name(), tracker.BACKWARDS_MODE); markErr != nil { + p.log.WithError(markErr).WithFields(logrus.Fields{ + "block_number": blockNumber.Uint64(), + }).Error("Failed to mark block complete") + } + } p.log.WithFields(logrus.Fields{ "transaction_hash": payload.TransactionHash, diff --git a/pkg/processor/transaction/structlog_agg/processor.go b/pkg/processor/transaction/structlog_agg/processor.go index 2e3a732..0a857ee 100644 --- a/pkg/processor/transaction/structlog_agg/processor.go +++ b/pkg/processor/transaction/structlog_agg/processor.go @@ -60,7 +60,6 @@ type Processor struct { asynqClient *asynq.Client processingMode string redisPrefix string - pendingTracker *tracker.PendingTracker // Row buffer for batched ClickHouse inserts rowBuffer *rowbuffer.Buffer[insertRow] @@ -68,6 +67,9 @@ type Processor struct { // Embedded limiter for shared blocking/completion logic *tracker.Limiter + // Block completion tracker for SET-based task deduplication + completionTracker *tracker.BlockCompletionTracker + // Background metrics worker fields metricsStop chan struct{} metricsWg sync.WaitGroup @@ -102,33 +104,43 @@ func New(deps *Dependencies, config *Config) (*Processor, error) { } log := deps.Log.WithField("processor", ProcessorName) - pendingTracker := tracker.NewPendingTracker(deps.RedisClient, deps.RedisPrefix, log) // Create the limiter for shared functionality limiter := tracker.NewLimiter( &tracker.LimiterDeps{ - Log: log, - StateProvider: deps.State, - PendingTracker: pendingTracker, - Network: deps.Network.Name, - Processor: ProcessorName, + Log: log, + StateProvider: deps.State, + Network: deps.Network.Name, + Processor: ProcessorName, }, tracker.LimiterConfig{ MaxPendingBlockRange: config.MaxPendingBlockRange, }, ) + // Create the block completion tracker for SET-based task deduplication + completionTracker := tracker.NewBlockCompletionTracker( + deps.RedisClient, + deps.RedisPrefix, + log, + deps.State, + tracker.BlockCompletionTrackerConfig{ + StaleThreshold: tracker.DefaultStaleThreshold, + AutoRetryStale: true, + }, + ) + processor := &Processor{ - log: log, - pool: deps.Pool, - stateManager: deps.State, - clickhouse: clickhouseClient, - 
config: config, - asynqClient: deps.AsynqClient, - processingMode: tracker.FORWARDS_MODE, // Default mode - redisPrefix: deps.RedisPrefix, - pendingTracker: pendingTracker, - Limiter: limiter, + log: log, + pool: deps.Pool, + stateManager: deps.State, + clickhouse: clickhouseClient, + config: config, + asynqClient: deps.AsynqClient, + processingMode: tracker.FORWARDS_MODE, // Default mode + redisPrefix: deps.RedisPrefix, + Limiter: limiter, + completionTracker: completionTracker, } processor.network = deps.Network @@ -234,6 +246,11 @@ func (p *Processor) SetProcessingMode(mode string) { p.log.WithField("mode", mode).Info("Processing mode updated") } +// GetCompletionTracker returns the block completion tracker. +func (p *Processor) GetCompletionTracker() *tracker.BlockCompletionTracker { + return p.completionTracker +} + // getProcessForwardsQueue returns the prefixed process forwards queue name. func (p *Processor) getProcessForwardsQueue() string { return tracker.PrefixedProcessForwardsQueue(ProcessorName, p.redisPrefix) diff --git a/pkg/processor/transaction/structlog_agg/tasks.go b/pkg/processor/transaction/structlog_agg/tasks.go index ef40735..41a5b0f 100644 --- a/pkg/processor/transaction/structlog_agg/tasks.go +++ b/pkg/processor/transaction/structlog_agg/tasks.go @@ -2,6 +2,7 @@ package structlog_agg import ( "encoding/json" + "fmt" "math/big" "github.com/hibiken/asynq" @@ -37,26 +38,38 @@ func (p *ProcessPayload) UnmarshalBinary(data []byte) error { return json.Unmarshal(data, p) } +// GenerateTaskID creates a deterministic task ID for deduplication. +// Format: {processor}:{network}:{blockNum}:{txHash}. +func GenerateTaskID(network string, blockNumber uint64, txHash string) string { + return fmt.Sprintf("%s:%s:%d:%s", ProcessorName, network, blockNumber, txHash) +} + // NewProcessForwardsTask creates a new forwards process task. -func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, error) { +// Returns the task, taskID for deduplication, and any error. +func NewProcessForwardsTask(payload *ProcessPayload) (*asynq.Task, string, error) { payload.ProcessingMode = tracker.FORWARDS_MODE data, err := json.Marshal(payload) if err != nil { - return nil, err + return nil, "", err } - return asynq.NewTask(ProcessForwardsTaskType, data), nil + taskID := GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64(), payload.TransactionHash) + + return asynq.NewTask(ProcessForwardsTaskType, data), taskID, nil } // NewProcessBackwardsTask creates a new backwards process task. -func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, error) { +// Returns the task, taskID for deduplication, and any error. +func NewProcessBackwardsTask(payload *ProcessPayload) (*asynq.Task, string, error) { payload.ProcessingMode = tracker.BACKWARDS_MODE data, err := json.Marshal(payload) if err != nil { - return nil, err + return nil, "", err } - return asynq.NewTask(ProcessBackwardsTaskType, data), nil + taskID := GenerateTaskID(payload.NetworkName, payload.BlockNumber.Uint64(), payload.TransactionHash) + + return asynq.NewTask(ProcessBackwardsTaskType, data), taskID, nil }
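
Below the patch, a minimal sketch of the enqueue-side deduplication pattern these changes converge on across the three processors. It is illustrative only: the Redis address, task type string, queue name, and payload are placeholders rather than the project's constants, and only asynq calls the diff itself relies on (NewTask, Queue, TaskID, ErrTaskIDConflict) are used.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"

	"github.com/hibiken/asynq"
)

// generateTaskID mirrors the deterministic ID format introduced in this diff:
// {processor}:{network}:{blockNum}:{txHash} for per-transaction processors;
// the simple processor uses the literal suffix "block" instead of a tx hash.
func generateTaskID(processor, network string, blockNumber uint64, txHash string) string {
	return fmt.Sprintf("%s:%s:%d:%s", processor, network, blockNumber, txHash)
}

func main() {
	// Placeholder Redis address; in the service the client comes from Dependencies.
	client := asynq.NewClient(asynq.RedisClientOpt{Addr: "localhost:6379"})
	defer client.Close()

	// Placeholder task type, queue name, and payload standing in for the
	// processor's ProcessForwardsTaskType, prefixed queue, and ProcessPayload.
	taskID := generateTaskID("transaction_structlog", "mainnet", 19_000_000, "0xabc123")
	task := asynq.NewTask("transaction_structlog:process_forwards", []byte(`{"block_number":19000000}`))

	// A deterministic TaskID makes enqueueing idempotent: a second enqueue of
	// the same work unit fails with ErrTaskIDConflict instead of duplicating it.
	_, err := client.EnqueueContext(context.Background(), task,
		asynq.Queue("transaction_structlog:process:forwards"),
		asynq.TaskID(taskID),
	)

	switch {
	case errors.Is(err, asynq.ErrTaskIDConflict):
		// Benign: an identical task is already queued or in flight; it will
		// still be tracked to completion, so the caller skips it and moves on.
		log.Printf("task %s already enqueued, skipping", taskID)
	case err != nil:
		log.Fatalf("enqueue failed: %v", err)
	default:
		log.Printf("enqueued task %s", taskID)
	}
}
```

Because the ID is derived purely from processor, network, block number, and transaction hash, re-running ReprocessBlock against a half-finished block re-enqueues only the work asynq does not already hold.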
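
A second sketch covers the handler side of the same pattern, the SET-based completion tracking that replaces the counter-based PendingTracker. The interface below exists only so the snippet compiles on its own; its two method signatures are copied from the diff, while the surrounding function is an assumption about how a handler wires them together, not the project's code.

```go
package sketch

import (
	"context"
	"fmt"
	"log"
)

// completionTracker captures just the two BlockCompletionTracker methods the
// updated handlers call; the signatures are taken from this diff.
type completionTracker interface {
	TrackTaskCompletion(ctx context.Context, taskID string, block uint64, network, processor, mode string) (bool, error)
	MarkBlockComplete(ctx context.Context, block uint64, network, processor, mode string) error
}

// onTaskSuccess shows the ordering the handlers now follow once a task
// finishes: record this task's completion in the tracker's set, and only when
// the tracker reports the whole set is done, mark the block complete.
func onTaskSuccess(ctx context.Context, t completionTracker, network, processor, mode string, block uint64, txHash string) {
	taskID := fmt.Sprintf("%s:%s:%d:%s", processor, network, block, txHash)

	allComplete, err := t.TrackTaskCompletion(ctx, taskID, block, network, processor, mode)
	if err != nil {
		// Deliberately non-fatal: the stale-block ticker added in this PR
		// will detect the block later and retry it.
		log.Printf("track completion failed for %s: %v", taskID, err)
		return
	}

	if allComplete {
		if err := t.MarkBlockComplete(ctx, block, network, processor, mode); err != nil {
			log.Printf("mark block %d complete failed: %v", block, err)
		}
	}
}
```

The simple processor builds its task ID with the fixed suffix "block" (one task per block), so for it allComplete is true after the first and only completion.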