diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 31980a996f2..ccb65f72ac9 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -550,6 +550,12 @@ impl ChainStore for MockChainStore { ) -> Result<Vec<BlockHash>, Error> { unimplemented!() } + async fn block_hashes_by_block_numbers( + &self, + _numbers: &[BlockNumber], + ) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error> { + unimplemented!() + } async fn confirm_block_hash( &self, _number: BlockNumber, diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 658baa8be3e..4ad2d4f2e54 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -602,6 +602,12 @@ pub trait ChainStore: ChainHeadStore { number: BlockNumber, ) -> Result<Vec<BlockHash>, Error>; + /// Return the hashes of all blocks with the given numbers (batch version) + async fn block_hashes_by_block_numbers( + &self, + numbers: &[BlockNumber], + ) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error>; + /// Confirm that block number `number` has hash `hash` and that the store /// may purge any other blocks with that number async fn confirm_block_hash( @@ -790,6 +796,19 @@ pub trait StatusStore: Send + Sync + 'static { block_number: BlockNumber, fetch_block_ptr: &dyn BlockPtrForNumber, ) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>; + + /// Like `get_public_proof_of_indexing` but accepts optional pre-fetched block hashes + /// to avoid redundant database lookups when processing batches of POI requests. 
+ async fn get_public_proof_of_indexing_with_block_hash( + &self, + subgraph_id: &DeploymentHash, + block_number: BlockNumber, + prefetched_hashes: Option<&Vec<BlockHash>>, + fetch_block_ptr: &dyn BlockPtrForNumber, + ) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError>; + + /// Get the network for a deployment + async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError>; } #[async_trait] diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index b8385866d33..7b3a16deaf9 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use async_trait::async_trait; use graph::data::query::Trace; @@ -15,7 +15,7 @@ use graph::components::versions::VERSIONS; use graph::data::graphql::{object, IntoValue, ObjectOrInterface, ValueMap}; use graph::data::subgraph::{status, DeploymentFeatures}; use graph::data::value::Object; -use graph::futures03::TryFutureExt; +use graph::futures03::{future, TryFutureExt}; use graph::prelude::*; use graph_graphql::prelude::{a, ExecutionContext, Resolver}; @@ -352,7 +352,10 @@ where )) } - fn resolve_proof_of_indexing(&self, field: &a::Field) -> Result<r::Value, QueryExecutionError> { + async fn resolve_proof_of_indexing( + &self, + field: &a::Field, + ) -> Result<r::Value, QueryExecutionError> { let deployment_id = field .get_required::<DeploymentHash>("subgraph") .expect("Valid subgraphId required"); @@ -381,7 +384,7 @@ where let poi_fut = self .store .get_proof_of_indexing(&deployment_id, &indexer, block.clone()); - let poi = match graph::futures03::executor::block_on(poi_fut) { + let poi = match poi_fut.await { Ok(Some(poi)) => r::Value::String(format!("0x{}", hex::encode(poi))), Ok(None) => r::Value::Null, Err(e) => { @@ -414,40 +417,108 @@ where return Err(QueryExecutionError::TooExpensive); } - let mut public_poi_results = vec![]; - for request in requests { - let (poi_result, request) = match self - .store - .get_public_proof_of_indexing(&request.deployment, request.block_number, self) - .await 
- { - Ok(Some(poi)) => (Some(poi), request), - Ok(None) => (None, request), - Err(e) => { - error!( - self.logger, - "Failed to query public proof of indexing"; - "subgraph" => &request.deployment, - "block" => format!("{}", request.block_number), - "error" => format!("{:?}", e) - ); - (None, request) + // Step 1: Group requests by network and collect block numbers for batch lookup + let mut requests_by_network: HashMap<String, Vec<(usize, BlockNumber)>> = HashMap::new(); + let mut request_networks: Vec<Option<String>> = Vec::with_capacity(requests.len()); + + for (idx, request) in requests.iter().enumerate() { + match self.store.network_for_deployment(&request.deployment).await { + Ok(network) => { + requests_by_network + .entry(network.clone()) + .or_default() + .push((idx, request.block_number)); + request_networks.push(Some(network)); } - }; + Err(_) => { + request_networks.push(None); + } + } + } + + // Step 2: Pre-fetch all block hashes per network in batch + let mut block_hash_cache: HashMap< + (String, BlockNumber), + Vec<BlockHash>, + > = HashMap::new(); + + for (network, network_requests) in &requests_by_network { + let block_numbers: Vec<BlockNumber> = + network_requests.iter().map(|(_, num)| *num).collect(); - public_poi_results.push( - PublicProofOfIndexingResult { - deployment: request.deployment, - block: match poi_result { - Some((ref block, _)) => block.clone(), - None => PartialBlockPtr::from(request.block_number), - }, - proof_of_indexing: poi_result.map(|(_, poi)| poi), + if let Some(chain_store) = self.store.block_store().chain_store(network).await { + match chain_store + .block_hashes_by_block_numbers(&block_numbers) + .await + { + Ok(hashes) => { + for (num, hash_vec) in hashes { + block_hash_cache.insert((network.clone(), num), hash_vec); + } + } + Err(e) => { + debug!( + self.logger, + "Failed to batch fetch block hashes for network"; + "network" => network, + "error" => format!("{:?}", e) + ); + // Continue without pre-fetched hashes - will fall back to individual lookups + } } - .into_value(), - ) + } } + // 
Step 3: Process all POI requests in parallel, using cached block hashes + let poi_futures: Vec<_> = requests + .into_iter() + .zip(request_networks.into_iter()) + .map(|(request, network_opt)| { + let cache = &block_hash_cache; + async move { + let prefetched_hashes = network_opt + .as_ref() + .and_then(|network| cache.get(&(network.clone(), request.block_number))); + + let poi_result = match self + .store + .get_public_proof_of_indexing_with_block_hash( + &request.deployment, + request.block_number, + prefetched_hashes, + self, + ) + .await + { + Ok(Some(poi)) => Some(poi), + Ok(None) => None, + Err(e) => { + error!( + self.logger, + "Failed to query public proof of indexing"; + "subgraph" => &request.deployment, + "block" => format!("{}", request.block_number), + "error" => format!("{:?}", e) + ); + None + } + }; + + PublicProofOfIndexingResult { + deployment: request.deployment, + block: match poi_result { + Some((ref block, _)) => block.clone(), + None => PartialBlockPtr::from(request.block_number), + }, + proof_of_indexing: poi_result.map(|(_, poi)| poi), + } + .into_value() + } + }) + .collect(); + + let public_poi_results = future::join_all(poi_futures).await; + Ok(r::Value::List(public_poi_results)) } @@ -791,7 +862,7 @@ where field.name.as_str(), scalar_type.name.as_str(), ) { - ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field), + ("Query", "proofOfIndexing", "Bytes") => self.resolve_proof_of_indexing(field).await, ("Query", "blockData", "JSONObject") => self.resolve_block_data(field).await, ("Query", "blockHashFromNumber", "Bytes") => { self.resolve_block_hash_from_number(field).await diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index bdf63f52c31..3d455b7d5bf 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -739,6 +739,58 @@ mod data { } } + /// Return the hashes of all blocks with the given block numbers (batch version) + pub(super) async fn 
block_hashes_by_block_numbers( + &self, + conn: &mut AsyncPgConnection, + chain: &str, + numbers: &[BlockNumber], + ) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error> { + if numbers.is_empty() { + return Ok(HashMap::new()); + } + + match self { + Storage::Shared => { + use public::ethereum_blocks as b; + + let results = b::table + .select((b::number, b::hash)) + .filter(b::network_name.eq(chain)) + .filter(b::number.eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64)))) + .load::<(i64, String)>(conn) + .await? ; + + let mut map: HashMap<BlockNumber, Vec<BlockHash>> = HashMap::new(); + for (num, hash) in results { + let block_hash = hash.parse()?; + map.entry(num as BlockNumber).or_default().push(block_hash); + } + Ok(map) + } + Storage::Private(Schema { blocks, .. }) => { + let results = blocks + .table() + .select((blocks.number(), blocks.hash())) + .filter( + blocks + .number() + .eq_any(Vec::from_iter(numbers.iter().map(|&n| n as i64))), + ) + .load::<(i64, Vec<u8>)>(conn) + .await?; + + let mut map: HashMap<BlockNumber, Vec<BlockHash>> = HashMap::new(); + for (num, hash) in results { + map.entry(num as BlockNumber) + .or_default() + .push(BlockHash::from(hash)); + } + Ok(map) + } + } + } + pub(super) async fn confirm_block_hash( &self, conn: &mut AsyncPgConnection, @@ -2971,6 +3023,16 @@ impl ChainStoreTrait for ChainStore { .await } + async fn block_hashes_by_block_numbers( + &self, + numbers: &[BlockNumber], + ) -> Result<HashMap<BlockNumber, Vec<BlockHash>>, Error> { + let mut conn = self.pool.get_permitted().await?; + self.storage + .block_hashes_by_block_numbers(&mut conn, &self.chain, numbers) + .await + } + async fn confirm_block_hash( &self, number: BlockNumber, diff --git a/store/postgres/src/store.rs b/store/postgres/src/store.rs index 4adec80ab5b..10996d0572f 100644 --- a/store/postgres/src/store.rs +++ b/store/postgres/src/store.rs @@ -171,6 +171,28 @@ impl StatusStore for Store { .await } + async fn get_public_proof_of_indexing_with_block_hash( + &self, + subgraph_id: &DeploymentHash, + block_number: BlockNumber, + prefetched_hashes: Option<&Vec<BlockHash>>, + fetch_block_ptr: 
&dyn BlockPtrForNumber, + ) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> { + self.subgraph_store + .get_public_proof_of_indexing_with_block_hash( + subgraph_id, + block_number, + prefetched_hashes, + self.block_store().clone(), + fetch_block_ptr, + ) + .await + } + + async fn network_for_deployment(&self, id: &DeploymentHash) -> Result<String, StoreError> { + self.subgraph_store.network_for_deployment(id).await + } + async fn query_permit(&self) -> QueryPermit { // Status queries go to the primary shard. self.block_store.query_permit_primary().await diff --git a/store/postgres/src/subgraph_store.rs b/store/postgres/src/subgraph_store.rs index 478d21eba02..a6eacd12145 100644 --- a/store/postgres/src/subgraph_store.rs +++ b/store/postgres/src/subgraph_store.rs @@ -259,6 +259,33 @@ impl SubgraphStore { .await } + pub(crate) async fn get_public_proof_of_indexing_with_block_hash( + &self, + id: &DeploymentHash, + block_number: BlockNumber, + prefetched_hashes: Option<&Vec<BlockHash>>, + block_store: impl BlockStore, + fetch_block_ptr: &dyn BlockPtrForNumber, + ) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> { + self.inner + .get_public_proof_of_indexing_with_block_hash( + id, + block_number, + prefetched_hashes, + block_store, + fetch_block_ptr, + ) + .await + } + + /// Get the network for a deployment + pub(crate) async fn network_for_deployment( + &self, + id: &DeploymentHash, + ) -> Result<String, StoreError> { + self.inner.network_for_deployment(id).await + } + + pub fn notification_sender(&self) -> Arc<NotificationSender> { self.sender.clone() } @@ -622,6 +649,15 @@ impl Inner { Ok(site) } + /// Get the network for a deployment + pub(crate) async fn network_for_deployment( + &self, + id: &DeploymentHash, + ) -> Result<String, StoreError> { + let site = self.site(id).await?; + Ok(site.network.clone()) + } + /// Return the store and site for the active deployment of this /// deployment hash async fn store( @@ -1171,6 +1207,78 @@ impl Inner { })) } + /// Like `get_public_proof_of_indexing` but accepts optional pre-fetched block hashes + /// to avoid redundant database lookups when processing batches of 
POI requests. + pub(crate) async fn get_public_proof_of_indexing_with_block_hash( + &self, + id: &DeploymentHash, + block_number: BlockNumber, + prefetched_hashes: Option<&Vec<BlockHash>>, + block_store: impl BlockStore, + fetch_block_ptr: &dyn BlockPtrForNumber, + ) -> Result<Option<(PartialBlockPtr, [u8; 32])>, StoreError> { + let (store, site) = self.store(id).await?; + + let block_hash = match prefetched_hashes { + Some(hashes) if hashes.len() == 1 => { + // Use the pre-fetched hash directly + hashes[0].clone() + } + Some(hashes) if hashes.is_empty() => { + // No blocks found for this number, try RPC fallback + match fetch_block_ptr + .block_ptr_for_number(site.network.clone(), block_number) + .await + .ok() + .flatten() + { + None => return Ok(None), + Some(block_ptr) => block_ptr.hash, + } + } + _ => { + // Multiple hashes or no pre-fetched data - fall back to standard lookup + let chain_store = match block_store.chain_store(&site.network).await { + Some(chain_store) => chain_store, + None => return Ok(None), + }; + let mut hashes = chain_store + .block_hashes_by_block_number(block_number) + .await?; + + if hashes.len() == 1 { + hashes.pop().unwrap() + } else { + match fetch_block_ptr + .block_ptr_for_number(site.network.clone(), block_number) + .await + .ok() + .flatten() + { + None => return Ok(None), + Some(block_ptr) => block_ptr.hash, + } + } + } + }; + + let block_for_poi_query = BlockPtr::new(block_hash.clone(), block_number); + let indexer = Some(Address::ZERO); + let poi = store + .get_proof_of_indexing(site, &indexer, block_for_poi_query) + .await?; + + Ok(poi.map(|poi| { + ( + PartialBlockPtr { + number: block_number, + hash: Some(block_hash), + }, + poi, + ) + })) + } + // Only used by tests #[cfg(debug_assertions)] pub async fn find(