From 732c1f8a278c623ae1b1584594a6bf266cc95929 Mon Sep 17 00:00:00 2001
From: miratepuffin
Date: Fri, 17 Apr 2026 19:33:38 +0100
Subject: [PATCH 01/11] Exposed heavy queries and exclusive writes

---
 raphtory-graphql/src/auth.rs                  | 41 ++++++++----------
 raphtory-graphql/src/cli.rs                   | 21 +++++++++-
 raphtory-graphql/src/config/app_config.rs     | 26 +++++++++++-
 .../src/config/concurrency_config.rs          | 18 ++++++++
 raphtory-graphql/src/config/mod.rs            |  1 +
 raphtory-graphql/src/server.rs                |  2 +-
 6 files changed, 82 insertions(+), 27 deletions(-)
 create mode 100644 raphtory-graphql/src/config/concurrency_config.rs

diff --git a/raphtory-graphql/src/auth.rs b/raphtory-graphql/src/auth.rs
index 1626bf38a3..0530fcd44b 100644
--- a/raphtory-graphql/src/auth.rs
+++ b/raphtory-graphql/src/auth.rs
@@ -1,4 +1,4 @@
-use crate::config::auth_config::{AuthConfig, PublicKey};
+use crate::config::{app_config::AppConfig, auth_config::PublicKey};
 use async_graphql::{
     async_trait,
     extensions::{Extension, ExtensionContext, ExtensionFactory, NextParseQuery},
@@ -33,36 +33,29 @@ pub(crate) struct TokenClaims {
 // TODO: maybe this should be renamed as it doens't only take care of auth anymore
 pub struct AuthenticatedGraphQL<E> {
     executor: E,
-    config: AuthConfig,
+    config: AppConfig,
     semaphore: Option<Semaphore>,
     lock: Option<RwLock<()>>,
 }
 
 impl<E> AuthenticatedGraphQL<E> {
     /// Create a GraphQL endpoint.
- pub fn new(executor: E, config: AuthConfig) -> Self { + pub fn new(executor: E, config: AppConfig) -> Self { + let semaphore = config.concurrency.heavy_query_limit.map(|limit| { + println!("Server running with concurrency limited to {limit} for heavy queries"); + Semaphore::new(limit) + }); + let lock = if config.concurrency.exclusive_writes { + println!("Server running with exclusive writes"); + Some(RwLock::new(())) + } else { + None + }; Self { executor, config, - semaphore: std::env::var("RAPHTORY_CONCURRENCY_LIMIT") - .ok() - .and_then(|limit| { - let limit = limit.parse::().ok()?; - println!( - "Server running with concurrency limited to {limit} for heavy queries" - ); - Some(Semaphore::new(limit)) - }), - lock: std::env::var("RAPHTORY_THREADSAFE") - .ok() - .and_then(|thread_safe| { - if thread_safe == "1" { - println!("Server running in threadsafe mode"); - Some(RwLock::new(())) - } else { - None - } - }), + semaphore, + lock, } } } @@ -124,7 +117,7 @@ where async fn call(&self, req: Request) -> Result { // here ANY error when trying to validate the Authorization header is equivalent to it not being present at all - let access = match &self.config.public_key { + let access = match &self.config.auth.public_key { Some(public_key) => { let presented_access = req .header(AUTHORIZATION) @@ -132,7 +125,7 @@ where match presented_access { Some(access) => access, None => { - if self.config.enabled_for_reads { + if self.config.auth.enabled_for_reads { return Err(Unauthorized(AuthError::RequireRead)); } else { Access::Ro // if read access is not required, we give read access to all requests diff --git a/raphtory-graphql/src/cli.rs b/raphtory-graphql/src/cli.rs index 63a0577cf7..f285b4e965 100644 --- a/raphtory-graphql/src/cli.rs +++ b/raphtory-graphql/src/cli.rs @@ -5,6 +5,7 @@ use crate::{ app_config::AppConfigBuilder, auth_config::{DEFAULT_AUTH_ENABLED_FOR_READS, PUBLIC_KEY_DECODING_ERR_MSG}, cache_config::{DEFAULT_CAPACITY, DEFAULT_TTI_SECONDS}, + 
concurrency_config::DEFAULT_EXCLUSIVE_WRITES, log_config::DEFAULT_LOG_LEVEL, otlp_config::{ TracingLevel, DEFAULT_OTLP_AGENT_HOST, DEFAULT_OTLP_AGENT_PORT, @@ -78,6 +79,22 @@ struct ServerArgs { #[arg(long, env = "RAPHTORY_AUTH_ENABLED_FOR_READS", default_value_t = DEFAULT_AUTH_ENABLED_FOR_READS, help = "Enable auth for reads")] auth_enabled_for_reads: bool, + #[arg( + long, + env = "RAPHTORY_HEAVY_QUERY_LIMIT", + default_value = None, + help = "Restricts how many expensive graph traversal queries can execute simultaneously. Covers operations like connected components, edge traversals, and neighbour lookups (outComponent, inComponent, edges, outEdges, inEdges, neighbours, outNeighbours, inNeighbours). Once the limit is exceeded, queries are parked on a semaphore and wait until a slot becomes available before executing." + )] + heavy_query_limit: Option, + + #[arg( + long, + env = "RAPHTORY_EXCLUSIVE_WRITES", + default_value_t = DEFAULT_EXCLUSIVE_WRITES, + help = "Ensures only one ingestion/write operation runs at a time and blocks reads until it completes." 
+ )] + exclusive_writes: bool, + #[arg(long, env = "RAPHTORY_PUBLIC_DIR", default_value = None, help = "Public directory path")] public_dir: Option, @@ -114,7 +131,9 @@ where .with_auth_public_key(server_args.auth_public_key) .expect(PUBLIC_KEY_DECODING_ERR_MSG) .with_public_dir(server_args.public_dir) - .with_auth_enabled_for_reads(server_args.auth_enabled_for_reads); + .with_auth_enabled_for_reads(server_args.auth_enabled_for_reads) + .with_heavy_query_limit(server_args.heavy_query_limit) + .with_exclusive_writes(server_args.exclusive_writes); #[cfg(feature = "search")] { diff --git a/raphtory-graphql/src/config/app_config.rs b/raphtory-graphql/src/config/app_config.rs index 9404d678e6..430a7371d2 100644 --- a/raphtory-graphql/src/config/app_config.rs +++ b/raphtory-graphql/src/config/app_config.rs @@ -1,6 +1,7 @@ use super::auth_config::{AuthConfig, PublicKeyError, PUBLIC_KEY_DECODING_ERR_MSG}; use crate::config::{ - cache_config::CacheConfig, log_config::LoggingConfig, otlp_config::TracingConfig, + cache_config::CacheConfig, concurrency_config::ConcurrencyConfig, log_config::LoggingConfig, + otlp_config::TracingConfig, }; use config::{Config, ConfigError, File}; use serde::{Deserialize, Serialize}; @@ -16,6 +17,7 @@ pub struct AppConfig { pub cache: CacheConfig, pub tracing: TracingConfig, pub auth: AuthConfig, + pub concurrency: ConcurrencyConfig, pub public_dir: Option, #[cfg(feature = "search")] pub index: IndexConfig, @@ -26,6 +28,7 @@ pub struct AppConfigBuilder { cache: CacheConfig, tracing: TracingConfig, auth: AuthConfig, + concurrency: ConcurrencyConfig, public_dir: Option, #[cfg(feature = "search")] index: IndexConfig, @@ -38,6 +41,7 @@ impl From for AppConfigBuilder { cache: config.cache, tracing: config.tracing, auth: config.auth, + concurrency: config.concurrency, public_dir: config.public_dir, #[cfg(feature = "search")] index: config.index, @@ -111,6 +115,16 @@ impl AppConfigBuilder { self } + pub fn with_heavy_query_limit(mut self, 
heavy_query_limit: Option) -> Self { + self.concurrency.heavy_query_limit = heavy_query_limit; + self + } + + pub fn with_exclusive_writes(mut self, exclusive_writes: bool) -> Self { + self.concurrency.exclusive_writes = exclusive_writes; + self + } + pub fn with_public_dir(mut self, public_dir: Option) -> Self { self.public_dir = public_dir; self @@ -128,6 +142,7 @@ impl AppConfigBuilder { cache: self.cache, tracing: self.tracing, auth: self.auth, + concurrency: self.concurrency, public_dir: self.public_dir, #[cfg(feature = "search")] index: self.index, @@ -199,6 +214,15 @@ pub fn load_config( app_config_builder = app_config_builder.with_auth_enabled_for_reads(enabled_for_reads); } + if let Ok(heavy_query_limit) = + settings.get::>("concurrency.heavy_query_limit") + { + app_config_builder = app_config_builder.with_heavy_query_limit(heavy_query_limit); + } + if let Ok(exclusive_writes) = settings.get::("concurrency.exclusive_writes") { + app_config_builder = app_config_builder.with_exclusive_writes(exclusive_writes); + } + if let Ok(public_dir) = settings.get::>("public_dir") { app_config_builder = app_config_builder.with_public_dir(public_dir); } diff --git a/raphtory-graphql/src/config/concurrency_config.rs b/raphtory-graphql/src/config/concurrency_config.rs new file mode 100644 index 0000000000..59163afeb8 --- /dev/null +++ b/raphtory-graphql/src/config/concurrency_config.rs @@ -0,0 +1,18 @@ +use serde::{Deserialize, Serialize}; + +pub const DEFAULT_EXCLUSIVE_WRITES: bool = false; + +/// Controls how Raphtory schedules concurrent GraphQL work. +#[derive(Debug, Default, Deserialize, PartialEq, Clone, Serialize)] +pub struct ConcurrencyConfig { + /// Restricts how many expensive graph traversal queries can execute simultaneously. + /// Covers operations like connected components, edge traversals, and neighbour lookups + /// (outComponent, inComponent, edges, outEdges, inEdges, neighbours, outNeighbours, + /// inNeighbours). 
Once the limit is exceeded, queries are parked on a semaphore and + /// wait until a slot becomes available before executing. `None` means unlimited. + pub heavy_query_limit: Option, + + /// Ensures only one ingestion/write operation runs at a time and blocks reads until + /// it completes. + pub exclusive_writes: bool, +} diff --git a/raphtory-graphql/src/config/mod.rs b/raphtory-graphql/src/config/mod.rs index e0ae764243..ecb442929e 100644 --- a/raphtory-graphql/src/config/mod.rs +++ b/raphtory-graphql/src/config/mod.rs @@ -1,6 +1,7 @@ pub mod app_config; pub mod auth_config; pub mod cache_config; +pub mod concurrency_config; #[cfg(feature = "search")] pub mod index_config; pub mod log_config; diff --git a/raphtory-graphql/src/server.rs b/raphtory-graphql/src/server.rs index c4e429592f..c7f73fdb63 100644 --- a/raphtory-graphql/src/server.rs +++ b/raphtory-graphql/src/server.rs @@ -265,7 +265,7 @@ impl GraphServer { "/", PublicFilesEndpoint::new( self.config.public_dir.clone(), - AuthenticatedGraphQL::new(schema, self.config.auth.clone()), + AuthenticatedGraphQL::new(schema, self.config.clone()), ), ) .at("/health", get(health)) From b9883867556cba8e2ebc244bc7732ed9a8e7cb5d Mon Sep 17 00:00:00 2001 From: miratepuffin Date: Fri, 17 Apr 2026 20:08:04 +0100 Subject: [PATCH 02/11] Add graphql schema flags --- raphtory-graphql/src/cli.rs | 48 ++++++++++++++++- raphtory-graphql/src/config/app_config.rs | 57 ++++++++++++++++++-- raphtory-graphql/src/config/mod.rs | 1 + raphtory-graphql/src/config/schema_config.rs | 30 +++++++++++ raphtory-graphql/src/server.rs | 22 ++++++-- 5 files changed, 150 insertions(+), 8 deletions(-) create mode 100644 raphtory-graphql/src/config/schema_config.rs diff --git a/raphtory-graphql/src/cli.rs b/raphtory-graphql/src/cli.rs index f285b4e965..b3b54a7014 100644 --- a/raphtory-graphql/src/cli.rs +++ b/raphtory-graphql/src/cli.rs @@ -11,6 +11,7 @@ use crate::{ TracingLevel, DEFAULT_OTLP_AGENT_HOST, DEFAULT_OTLP_AGENT_PORT, 
DEFAULT_OTLP_TRACING_SERVICE_NAME, DEFAULT_TRACING_ENABLED, DEFAULT_TRACING_LEVEL, }, + schema_config::DEFAULT_DISABLE_INTROSPECTION, }, model::App, server::DEFAULT_PORT, @@ -95,6 +96,46 @@ struct ServerArgs { )] exclusive_writes: bool, + #[arg( + long, + env = "RAPHTORY_MAX_QUERY_DEPTH", + default_value = None, + help = "Limits how deeply nested a query can be." + )] + max_query_depth: Option, + + #[arg( + long, + env = "RAPHTORY_MAX_QUERY_COMPLEXITY", + default_value = None, + help = "Limits the total estimated cost of a query based on the number of fields selected. Blocks queries that try to fetch too much data in one request." + )] + max_query_complexity: Option, + + #[arg( + long, + env = "RAPHTORY_MAX_RECURSIVE_DEPTH", + default_value = None, + help = "Internal safety limit to prevent stack overflows from pathologically structured queries. Falls back to the async-graphql default of 32 if unset." + )] + max_recursive_depth: Option, + + #[arg( + long, + env = "RAPHTORY_MAX_DIRECTIVES_PER_FIELD", + default_value = None, + help = "Limits the number of GraphQL directives on any single field. Directives are annotations prefixed with @ that modify how a field is executed (e.g. @skip, @include, @deprecated)." + )] + max_directives_per_field: Option, + + #[arg( + long, + env = "RAPHTORY_DISABLE_INTROSPECTION", + default_value_t = DEFAULT_DISABLE_INTROSPECTION, + help = "Fully disable schema introspection, preventing clients from discovering the API's structure and available fields. Recommended for production." 
+ )] + disable_introspection: bool, + #[arg(long, env = "RAPHTORY_PUBLIC_DIR", default_value = None, help = "Public directory path")] public_dir: Option, @@ -133,7 +174,12 @@ where .with_public_dir(server_args.public_dir) .with_auth_enabled_for_reads(server_args.auth_enabled_for_reads) .with_heavy_query_limit(server_args.heavy_query_limit) - .with_exclusive_writes(server_args.exclusive_writes); + .with_exclusive_writes(server_args.exclusive_writes) + .with_max_query_depth(server_args.max_query_depth) + .with_max_query_complexity(server_args.max_query_complexity) + .with_max_recursive_depth(server_args.max_recursive_depth) + .with_max_directives_per_field(server_args.max_directives_per_field) + .with_disable_introspection(server_args.disable_introspection); #[cfg(feature = "search")] { diff --git a/raphtory-graphql/src/config/app_config.rs b/raphtory-graphql/src/config/app_config.rs index 430a7371d2..4cf80aa8f1 100644 --- a/raphtory-graphql/src/config/app_config.rs +++ b/raphtory-graphql/src/config/app_config.rs @@ -1,7 +1,7 @@ use super::auth_config::{AuthConfig, PublicKeyError, PUBLIC_KEY_DECODING_ERR_MSG}; use crate::config::{ cache_config::CacheConfig, concurrency_config::ConcurrencyConfig, log_config::LoggingConfig, - otlp_config::TracingConfig, + otlp_config::TracingConfig, schema_config::SchemaConfig, }; use config::{Config, ConfigError, File}; use serde::{Deserialize, Serialize}; @@ -18,6 +18,7 @@ pub struct AppConfig { pub tracing: TracingConfig, pub auth: AuthConfig, pub concurrency: ConcurrencyConfig, + pub schema: SchemaConfig, pub public_dir: Option, #[cfg(feature = "search")] pub index: IndexConfig, @@ -29,6 +30,7 @@ pub struct AppConfigBuilder { tracing: TracingConfig, auth: AuthConfig, concurrency: ConcurrencyConfig, + schema: SchemaConfig, public_dir: Option, #[cfg(feature = "search")] index: IndexConfig, @@ -42,6 +44,7 @@ impl From for AppConfigBuilder { tracing: config.tracing, auth: config.auth, concurrency: config.concurrency, + schema: 
config.schema, public_dir: config.public_dir, #[cfg(feature = "search")] index: config.index, @@ -125,6 +128,34 @@ impl AppConfigBuilder { self } + pub fn with_max_query_depth(mut self, max_query_depth: Option) -> Self { + self.schema.max_query_depth = max_query_depth; + self + } + + pub fn with_max_query_complexity(mut self, max_query_complexity: Option) -> Self { + self.schema.max_query_complexity = max_query_complexity; + self + } + + pub fn with_max_recursive_depth(mut self, max_recursive_depth: Option) -> Self { + self.schema.max_recursive_depth = max_recursive_depth; + self + } + + pub fn with_max_directives_per_field( + mut self, + max_directives_per_field: Option, + ) -> Self { + self.schema.max_directives_per_field = max_directives_per_field; + self + } + + pub fn with_disable_introspection(mut self, disable_introspection: bool) -> Self { + self.schema.disable_introspection = disable_introspection; + self + } + pub fn with_public_dir(mut self, public_dir: Option) -> Self { self.public_dir = public_dir; self @@ -143,6 +174,7 @@ impl AppConfigBuilder { tracing: self.tracing, auth: self.auth, concurrency: self.concurrency, + schema: self.schema, public_dir: self.public_dir, #[cfg(feature = "search")] index: self.index, @@ -214,15 +246,32 @@ pub fn load_config( app_config_builder = app_config_builder.with_auth_enabled_for_reads(enabled_for_reads); } - if let Ok(heavy_query_limit) = - settings.get::>("concurrency.heavy_query_limit") - { + if let Ok(heavy_query_limit) = settings.get::>("concurrency.heavy_query_limit") { app_config_builder = app_config_builder.with_heavy_query_limit(heavy_query_limit); } if let Ok(exclusive_writes) = settings.get::("concurrency.exclusive_writes") { app_config_builder = app_config_builder.with_exclusive_writes(exclusive_writes); } + if let Ok(max_query_depth) = settings.get::>("schema.max_query_depth") { + app_config_builder = app_config_builder.with_max_query_depth(max_query_depth); + } + if let Ok(max_query_complexity) = 
settings.get::>("schema.max_query_complexity") { + app_config_builder = app_config_builder.with_max_query_complexity(max_query_complexity); + } + if let Ok(max_recursive_depth) = settings.get::>("schema.max_recursive_depth") { + app_config_builder = app_config_builder.with_max_recursive_depth(max_recursive_depth); + } + if let Ok(max_directives_per_field) = + settings.get::>("schema.max_directives_per_field") + { + app_config_builder = + app_config_builder.with_max_directives_per_field(max_directives_per_field); + } + if let Ok(disable_introspection) = settings.get::("schema.disable_introspection") { + app_config_builder = app_config_builder.with_disable_introspection(disable_introspection); + } + if let Ok(public_dir) = settings.get::>("public_dir") { app_config_builder = app_config_builder.with_public_dir(public_dir); } diff --git a/raphtory-graphql/src/config/mod.rs b/raphtory-graphql/src/config/mod.rs index ecb442929e..9ef7311dec 100644 --- a/raphtory-graphql/src/config/mod.rs +++ b/raphtory-graphql/src/config/mod.rs @@ -6,6 +6,7 @@ pub mod concurrency_config; pub mod index_config; pub mod log_config; pub mod otlp_config; +pub mod schema_config; #[cfg(test)] mod tests { diff --git a/raphtory-graphql/src/config/schema_config.rs b/raphtory-graphql/src/config/schema_config.rs new file mode 100644 index 0000000000..b327617bb6 --- /dev/null +++ b/raphtory-graphql/src/config/schema_config.rs @@ -0,0 +1,30 @@ +use serde::{Deserialize, Serialize}; + +pub const DEFAULT_DISABLE_INTROSPECTION: bool = false; + +/// Controls GraphQL schema-level protections applied when the server builds its schema. +#[derive(Debug, Default, Deserialize, PartialEq, Clone, Serialize)] +pub struct SchemaConfig { + /// Limits how deeply nested a query can be. For example, a query like + /// graph → nodes → page → edges → page → destination → edges → page → destination + /// would have a depth of 9. `None` means unlimited. 
+ pub max_query_depth: Option, + + /// Limits the total estimated cost of a query based on the number of fields selected. + /// Blocks queries that try to fetch too much data in one request. `None` means unlimited. + pub max_query_complexity: Option, + + /// Internal safety limit to prevent stack overflows from pathologically structured + /// queries. `None` falls back to the async-graphql default of 32. + pub max_recursive_depth: Option, + + /// Limits the number of GraphQL directives on any single field. Directives are + /// annotations prefixed with @ that modify how a field is executed (e.g. @skip, + /// @include, @deprecated). This prevents directive-based abuse. `None` means unlimited. + pub max_directives_per_field: Option, + + /// When true, schema introspection is fully disabled, preventing clients from + /// discovering the API's structure and available fields. Recommended for production + /// to reduce the attack surface. + pub disable_introspection: bool, +} diff --git a/raphtory-graphql/src/server.rs b/raphtory-graphql/src/server.rs index c7f73fdb63..e9c5263f8a 100644 --- a/raphtory-graphql/src/server.rs +++ b/raphtory-graphql/src/server.rs @@ -247,9 +247,25 @@ impl GraphServer { &self, tracer: Option, ) -> Result>, ServerError> { - let schema_builder = App::create_schema(); - let schema_builder = schema_builder.data(self.data.clone()); - let schema_builder = schema_builder.extension(MutationAuth); + let schema_cfg = &self.config.schema; + let mut schema_builder = App::create_schema() + .data(self.data.clone()) + .extension(MutationAuth); + if let Some(depth) = schema_cfg.max_query_depth { + schema_builder = schema_builder.limit_depth(depth); + } + if let Some(complexity) = schema_cfg.max_query_complexity { + schema_builder = schema_builder.limit_complexity(complexity); + } + if let Some(recursive_depth) = schema_cfg.max_recursive_depth { + schema_builder = schema_builder.limit_recursive_depth(recursive_depth); + } + if let Some(max_directives) = 
schema_cfg.max_directives_per_field { + schema_builder = schema_builder.limit_directives(max_directives); + } + if schema_cfg.disable_introspection { + schema_builder = schema_builder.disable_introspection(); + } let trace_level = self.config.tracing.tracing_level.clone(); let schema = if let Some(t) = tracer { schema_builder From 258ae98f5c54445ca9e16e64780a60f770693e3c Mon Sep 17 00:00:00 2001 From: miratepuffin Date: Fri, 17 Apr 2026 20:14:31 +0100 Subject: [PATCH 03/11] Added batch controls --- raphtory-graphql/src/auth.rs | 23 ++++++++++++++++--- raphtory-graphql/src/cli.rs | 20 +++++++++++++++- raphtory-graphql/src/config/app_config.rs | 16 +++++++++++++ .../src/config/concurrency_config.rs | 11 +++++++++ 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/raphtory-graphql/src/auth.rs b/raphtory-graphql/src/auth.rs index 0530fcd44b..4a762530c5 100644 --- a/raphtory-graphql/src/auth.rs +++ b/raphtory-graphql/src/auth.rs @@ -10,7 +10,7 @@ use async_graphql_poem::{GraphQLBatchRequest, GraphQLBatchResponse, GraphQLReque use futures_util::StreamExt; use jsonwebtoken::{decode, Algorithm, Validation}; use poem::{ - error::{TooManyRequests, Unauthorized}, + error::{BadRequest, TooManyRequests, Unauthorized}, Body, Endpoint, FromRequest, IntoResponse, Request, Response, Result, }; use reqwest::header::AUTHORIZATION; @@ -96,6 +96,10 @@ pub enum AuthError { RequireRead, #[error("The requested endpoint requires write access")] RequireWrite, + #[error("Query batching is disabled on this server")] + BatchingDisabled, + #[error("Batch size {actual} exceeds the maximum allowed {max}")] + BatchSizeExceeded { max: usize, actual: usize }, } impl From for ServerError { @@ -154,8 +158,21 @@ where ))) } else { let (req, mut body) = req.split(); - let req = GraphQLBatchRequest::from_request(&req, &mut body).await?; - let req = req.0.data(access); + let batch_req = GraphQLBatchRequest::from_request(&req, &mut body).await?.0; + + if let BatchRequest::Batch(requests) = 
&batch_req { + if self.config.concurrency.disable_batching { + return Err(BadRequest(AuthError::BatchingDisabled)); + } + if let Some(max) = self.config.concurrency.max_batch_size { + let actual = requests.len(); + if actual > max { + return Err(BadRequest(AuthError::BatchSizeExceeded { max, actual })); + } + } + } + + let req = batch_req.data(access); let contains_update = match &req { BatchRequest::Single(request) => request.query.contains("updateGraph"), diff --git a/raphtory-graphql/src/cli.rs b/raphtory-graphql/src/cli.rs index b3b54a7014..d52237bce1 100644 --- a/raphtory-graphql/src/cli.rs +++ b/raphtory-graphql/src/cli.rs @@ -5,7 +5,7 @@ use crate::{ app_config::AppConfigBuilder, auth_config::{DEFAULT_AUTH_ENABLED_FOR_READS, PUBLIC_KEY_DECODING_ERR_MSG}, cache_config::{DEFAULT_CAPACITY, DEFAULT_TTI_SECONDS}, - concurrency_config::DEFAULT_EXCLUSIVE_WRITES, + concurrency_config::{DEFAULT_DISABLE_BATCHING, DEFAULT_EXCLUSIVE_WRITES}, log_config::DEFAULT_LOG_LEVEL, otlp_config::{ TracingLevel, DEFAULT_OTLP_AGENT_HOST, DEFAULT_OTLP_AGENT_PORT, @@ -96,6 +96,22 @@ struct ServerArgs { )] exclusive_writes: bool, + #[arg( + long, + env = "RAPHTORY_DISABLE_BATCHING", + default_value_t = DEFAULT_DISABLE_BATCHING, + help = "Rejects batched GraphQL requests outright. Batching can otherwise be used to circumvent per-request depth and complexity limits." + )] + disable_batching: bool, + + #[arg( + long, + env = "RAPHTORY_MAX_BATCH_SIZE", + default_value = None, + help = "Caps the number of queries accepted in a single batched HTTP request. Requests whose batch exceeds this size are rejected." 
+ )] + max_batch_size: Option, + #[arg( long, env = "RAPHTORY_MAX_QUERY_DEPTH", @@ -175,6 +191,8 @@ where .with_auth_enabled_for_reads(server_args.auth_enabled_for_reads) .with_heavy_query_limit(server_args.heavy_query_limit) .with_exclusive_writes(server_args.exclusive_writes) + .with_disable_batching(server_args.disable_batching) + .with_max_batch_size(server_args.max_batch_size) .with_max_query_depth(server_args.max_query_depth) .with_max_query_complexity(server_args.max_query_complexity) .with_max_recursive_depth(server_args.max_recursive_depth) diff --git a/raphtory-graphql/src/config/app_config.rs b/raphtory-graphql/src/config/app_config.rs index 4cf80aa8f1..70223a8ec1 100644 --- a/raphtory-graphql/src/config/app_config.rs +++ b/raphtory-graphql/src/config/app_config.rs @@ -128,6 +128,16 @@ impl AppConfigBuilder { self } + pub fn with_disable_batching(mut self, disable_batching: bool) -> Self { + self.concurrency.disable_batching = disable_batching; + self + } + + pub fn with_max_batch_size(mut self, max_batch_size: Option) -> Self { + self.concurrency.max_batch_size = max_batch_size; + self + } + pub fn with_max_query_depth(mut self, max_query_depth: Option) -> Self { self.schema.max_query_depth = max_query_depth; self @@ -252,6 +262,12 @@ pub fn load_config( if let Ok(exclusive_writes) = settings.get::("concurrency.exclusive_writes") { app_config_builder = app_config_builder.with_exclusive_writes(exclusive_writes); } + if let Ok(disable_batching) = settings.get::("concurrency.disable_batching") { + app_config_builder = app_config_builder.with_disable_batching(disable_batching); + } + if let Ok(max_batch_size) = settings.get::>("concurrency.max_batch_size") { + app_config_builder = app_config_builder.with_max_batch_size(max_batch_size); + } if let Ok(max_query_depth) = settings.get::>("schema.max_query_depth") { app_config_builder = app_config_builder.with_max_query_depth(max_query_depth); diff --git a/raphtory-graphql/src/config/concurrency_config.rs 
b/raphtory-graphql/src/config/concurrency_config.rs index 59163afeb8..eb39c339c2 100644 --- a/raphtory-graphql/src/config/concurrency_config.rs +++ b/raphtory-graphql/src/config/concurrency_config.rs @@ -1,6 +1,7 @@ use serde::{Deserialize, Serialize}; pub const DEFAULT_EXCLUSIVE_WRITES: bool = false; +pub const DEFAULT_DISABLE_BATCHING: bool = false; /// Controls how Raphtory schedules concurrent GraphQL work. #[derive(Debug, Default, Deserialize, PartialEq, Clone, Serialize)] @@ -15,4 +16,14 @@ pub struct ConcurrencyConfig { /// Ensures only one ingestion/write operation runs at a time and blocks reads until /// it completes. pub exclusive_writes: bool, + + /// When true, query batching (sending multiple queries in a single HTTP request) is + /// rejected outright. Batching can otherwise be used to circumvent per-request depth + /// and complexity limits. + pub disable_batching: bool, + + /// Caps the number of queries accepted in a single batched HTTP request. Requests + /// whose batch exceeds this size are rejected. `None` means unlimited (subject to + /// `disable_batching`). 
+ pub max_batch_size: Option, } From f10af018f09376af7521111ddfb145ecacbac827 Mon Sep 17 00:00:00 2001 From: miratepuffin Date: Fri, 17 Apr 2026 20:22:05 +0100 Subject: [PATCH 04/11] Added flags for disabling lists --- raphtory-graphql/src/cli.rs | 22 ++++++++++- raphtory-graphql/src/config/app_config.rs | 16 ++++++++ .../src/config/concurrency_config.rs | 11 ++++++ .../src/model/graph/collection.rs | 38 ++++++++++++++++--- raphtory-graphql/src/server.rs | 1 + 5 files changed, 81 insertions(+), 7 deletions(-) diff --git a/raphtory-graphql/src/cli.rs b/raphtory-graphql/src/cli.rs index d52237bce1..ee65037544 100644 --- a/raphtory-graphql/src/cli.rs +++ b/raphtory-graphql/src/cli.rs @@ -5,7 +5,9 @@ use crate::{ app_config::AppConfigBuilder, auth_config::{DEFAULT_AUTH_ENABLED_FOR_READS, PUBLIC_KEY_DECODING_ERR_MSG}, cache_config::{DEFAULT_CAPACITY, DEFAULT_TTI_SECONDS}, - concurrency_config::{DEFAULT_DISABLE_BATCHING, DEFAULT_EXCLUSIVE_WRITES}, + concurrency_config::{ + DEFAULT_DISABLE_BATCHING, DEFAULT_DISABLE_LISTS, DEFAULT_EXCLUSIVE_WRITES, + }, log_config::DEFAULT_LOG_LEVEL, otlp_config::{ TracingLevel, DEFAULT_OTLP_AGENT_HOST, DEFAULT_OTLP_AGENT_PORT, @@ -112,6 +114,22 @@ struct ServerArgs { )] max_batch_size: Option, + #[arg( + long, + env = "RAPHTORY_DISABLE_LISTS", + default_value_t = DEFAULT_DISABLE_LISTS, + help = "Completely disables bulk list endpoints (e.g. listing all nodes/edges). Essential for large graphs where unbounded list queries could return billions of results and exhaust server resources." + )] + disable_lists: bool, + + #[arg( + long, + env = "RAPHTORY_MAX_PAGE_SIZE", + default_value = None, + help = "Maximum page size enforced on paged collection queries. Caps the `limit` argument of `page` so clients can't circumvent `disable_lists` by requesting huge pages." 
+ )] + max_page_size: Option, + #[arg( long, env = "RAPHTORY_MAX_QUERY_DEPTH", @@ -193,6 +211,8 @@ where .with_exclusive_writes(server_args.exclusive_writes) .with_disable_batching(server_args.disable_batching) .with_max_batch_size(server_args.max_batch_size) + .with_disable_lists(server_args.disable_lists) + .with_max_page_size(server_args.max_page_size) .with_max_query_depth(server_args.max_query_depth) .with_max_query_complexity(server_args.max_query_complexity) .with_max_recursive_depth(server_args.max_recursive_depth) diff --git a/raphtory-graphql/src/config/app_config.rs b/raphtory-graphql/src/config/app_config.rs index 70223a8ec1..42ce88d27b 100644 --- a/raphtory-graphql/src/config/app_config.rs +++ b/raphtory-graphql/src/config/app_config.rs @@ -138,6 +138,16 @@ impl AppConfigBuilder { self } + pub fn with_disable_lists(mut self, disable_lists: bool) -> Self { + self.concurrency.disable_lists = disable_lists; + self + } + + pub fn with_max_page_size(mut self, max_page_size: Option) -> Self { + self.concurrency.max_page_size = max_page_size; + self + } + pub fn with_max_query_depth(mut self, max_query_depth: Option) -> Self { self.schema.max_query_depth = max_query_depth; self @@ -268,6 +278,12 @@ pub fn load_config( if let Ok(max_batch_size) = settings.get::>("concurrency.max_batch_size") { app_config_builder = app_config_builder.with_max_batch_size(max_batch_size); } + if let Ok(disable_lists) = settings.get::("concurrency.disable_lists") { + app_config_builder = app_config_builder.with_disable_lists(disable_lists); + } + if let Ok(max_page_size) = settings.get::>("concurrency.max_page_size") { + app_config_builder = app_config_builder.with_max_page_size(max_page_size); + } if let Ok(max_query_depth) = settings.get::>("schema.max_query_depth") { app_config_builder = app_config_builder.with_max_query_depth(max_query_depth); diff --git a/raphtory-graphql/src/config/concurrency_config.rs b/raphtory-graphql/src/config/concurrency_config.rs index 
eb39c339c2..5e7d9f754f 100644 --- a/raphtory-graphql/src/config/concurrency_config.rs +++ b/raphtory-graphql/src/config/concurrency_config.rs @@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize}; pub const DEFAULT_EXCLUSIVE_WRITES: bool = false; pub const DEFAULT_DISABLE_BATCHING: bool = false; +pub const DEFAULT_DISABLE_LISTS: bool = false; /// Controls how Raphtory schedules concurrent GraphQL work. #[derive(Debug, Default, Deserialize, PartialEq, Clone, Serialize)] @@ -26,4 +27,14 @@ pub struct ConcurrencyConfig { /// whose batch exceeds this size are rejected. `None` means unlimited (subject to /// `disable_batching`). pub max_batch_size: Option, + + /// When true, completely disables bulk list endpoints (e.g. `list` on a collection). + /// Essential for large graphs where unbounded list queries could return billions of + /// results and exhaust server resources. Clients should use `page` instead. + pub disable_lists: bool, + + /// Maximum page size enforced on paged collection queries. Caps the `limit` argument + /// of `page` so clients can't circumvent `disable_lists` by requesting huge pages. + /// `None` means unlimited. + pub max_page_size: Option, } diff --git a/raphtory-graphql/src/model/graph/collection.rs b/raphtory-graphql/src/model/graph/collection.rs index aa345e984a..fa8ba72e5b 100644 --- a/raphtory-graphql/src/model/graph/collection.rs +++ b/raphtory-graphql/src/model/graph/collection.rs @@ -1,4 +1,5 @@ -use crate::rayon::blocking_compute; +use crate::{config::concurrency_config::ConcurrencyConfig, rayon::blocking_compute}; +use async_graphql::{Context, Error, Result}; use dynamic_graphql::{ internal::{OutputTypeName, ResolveOwned, TypeName}, ResolvedObject, ResolvedObjectFields, @@ -47,18 +48,43 @@ where T: for<'a> ResolveOwned<'a>, { /// Returns a list of collection objects. 
- async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> Result> { + if ctx + .data_opt::() + .map(|cfg| cfg.disable_lists) + .unwrap_or(false) + { + return Err(Error::new( + "Bulk list endpoints are disabled on this server. Use `page` instead.", + )); + } let self_clone = self.clone(); - blocking_compute(move || self_clone.items.to_vec()).await + Ok(blocking_compute(move || self_clone.items.to_vec()).await) } /// Fetch one page with a number of items up to a specified limit, optionally offset by a specified amount. The page_index sets the number of pages to skip (defaults to 0). /// /// For example, if page(5, 2, 1) is called, a page with 5 items, offset by 11 items (2 pages of 5 + 1), /// will be returned. - async fn page(&self, limit: usize, offset: Option, page_index: Option) -> Vec { + async fn page( + &self, + ctx: &Context<'_>, + limit: usize, + offset: Option, + page_index: Option, + ) -> Result> { + if let Some(max) = ctx + .data_opt::() + .and_then(|cfg| cfg.max_page_size) + { + if limit > max { + return Err(Error::new(format!( + "page limit {limit} exceeds the maximum allowed page size {max}" + ))); + } + } let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .items @@ -68,7 +94,7 @@ where .cloned() .collect() }) - .await + .await) } /// Returns a count of collection objects. 
diff --git a/raphtory-graphql/src/server.rs b/raphtory-graphql/src/server.rs index e9c5263f8a..17b65732be 100644 --- a/raphtory-graphql/src/server.rs +++ b/raphtory-graphql/src/server.rs @@ -250,6 +250,7 @@ impl GraphServer { let schema_cfg = &self.config.schema; let mut schema_builder = App::create_schema() .data(self.data.clone()) + .data(self.config.concurrency.clone()) .extension(MutationAuth); if let Some(depth) = schema_cfg.max_query_depth { schema_builder = schema_builder.limit_depth(depth); From 40debbb4dbd41088faf514696d3fdf73995ddd35 Mon Sep 17 00:00:00 2001 From: miratepuffin Date: Fri, 17 Apr 2026 20:30:03 +0100 Subject: [PATCH 05/11] add new flags to python --- raphtory-graphql/src/python/server/server.rs | 90 +++++++++++++++++++- 1 file changed, 86 insertions(+), 4 deletions(-) diff --git a/raphtory-graphql/src/python/server/server.rs b/raphtory-graphql/src/python/server/server.rs index 965e64866b..06a7a8b8e0 100644 --- a/raphtory-graphql/src/python/server/server.rs +++ b/raphtory-graphql/src/python/server/server.rs @@ -32,9 +32,20 @@ use std::{path::PathBuf, thread}; /// otlp_agent_port(str, optional): OTLP agent port for tracing /// otlp_tracing_service_name (str, optional): The OTLP tracing service name /// config_path (str | PathLike, optional): Path to the config file -/// auth_public_key: -/// auth_enabled_for_reads: -/// create_index: +/// auth_public_key (str, optional): Base64-encoded public key used to verify bearer tokens +/// auth_enabled_for_reads (bool, optional): Require auth tokens for read queries +/// create_index (bool, optional): Build a search index on startup +/// heavy_query_limit (int, optional): Maximum number of expensive traversal queries (outComponent, inComponent, edges, outEdges, inEdges, neighbours, outNeighbours, inNeighbours) allowed to run simultaneously. Extra queries are parked on a semaphore. 
+/// exclusive_writes (bool, optional): If True, ingestion/write operations run one at a time and block reads until complete. +/// disable_batching (bool, optional): If True, batched GraphQL requests are rejected. Prevents bypassing per-request depth/complexity limits. +/// max_batch_size (int, optional): Caps the number of queries accepted in a single batched request. +/// disable_lists (bool, optional): If True, bulk `list` endpoints on collections are disabled. Clients must use `page` instead. +/// max_page_size (int, optional): Maximum page size allowed on paged collection queries. +/// max_query_depth (int, optional): Maximum nesting depth of a query. +/// max_query_complexity (int, optional): Maximum estimated cost of a query, based on the number of fields selected. +/// max_recursive_depth (int, optional): Internal safety limit to prevent stack overflows from pathologically structured queries (async-graphql default is 32). +/// max_directives_per_field (int, optional): Maximum number of directives on any single field. +/// disable_introspection (bool, optional): If True, schema introspection is disabled entirely. 
#[pyclass(name = "GraphServer", module = "raphtory.graphql")] pub struct PyGraphServer(GraphServer); @@ -68,7 +79,32 @@ fn template_from_python( impl PyGraphServer { #[new] #[pyo3( - signature = (work_dir, cache_capacity = None, cache_tti_seconds = None, log_level = None, tracing=None, tracing_level=None, otlp_agent_host=None, otlp_agent_port=None, otlp_tracing_service_name=None, auth_public_key=None, auth_enabled_for_reads=None, config_path = None, create_index = None) + signature = ( + work_dir, + cache_capacity = None, + cache_tti_seconds = None, + log_level = None, + tracing=None, + tracing_level=None, + otlp_agent_host=None, + otlp_agent_port=None, + otlp_tracing_service_name=None, + auth_public_key=None, + auth_enabled_for_reads=None, + config_path = None, + create_index = None, + heavy_query_limit = None, + exclusive_writes = None, + disable_batching = None, + max_batch_size = None, + disable_lists = None, + max_page_size = None, + max_query_depth = None, + max_query_complexity = None, + max_recursive_depth = None, + max_directives_per_field = None, + disable_introspection = None, + ) )] fn py_new( work_dir: PathBuf, @@ -84,6 +120,17 @@ impl PyGraphServer { auth_enabled_for_reads: Option, config_path: Option, create_index: Option, + heavy_query_limit: Option, + exclusive_writes: Option, + disable_batching: Option, + max_batch_size: Option, + disable_lists: Option, + max_page_size: Option, + max_query_depth: Option, + max_query_complexity: Option, + max_recursive_depth: Option, + max_directives_per_field: Option, + disable_introspection: Option, ) -> PyResult { let mut app_config_builder = AppConfigBuilder::new(); if let Some(log_level) = log_level { @@ -129,6 +176,41 @@ impl PyGraphServer { if let Some(create_index) = create_index { app_config_builder = app_config_builder.with_create_index(create_index); } + if heavy_query_limit.is_some() { + app_config_builder = app_config_builder.with_heavy_query_limit(heavy_query_limit); + } + if let 
Some(exclusive_writes) = exclusive_writes { + app_config_builder = app_config_builder.with_exclusive_writes(exclusive_writes); + } + if let Some(disable_batching) = disable_batching { + app_config_builder = app_config_builder.with_disable_batching(disable_batching); + } + if max_batch_size.is_some() { + app_config_builder = app_config_builder.with_max_batch_size(max_batch_size); + } + if let Some(disable_lists) = disable_lists { + app_config_builder = app_config_builder.with_disable_lists(disable_lists); + } + if max_page_size.is_some() { + app_config_builder = app_config_builder.with_max_page_size(max_page_size); + } + if max_query_depth.is_some() { + app_config_builder = app_config_builder.with_max_query_depth(max_query_depth); + } + if max_query_complexity.is_some() { + app_config_builder = app_config_builder.with_max_query_complexity(max_query_complexity); + } + if max_recursive_depth.is_some() { + app_config_builder = app_config_builder.with_max_recursive_depth(max_recursive_depth); + } + if max_directives_per_field.is_some() { + app_config_builder = + app_config_builder.with_max_directives_per_field(max_directives_per_field); + } + if let Some(disable_introspection) = disable_introspection { + app_config_builder = + app_config_builder.with_disable_introspection(disable_introspection); + } let app_config = Some(app_config_builder.build()); let server = block_on(GraphServer::new( From 9b8e2f66661f0583e60d3766c4112b24356a6ee1 Mon Sep 17 00:00:00 2001 From: miratepuffin Date: Fri, 17 Apr 2026 21:51:35 +0100 Subject: [PATCH 06/11] Add tests --- python/python/raphtory/graphql/__init__.pyi | 34 +- .../test_graphql/test_server_flags.py | 296 ++++++++++++++++++ raphtory-graphql/schema.graphql | 1 - .../src/model/graph/collection.rs | 52 +-- raphtory-graphql/src/model/graph/edges.rs | 15 +- raphtory-graphql/src/model/graph/history.rs | 136 +++++--- raphtory-graphql/src/model/graph/nodes.rs | 15 +- .../src/model/graph/path_from_node.rs | 15 +- 
raphtory-graphql/src/model/graph/windowset.rs | 96 ++++-- 9 files changed, 540 insertions(+), 120 deletions(-) create mode 100644 python/tests/test_base_install/test_graphql/test_server_flags.py diff --git a/python/python/raphtory/graphql/__init__.pyi b/python/python/raphtory/graphql/__init__.pyi index 783999e6b9..2689620863 100644 --- a/python/python/raphtory/graphql/__init__.pyi +++ b/python/python/raphtory/graphql/__init__.pyi @@ -60,9 +60,20 @@ class GraphServer(object): otlp_agent_port(str, optional): OTLP agent port for tracing otlp_tracing_service_name (str, optional): The OTLP tracing service name config_path (str | PathLike, optional): Path to the config file - auth_public_key: - auth_enabled_for_reads: - create_index: + auth_public_key (str, optional): Base64-encoded public key used to verify bearer tokens + auth_enabled_for_reads (bool, optional): Require auth tokens for read queries + create_index (bool, optional): Build a search index on startup + heavy_query_limit (int, optional): Maximum number of expensive traversal queries (outComponent, inComponent, edges, outEdges, inEdges, neighbours, outNeighbours, inNeighbours) allowed to run simultaneously. Extra queries are parked on a semaphore. + exclusive_writes (bool, optional): If True, ingestion/write operations run one at a time and block reads until complete. + disable_batching (bool, optional): If True, batched GraphQL requests are rejected. Prevents bypassing per-request depth/complexity limits. + max_batch_size (int, optional): Caps the number of queries accepted in a single batched request. + disable_lists (bool, optional): If True, bulk `list` endpoints on collections are disabled. Clients must use `page` instead. + max_page_size (int, optional): Maximum page size allowed on paged collection queries. + max_query_depth (int, optional): Maximum nesting depth of a query. + max_query_complexity (int, optional): Maximum estimated cost of a query, based on the number of fields selected. 
+ max_recursive_depth (int, optional): Internal safety limit to prevent stack overflows from pathologically structured queries (async-graphql default is 32). + max_directives_per_field (int, optional): Maximum number of directives on any single field. + disable_introspection (bool, optional): If True, schema introspection is disabled entirely. """ def __new__( @@ -76,10 +87,21 @@ class GraphServer(object): otlp_agent_host: Optional[str] = None, otlp_agent_port: Optional[str] = None, otlp_tracing_service_name: Optional[str] = None, - auth_public_key: Any = None, - auth_enabled_for_reads: Any = None, + auth_public_key: Optional[str] = None, + auth_enabled_for_reads: Optional[bool] = None, config_path: Optional[str | PathLike] = None, - create_index: Any = None, + create_index: Optional[bool] = None, + heavy_query_limit: Optional[int] = None, + exclusive_writes: Optional[bool] = None, + disable_batching: Optional[bool] = None, + max_batch_size: Optional[int] = None, + disable_lists: Optional[bool] = None, + max_page_size: Optional[int] = None, + max_query_depth: Optional[int] = None, + max_query_complexity: Optional[int] = None, + max_recursive_depth: Optional[int] = None, + max_directives_per_field: Optional[int] = None, + disable_introspection: Optional[bool] = None, ) -> GraphServer: """Create and return a new object. 
See help(type) for accurate signature.""" diff --git a/python/tests/test_base_install/test_graphql/test_server_flags.py b/python/tests/test_base_install/test_graphql/test_server_flags.py new file mode 100644 index 0000000000..39d8c6e735 --- /dev/null +++ b/python/tests/test_base_install/test_graphql/test_server_flags.py @@ -0,0 +1,296 @@ +import json +import tempfile +import urllib.error +import urllib.request + +import pytest +from raphtory import Graph +from raphtory.graphql import GraphServer, RaphtoryClient + +SERVER_URL = "http://localhost:1736" + + +def batch_query(body): + """POST a raw JSON body (needed for batch requests — the client only sends single queries).""" + data = json.dumps(body).encode("utf-8") + req = urllib.request.Request( + SERVER_URL + "/", + data=data, + headers={"Content-Type": "application/json"}, + method="POST", + ) + try: + with urllib.request.urlopen(req) as resp: + return resp.status, json.loads(resp.read()) + except urllib.error.HTTPError as e: + raw = e.read() + try: + return e.code, json.loads(raw) + except ValueError: + return e.code, raw.decode("utf-8", errors="replace") + + +def make_graph(client, path="g"): + g = Graph() + g.add_edge(1, "ben", "hamza") + g.add_edge(2, "lucas", "hamza") + g.add_edge(3, "ben", "lucas") + client.send_graph(path, g, overwrite=True) + + +def test_introspection_enabled_by_default(): + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir).start(): + client = RaphtoryClient(SERVER_URL) + result = client.query("{ __schema { queryType { name } } }") + assert result["__schema"]["queryType"]["name"] + + +def test_disable_introspection(): + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, disable_introspection=True).start(): + client = RaphtoryClient(SERVER_URL) + client.query("{ version }") + + with pytest.raises(Exception) as excinfo: + client.query("{ __schema { queryType { name } } }") + msg = str(excinfo.value) + assert "Unknown field" in msg and "__schema" in msg + + +def 
test_max_query_depth(): + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, max_query_depth=3).start(): + client = RaphtoryClient(SERVER_URL) + make_graph(client) + + client.query('{ graph(path: "g") { created } }') + + with pytest.raises(Exception) as excinfo: + client.query( + '{ graph(path: "g") { nodes { page(limit: 5) { edges { page(limit: 5) { src { name } } } } } } }' + ) + assert "Query is nested too deep." in str(excinfo.value) + + +def test_max_query_complexity(): + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, max_query_complexity=3).start(): + client = RaphtoryClient(SERVER_URL) + make_graph(client) + + client.query("{ version }") + + with pytest.raises(Exception) as excinfo: + client.query( + '{ graph(path: "g") { nodes { page(limit: 5) { name id earliestTime latestTime } } } }' + ) + assert "Query is too complex." in str(excinfo.value) + + +# (field path, query) pairs covering every list-returning resolver: +# GqlCollection, GqlNodes, GqlEdges, GqlPathFromNode, GqlHistory, GqlHistoryTimestamp, +# GqlHistoryDateTime, GqlHistoryEventId, GqlIntervals, and the six WindowSet types. 
+LIST_QUERIES = [ + ("collection (namespaces)", "{ namespaces { list { path } } }"), + ("GqlNodes", '{ graph(path: "g") { nodes { list { name } } } }'), + ("GqlEdges", '{ graph(path: "g") { edges { list { src { name } } } } }'), + ( + "GqlPathFromNode", + '{ graph(path: "g") { node(name: "ben") { neighbours { list { name } } } } }', + ), + ( + "GqlHistory", + '{ graph(path: "g") { node(name: "ben") { history { list { timestamp } } } } }', + ), + ( + "GqlHistoryTimestamp", + '{ graph(path: "g") { node(name: "ben") { history { timestamps { list } } } } }', + ), + ( + "GqlHistoryDateTime", + '{ graph(path: "g") { node(name: "ben") { history { datetimes { list } } } } }', + ), + ( + "GqlHistoryEventId", + '{ graph(path: "g") { node(name: "ben") { history { eventId { list } } } } }', + ), + ( + "GqlIntervals", + '{ graph(path: "g") { node(name: "ben") { history { intervals { list } } } } }', + ), + ( + "GqlGraphWindowSet", + '{ graph(path: "g") { rolling(window: {epoch: 1}) { list { earliestTime { timestamp } } } } }', + ), + ( + "GqlNodeWindowSet", + '{ graph(path: "g") { node(name: "ben") { rolling(window: {epoch: 1}) { list { name } } } } }', + ), + ( + "GqlNodesWindowSet", + '{ graph(path: "g") { nodes { rolling(window: {epoch: 1}) { list { count } } } } }', + ), + ( + "GqlPathFromNodeWindowSet", + '{ graph(path: "g") { node(name: "ben") { neighbours { rolling(window: {epoch: 1}) { list { count } } } } } }', + ), + ( + "GqlEdgeWindowSet", + '{ graph(path: "g") { edge(src: "ben", dst: "hamza") { rolling(window: {epoch: 1}) { list { src { name } } } } } }', + ), + ( + "GqlEdgesWindowSet", + '{ graph(path: "g") { edges { rolling(window: {epoch: 1}) { list { count } } } } }', + ), +] + +# Same resolvers reached via `page(limit: 50)` — chosen so we can compare against a small +# `max_page_size` limit and trigger the exceeded-size error. 
+PAGE_QUERIES = [ + ("collection (namespaces)", "{ namespaces { page(limit: 50) { path } } }"), + ("GqlNodes", '{ graph(path: "g") { nodes { page(limit: 50) { name } } } }'), + ( + "GqlEdges", + '{ graph(path: "g") { edges { page(limit: 50) { src { name } } } } }', + ), + ( + "GqlPathFromNode", + '{ graph(path: "g") { node(name: "ben") { neighbours { page(limit: 50) { name } } } } }', + ), + ( + "GqlHistory", + '{ graph(path: "g") { node(name: "ben") { history { page(limit: 50) { timestamp } } } } }', + ), + ( + "GqlHistoryTimestamp", + '{ graph(path: "g") { node(name: "ben") { history { timestamps { page(limit: 50) } } } } }', + ), + ( + "GqlHistoryDateTime", + '{ graph(path: "g") { node(name: "ben") { history { datetimes { page(limit: 50) } } } } }', + ), + ( + "GqlHistoryEventId", + '{ graph(path: "g") { node(name: "ben") { history { eventId { page(limit: 50) } } } } }', + ), + ( + "GqlIntervals", + '{ graph(path: "g") { node(name: "ben") { history { intervals { page(limit: 50) } } } } }', + ), + ( + "GqlGraphWindowSet", + '{ graph(path: "g") { rolling(window: {epoch: 1}) { page(limit: 50) { earliestTime { timestamp } } } } }', + ), + ( + "GqlNodeWindowSet", + '{ graph(path: "g") { node(name: "ben") { rolling(window: {epoch: 1}) { page(limit: 50) { name } } } } }', + ), + ( + "GqlNodesWindowSet", + '{ graph(path: "g") { nodes { rolling(window: {epoch: 1}) { page(limit: 50) { count } } } } }', + ), + ( + "GqlPathFromNodeWindowSet", + '{ graph(path: "g") { node(name: "ben") { neighbours { rolling(window: {epoch: 1}) { page(limit: 50) { count } } } } } }', + ), + ( + "GqlEdgeWindowSet", + '{ graph(path: "g") { edge(src: "ben", dst: "hamza") { rolling(window: {epoch: 1}) { page(limit: 50) { src { name } } } } } }', + ), + ( + "GqlEdgesWindowSet", + '{ graph(path: "g") { edges { rolling(window: {epoch: 1}) { page(limit: 50) { count } } } } }', + ), +] + + +def test_disable_lists_all_resolvers(): + """Every `list` endpoint across every paginated type rejects with the 
same error.""" + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, disable_lists=True).start(): + client = RaphtoryClient(SERVER_URL) + make_graph(client) + + for name, query in LIST_QUERIES: + with pytest.raises(Exception) as excinfo: + client.query(query) + assert ( + "Bulk list endpoints are disabled on this server. Use `page` instead." + in str(excinfo.value) + ), f"{name} did not reject with the expected error: {excinfo.value}" + + +def test_disable_lists_page_still_works(): + """Even with `disable_lists=True`, `page` queries still succeed.""" + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, disable_lists=True).start(): + client = RaphtoryClient(SERVER_URL) + make_graph(client) + result = client.query( + '{ graph(path: "g") { nodes { page(limit: 10) { name } } } }' + ) + assert len(result["graph"]["nodes"]["page"]) == 3 + + +def test_max_page_size_all_resolvers(): + """Every `page` endpoint across every paginated type enforces max_page_size.""" + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, max_page_size=2).start(): + client = RaphtoryClient(SERVER_URL) + make_graph(client) + + for name, query in PAGE_QUERIES: + with pytest.raises(Exception) as excinfo: + client.query(query) + assert "page limit 50 exceeds the maximum allowed page size 2" in str( + excinfo.value + ), f"{name} did not reject with the expected error: {excinfo.value}" + + +def test_max_page_size_under_cap_works(): + """Pages at or below max_page_size still succeed.""" + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, max_page_size=2).start(): + client = RaphtoryClient(SERVER_URL) + make_graph(client) + result = client.query( + '{ graph(path: "g") { nodes { page(limit: 2) { name } } } }' + ) + assert len(result["graph"]["nodes"]["page"]) == 2 + + +def test_disable_batching(): + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, disable_batching=True).start(): + RaphtoryClient(SERVER_URL).query("{ version }") + + status, body = 
batch_query([{"query": "{ version }"}, {"query": "{ version }"}]) + assert status == 400 + assert "Query batching is disabled on this server" in str(body) + + +def test_max_batch_size(): + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, max_batch_size=2).start(): + status, body = batch_query([{"query": "{ version }"}] * 2) + assert status == 200 + assert isinstance(body, list) and len(body) == 2 + + status, body = batch_query([{"query": "{ version }"}] * 3) + assert status == 400 + assert "Batch size 3 exceeds the maximum allowed 2" in str(body) + + +def test_flags_smoke(): + """Server accepts the remaining concurrency/schema knobs and still serves queries.""" + work_dir = tempfile.mkdtemp() + with GraphServer( + work_dir, + heavy_query_limit=4, + exclusive_writes=True, + max_recursive_depth=64, + max_directives_per_field=2, + ).start(): + client = RaphtoryClient(SERVER_URL) + assert client.query("{ version }")["version"] diff --git a/raphtory-graphql/schema.graphql b/raphtory-graphql/schema.graphql index dbe88f7daa..28956786d7 100644 --- a/raphtory-graphql/schema.graphql +++ b/raphtory-graphql/schema.graphql @@ -3491,4 +3491,3 @@ schema { query: QueryRoot mutation: MutRoot } - diff --git a/raphtory-graphql/src/model/graph/collection.rs b/raphtory-graphql/src/model/graph/collection.rs index fa8ba72e5b..21e1673239 100644 --- a/raphtory-graphql/src/model/graph/collection.rs +++ b/raphtory-graphql/src/model/graph/collection.rs @@ -6,6 +6,37 @@ use dynamic_graphql::{ }; use std::{borrow::Cow, sync::Arc}; +/// Returns an error when `concurrency.disable_lists` is set. Called from every `list` +/// resolver on paginated collections. +pub(crate) fn check_list_allowed(ctx: &Context<'_>) -> Result<()> { + if ctx + .data_opt::() + .map(|cfg| cfg.disable_lists) + .unwrap_or(false) + { + return Err(Error::new( + "Bulk list endpoints are disabled on this server. 
Use `page` instead.", + )); + } + Ok(()) +} + +/// Returns an error when `limit` exceeds `concurrency.max_page_size`. Called from every +/// `page` resolver on paginated collections. +pub(crate) fn check_page_limit(ctx: &Context<'_>, limit: usize) -> Result<()> { + if let Some(max) = ctx + .data_opt::() + .and_then(|cfg| cfg.max_page_size) + { + if limit > max { + return Err(Error::new(format!( + "page limit {limit} exceeds the maximum allowed page size {max}" + ))); + } + } + Ok(()) +} + /// Collection of items #[derive(ResolvedObject, Clone)] #[graphql(get_type_name = true)] @@ -49,15 +80,7 @@ where { /// Returns a list of collection objects. async fn list(&self, ctx: &Context<'_>) -> Result> { - if ctx - .data_opt::() - .map(|cfg| cfg.disable_lists) - .unwrap_or(false) - { - return Err(Error::new( - "Bulk list endpoints are disabled on this server. Use `page` instead.", - )); - } + check_list_allowed(ctx)?; let self_clone = self.clone(); Ok(blocking_compute(move || self_clone.items.to_vec()).await) } @@ -73,16 +96,7 @@ where offset: Option, page_index: Option, ) -> Result> { - if let Some(max) = ctx - .data_opt::() - .and_then(|cfg| cfg.max_page_size) - { - if limit > max { - return Err(Error::new(format!( - "page limit {limit} exceeds the maximum allowed page size {max}" - ))); - } - } + check_page_limit(ctx, limit)?; let self_clone = self.clone(); Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); diff --git a/raphtory-graphql/src/model/graph/edges.rs b/raphtory-graphql/src/model/graph/edges.rs index f83d21c67d..e393c327e4 100644 --- a/raphtory-graphql/src/model/graph/edges.rs +++ b/raphtory-graphql/src/model/graph/edges.rs @@ -1,6 +1,7 @@ use crate::{ model::{ graph::{ + collection::{check_list_allowed, check_page_limit}, edge::GqlEdge, filtering::EdgesViewCollection, timeindex::{GqlEventTime, GqlTimeInput}, @@ -11,6 +12,7 @@ use crate::{ }, rayon::blocking_compute, }; +use async_graphql::Context; use 
dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use itertools::Itertools; use raphtory::{ @@ -344,22 +346,25 @@ impl GqlEdges { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone.iter().skip(start).take(limit).collect() }) - .await + .await) } /// Returns a list of all objects in the current selection of the collection. You should filter the collection first then call list. - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.iter().collect()).await + Ok(blocking_compute(move || self_clone.iter().collect()).await) } /// Returns a filtered view that applies to list down the chain diff --git a/raphtory-graphql/src/model/graph/history.rs b/raphtory-graphql/src/model/graph/history.rs index 58c15c34d3..7e66f5db17 100644 --- a/raphtory-graphql/src/model/graph/history.rs +++ b/raphtory-graphql/src/model/graph/history.rs @@ -1,8 +1,11 @@ use crate::{ - model::graph::timeindex::{dt_format_str_is_valid, GqlEventTime}, + model::graph::{ + collection::{check_list_allowed, check_page_limit}, + timeindex::{dt_format_str_is_valid, GqlEventTime}, + }, rayon::blocking_compute, }; -use async_graphql::Error; +use async_graphql::{Context, Error}; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use raphtory::db::api::view::history::{ History, HistoryDateTime, HistoryEventId, HistoryTimestamp, InternalHistoryOps, Intervals, @@ -50,15 +53,20 @@ impl GqlHistory { } /// List all time entries present in this history. 
- async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.history.iter().map(|t| t.into()).collect()).await + Ok(blocking_compute(move || self_clone.history.iter().map(|t| t.into()).collect()).await) } /// List all time entries present in this history in reverse order. - async fn list_rev(&self) -> Vec { + async fn list_rev(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.history.iter_rev().map(|t| t.into()).collect()).await + Ok( + blocking_compute(move || self_clone.history.iter_rev().map(|t| t.into()).collect()) + .await, + ) } /// Fetch one page of EventTime entries with a number of items up to a specified limit, @@ -68,12 +76,14 @@ impl GqlHistory { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history @@ -83,7 +93,7 @@ impl GqlHistory { .map(|t| t.into()) .collect() }) - .await + .await) } /// Fetch one page of EventTime entries with a number of items up to a specified limit, @@ -93,12 +103,14 @@ impl GqlHistory { /// will be returned. 
async fn page_rev( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history @@ -108,7 +120,7 @@ impl GqlHistory { .map(|t| t.into()) .collect() }) - .await + .await) } /// Returns True if the history is empty. @@ -176,15 +188,17 @@ pub struct GqlHistoryTimestamp { #[ResolvedObjectFields] impl GqlHistoryTimestamp { /// List all timestamps. - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.history_t.collect()).await + Ok(blocking_compute(move || self_clone.history_t.collect()).await) } /// List all timestamps in reverse order. - async fn list_rev(&self) -> Vec { + async fn list_rev(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.history_t.collect_rev()).await + Ok(blocking_compute(move || self_clone.history_t.collect_rev()).await) } /// Fetch one page of timestamps with a number of items up to a specified limit, optionally offset by a specified amount. @@ -194,12 +208,14 @@ impl GqlHistoryTimestamp { /// will be returned. 
async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history_t @@ -208,7 +224,7 @@ impl GqlHistoryTimestamp { .take(limit) .collect() }) - .await + .await) } /// Fetch one page of timestamps in reverse order with a number of items up to a specified limit, @@ -218,12 +234,14 @@ impl GqlHistoryTimestamp { /// will be returned. async fn page_rev( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history_t @@ -232,7 +250,7 @@ impl GqlHistoryTimestamp { .take(limit) .collect() }) - .await + .await) } } @@ -249,7 +267,12 @@ impl GqlHistoryDateTime { /// List all datetimes formatted as strings. /// If filter_broken is set to True, time conversion errors will be ignored. If set to False, a TimeError /// will be raised on time conversion error. Defaults to False. - async fn list(&self, filter_broken: Option) -> Result, Error> { + async fn list( + &self, + ctx: &Context<'_>, + filter_broken: Option, + ) -> Result, Error> { + check_list_allowed(ctx)?; let self_clone = self.clone(); blocking_compute(move || { let fmt_string = self_clone.format_string.as_deref().unwrap_or("%+"); // %+ is RFC 3339 @@ -280,7 +303,12 @@ impl GqlHistoryDateTime { /// List all datetimes formatted as strings in reverse chronological order. /// If filter_broken is set to True, time conversion errors will be ignored. If set to False, a TimeError /// will be raised on time conversion error. Defaults to False. 
- async fn list_rev(&self, filter_broken: Option) -> Result, Error> { + async fn list_rev( + &self, + ctx: &Context<'_>, + filter_broken: Option, + ) -> Result, Error> { + check_list_allowed(ctx)?; let self_clone = self.clone(); blocking_compute(move || { let fmt_string = self_clone.format_string.as_deref().unwrap_or("%+"); // %+ is RFC 3339 @@ -317,11 +345,13 @@ impl GqlHistoryDateTime { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, filter_broken: Option, ) -> Result, Error> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); @@ -362,11 +392,13 @@ impl GqlHistoryDateTime { /// will be returned. async fn page_rev( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, filter_broken: Option, ) -> Result, Error> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); @@ -409,29 +441,31 @@ pub struct GqlHistoryEventId { #[ResolvedObjectFields] impl GqlHistoryEventId { /// List event ids. - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { self_clone .history_s .iter() .map(|s: usize| s as u64) .collect() }) - .await + .await) } /// List event ids in reverse order. 
- async fn list_rev(&self) -> Vec { + async fn list_rev(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { self_clone .history_s .iter_rev() .map(|s: usize| s as u64) .collect() }) - .await + .await) } /// Fetch one page of event ids with a number of items up to a specified limit, @@ -441,12 +475,14 @@ impl GqlHistoryEventId { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history_s @@ -456,7 +492,7 @@ impl GqlHistoryEventId { .map(|s: usize| s as u64) .collect() }) - .await + .await) } /// Fetch one page of event ids in reverse chronological order with a number of items up to a specified limit, @@ -466,12 +502,14 @@ impl GqlHistoryEventId { /// will be returned. async fn page_rev( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history_s @@ -481,7 +519,7 @@ impl GqlHistoryEventId { .map(|s: usize| s as u64) .collect() }) - .await + .await) } } @@ -495,15 +533,17 @@ pub struct GqlIntervals { #[ResolvedObjectFields] impl GqlIntervals { /// List time intervals between consecutive timestamps in milliseconds. 
- async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.intervals.collect()).await + Ok(blocking_compute(move || self_clone.intervals.collect()).await) } /// List millisecond time intervals between consecutive timestamps in reverse order. - async fn list_rev(&self) -> Vec { + async fn list_rev(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.intervals.collect_rev()).await + Ok(blocking_compute(move || self_clone.intervals.collect_rev()).await) } /// Fetch one page of intervals between consecutive timestamps with a number of items up to a specified limit, @@ -513,12 +553,14 @@ impl GqlIntervals { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .intervals @@ -527,7 +569,7 @@ impl GqlIntervals { .take(limit) .collect() }) - .await + .await) } /// Fetch one page of intervals between consecutive timestamps in reverse order with a number of items up to a specified limit, @@ -537,12 +579,14 @@ impl GqlIntervals { /// will be returned. 
async fn page_rev( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .intervals @@ -551,7 +595,7 @@ impl GqlIntervals { .take(limit) .collect() }) - .await + .await) } /// Compute the mean interval between consecutive timestamps. Returns None if fewer than 1 timestamp. diff --git a/raphtory-graphql/src/model/graph/nodes.rs b/raphtory-graphql/src/model/graph/nodes.rs index 82fa5a5077..99c6021f0b 100644 --- a/raphtory-graphql/src/model/graph/nodes.rs +++ b/raphtory-graphql/src/model/graph/nodes.rs @@ -1,6 +1,7 @@ use crate::{ model::{ graph::{ + collection::{check_list_allowed, check_page_limit}, filtering::{GqlNodeFilter, NodesViewCollection}, node::GqlNode, timeindex::{GqlEventTime, GqlTimeInput}, @@ -11,6 +12,7 @@ use crate::{ }, rayon::blocking_compute, }; +use async_graphql::{Context, Result}; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use itertools::Itertools; use raphtory::{ @@ -330,21 +332,24 @@ impl GqlNodes { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone.iter().skip(start).take(limit).collect() }) - .await + .await) } - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.iter().collect()).await + Ok(blocking_compute(move || self_clone.iter().collect()).await) } /// Returns a view of the node ids. 
diff --git a/raphtory-graphql/src/model/graph/path_from_node.rs b/raphtory-graphql/src/model/graph/path_from_node.rs index e4562fea23..83a193f83f 100644 --- a/raphtory-graphql/src/model/graph/path_from_node.rs +++ b/raphtory-graphql/src/model/graph/path_from_node.rs @@ -1,5 +1,6 @@ use crate::{ model::graph::{ + collection::{check_list_allowed, check_page_limit}, filtering::{GqlNodeFilter, PathFromNodeViewCollection}, node::GqlNode, timeindex::{GqlEventTime, GqlTimeInput}, @@ -8,6 +9,7 @@ use crate::{ }, rayon::blocking_compute, }; +use async_graphql::Context; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use raphtory::{ core::utils::time::TryIntoInterval, @@ -205,21 +207,24 @@ impl GqlPathFromNode { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone.iter().skip(start).take(limit).collect() }) - .await + .await) } - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.iter().collect()).await + Ok(blocking_compute(move || self_clone.iter().collect()).await) } /// Returns the node ids. 
diff --git a/raphtory-graphql/src/model/graph/windowset.rs b/raphtory-graphql/src/model/graph/windowset.rs index 32b1501029..da1fd6311e 100644 --- a/raphtory-graphql/src/model/graph/windowset.rs +++ b/raphtory-graphql/src/model/graph/windowset.rs @@ -1,11 +1,17 @@ use crate::{ model::graph::{ - edge::GqlEdge, edges::GqlEdges, graph::GqlGraph, node::GqlNode, nodes::GqlNodes, + collection::{check_list_allowed, check_page_limit}, + edge::GqlEdge, + edges::GqlEdges, + graph::GqlGraph, + node::GqlNode, + nodes::GqlNodes, path_from_node::GqlPathFromNode, }, paths::ExistingGraphFolder, rayon::blocking_compute, }; +use async_graphql::Context; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use raphtory::db::{ api::{ @@ -42,12 +48,14 @@ impl GqlGraphWindowSet { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -57,19 +65,20 @@ impl GqlGraphWindowSet { .map(|g| GqlGraph::new(self_clone.path.clone(), g)) .collect() }) - .await + .await) } - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { self_clone .ws .clone() .map(|g| GqlGraph::new(self_clone.path.clone(), g)) .collect() }) - .await + .await) } } @@ -98,12 +107,14 @@ impl GqlNodeWindowSet { /// will be returned. 
async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -113,12 +124,13 @@ impl GqlNodeWindowSet { .map(|n| n.into()) .collect() }) - .await + .await) } - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.ws.clone().map(|n| n.into()).collect()).await + Ok(blocking_compute(move || self_clone.ws.clone().map(|n| n.into()).collect()).await) } } @@ -149,12 +161,14 @@ impl GqlNodesWindowSet { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -164,12 +178,16 @@ impl GqlNodesWindowSet { .map(|n| GqlNodes::new(n)) .collect() }) - .await + .await) } - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.ws.clone().map(|n| GqlNodes::new(n)).collect()).await + Ok( + blocking_compute(move || self_clone.ws.clone().map(|n| GqlNodes::new(n)).collect()) + .await, + ) } } @@ -198,12 +216,14 @@ impl GqlPathFromNodeWindowSet { /// will be returned. 
async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -213,19 +233,20 @@ impl GqlPathFromNodeWindowSet { .map(|n| GqlPathFromNode::new(n)) .collect() }) - .await + .await) } - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { self_clone .ws .clone() .map(|n| GqlPathFromNode::new(n)) .collect() }) - .await + .await) } } @@ -254,12 +275,14 @@ impl GqlEdgeWindowSet { /// will be returned. async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -269,12 +292,13 @@ impl GqlEdgeWindowSet { .map(|e| e.into()) .collect() }) - .await + .await) } - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.ws.clone().map(|e| e.into()).collect()).await + Ok(blocking_compute(move || self_clone.ws.clone().map(|e| e.into()).collect()).await) } } @@ -303,12 +327,14 @@ impl GqlEdgesWindowSet { /// will be returned. 
async fn page( &self, + ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Vec { + ) -> async_graphql::Result> { + check_page_limit(ctx, limit)?; let self_clone = self.clone(); - blocking_compute(move || { + Ok(blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -318,11 +344,15 @@ impl GqlEdgesWindowSet { .map(|e| GqlEdges::new(e)) .collect() }) - .await + .await) } - async fn list(&self) -> Vec { + async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { + check_list_allowed(ctx)?; let self_clone = self.clone(); - blocking_compute(move || self_clone.ws.clone().map(|e| GqlEdges::new(e)).collect()).await + Ok( + blocking_compute(move || self_clone.ws.clone().map(|e| GqlEdges::new(e)).collect()) + .await, + ) } } From c4e106d1a502179264ca5622fbfe18f3ab04727f Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 18 Apr 2026 00:34:57 +0000 Subject: [PATCH 07/11] chore: apply tidy-public auto-fixes --- raphtory-graphql/schema.graphql | 1 + 1 file changed, 1 insertion(+) diff --git a/raphtory-graphql/schema.graphql b/raphtory-graphql/schema.graphql index 28956786d7..dbe88f7daa 100644 --- a/raphtory-graphql/schema.graphql +++ b/raphtory-graphql/schema.graphql @@ -3491,3 +3491,4 @@ schema { query: QueryRoot mutation: MutRoot } + From fe478f3dcb4ccea92781805203975fc6293c0ec9 Mon Sep 17 00:00:00 2001 From: miratepuffin Date: Sun, 19 Apr 2026 17:54:53 +0100 Subject: [PATCH 08/11] Finish tests --- .../test_graphql/test_server_flags.py | 52 +++++++++++++++++-- 1 file changed, 47 insertions(+), 5 deletions(-) diff --git a/python/tests/test_base_install/test_graphql/test_server_flags.py b/python/tests/test_base_install/test_graphql/test_server_flags.py index 39d8c6e735..3334577ebe 100644 --- a/python/tests/test_base_install/test_graphql/test_server_flags.py +++ b/python/tests/test_base_install/test_graphql/test_server_flags.py @@ -282,15 +282,57 @@ def test_max_batch_size(): 
assert "Batch size 3 exceeds the maximum allowed 2" in str(body) -def test_flags_smoke(): - """Server accepts the remaining concurrency/schema knobs and still serves queries.""" +def test_max_recursive_depth(): + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, max_recursive_depth=2).start(): + client = RaphtoryClient(SERVER_URL) + make_graph(client) + + # depth 2: { graph { created } } — root selection set is depth 0, graph{...} pushes to 1 + client.query('{ graph(path: "g") { created } }') + + with pytest.raises(Exception) as excinfo: + client.query( + '{ graph(path: "g") { nodes { page(limit: 1) { name } } } }' + ) + assert "recursion depth of the query cannot be greater than `2`" in str( + excinfo.value + ) + + +def test_max_directives_per_field(): + work_dir = tempfile.mkdtemp() + with GraphServer(work_dir, max_directives_per_field=1).start(): + client = RaphtoryClient(SERVER_URL) + + # 1 directive — allowed + client.query("{ version @skip(if: false) }") + + # 2 directives on one field — rejected + with pytest.raises(Exception) as excinfo: + client.query("{ version @skip(if: false) @include(if: true) }") + assert ( + "number of directives on the field `version` cannot be greater than `1`" + in str(excinfo.value) + ) + + +# heavy_query_limit and exclusive_writes are concurrency knobs. Their effects only show +# up under parallel load (semaphore-parked queries, write/read serialization), and +# timing-based tests are flaky in CI. This smoke test at least verifies the flags are +# accepted and normal queries still pass through. +def test_concurrency_flags_smoke(): work_dir = tempfile.mkdtemp() with GraphServer( work_dir, heavy_query_limit=4, exclusive_writes=True, - max_recursive_depth=64, - max_directives_per_field=2, ).start(): client = RaphtoryClient(SERVER_URL) - assert client.query("{ version }")["version"] + make_graph(client) + # Read path: works under exclusive_writes's read lock. 
+ assert client.query('{ graph(path: "g") { nodes { count } } }') + # Heavy traversal: goes through the semaphore. + assert client.query( + '{ graph(path: "g") { nodes { page(limit: 10) { neighbours { page(limit: 10) { name } } } } } }' + ) From 5b18dc77acf515624ba742b86ec19c75fab79e8a Mon Sep 17 00:00:00 2001 From: miratepuffin Date: Sun, 19 Apr 2026 18:20:06 +0100 Subject: [PATCH 09/11] Final tests --- .../test_graphql/test_server_flags.py | 44 +++++++++++++++++-- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/python/tests/test_base_install/test_graphql/test_server_flags.py b/python/tests/test_base_install/test_graphql/test_server_flags.py index 3334577ebe..7019bf5688 100644 --- a/python/tests/test_base_install/test_graphql/test_server_flags.py +++ b/python/tests/test_base_install/test_graphql/test_server_flags.py @@ -119,6 +119,26 @@ def test_max_query_complexity(): "GqlIntervals", '{ graph(path: "g") { node(name: "ben") { history { intervals { list } } } } }', ), + ( + "GqlHistory.listRev", + '{ graph(path: "g") { node(name: "ben") { history { listRev { timestamp } } } } }', + ), + ( + "GqlHistoryTimestamp.listRev", + '{ graph(path: "g") { node(name: "ben") { history { timestamps { listRev } } } } }', + ), + ( + "GqlHistoryDateTime.listRev", + '{ graph(path: "g") { node(name: "ben") { history { datetimes { listRev } } } } }', + ), + ( + "GqlHistoryEventId.listRev", + '{ graph(path: "g") { node(name: "ben") { history { eventId { listRev } } } } }', + ), + ( + "GqlIntervals.listRev", + '{ graph(path: "g") { node(name: "ben") { history { intervals { listRev } } } } }', + ), ( "GqlGraphWindowSet", '{ graph(path: "g") { rolling(window: {epoch: 1}) { list { earliestTime { timestamp } } } } }', @@ -178,6 +198,26 @@ def test_max_query_complexity(): "GqlIntervals", '{ graph(path: "g") { node(name: "ben") { history { intervals { page(limit: 50) } } } } }', ), + ( + "GqlHistory.pageRev", + '{ graph(path: "g") { node(name: "ben") { history { pageRev(limit: 
50) { timestamp } } } } }',
+        ),
+        (
+            "GqlHistoryTimestamp.pageRev",
+            '{ graph(path: "g") { node(name: "ben") { history { timestamps { pageRev(limit: 50) } } } } }',
+        ),
+        (
+            "GqlHistoryDateTime.pageRev",
+            '{ graph(path: "g") { node(name: "ben") { history { datetimes { pageRev(limit: 50) } } } } }',
+        ),
+        (
+            "GqlHistoryEventId.pageRev",
+            '{ graph(path: "g") { node(name: "ben") { history { eventId { pageRev(limit: 50) } } } } }',
+        ),
+        (
+            "GqlIntervals.pageRev",
+            '{ graph(path: "g") { node(name: "ben") { history { intervals { pageRev(limit: 50) } } } } }',
+        ),
         (
             "GqlGraphWindowSet",
             '{ graph(path: "g") { rolling(window: {epoch: 1}) { page(limit: 50) { earliestTime { timestamp } } } } }',
@@ -292,9 +332,7 @@ def test_max_recursive_depth():
         client.query('{ graph(path: "g") { created } }')
 
         with pytest.raises(Exception) as excinfo:
-            client.query(
-                '{ graph(path: "g") { nodes { page(limit: 1) { name } } } }'
-            )
+            client.query('{ graph(path: "g") { nodes { page(limit: 1) { name } } } }')
         assert "recursion depth of the query cannot be greater than `2`" in str(
             excinfo.value
         )

From 68df428d99ad3619a735b8025bd45402c85bf88b Mon Sep 17 00:00:00 2001
From: miratepuffin
Date: Mon, 20 Apr 2026 11:59:36 +0100
Subject: [PATCH 10/11] Swap to async graphql extension

---
 Cargo.lock                                    |   1 +
 Cargo.toml                                    |   1 +
 .../test_graphql/test_server_flags.py         |  28 +-
 raphtory-graphql/Cargo.toml                   |   1 +
 raphtory-graphql/src/collection_guard.rs      | 301 ++++++++++++++++++
 raphtory-graphql/src/lib.rs                   |   1 +
 .../src/model/graph/collection.rs             |  52 +--
 raphtory-graphql/src/model/graph/edges.rs     |  15 +-
 raphtory-graphql/src/model/graph/history.rs   | 136 +++-----
 raphtory-graphql/src/model/graph/nodes.rs     |  15 +-
 .../src/model/graph/path_from_node.rs         |  15 +-
 raphtory-graphql/src/model/graph/windowset.rs |  90 ++----
 raphtory-graphql/src/server.rs                |   5 +-
 13 files changed, 425 insertions(+), 236 deletions(-)
 create mode 100644 raphtory-graphql/src/collection_guard.rs

diff --git a/Cargo.lock b/Cargo.lock
index 17f43cd917..8cdf88cd36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6569,6 +6569,7 @@ dependencies = [ "arrow-array", "async-graphql", "async-graphql-poem", + "async-graphql-value", "base64 0.22.1", "base64-compat", "bigdecimal", diff --git a/Cargo.toml b/Cargo.toml index a12139ed49..87ee16d5c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,7 @@ raphtory-itertools = { version = "0.18.0", path = "raphtory-itertools" } clam-core = { version = "0.18.0", path = "clam-core" } optd-core = { version = "0.18.0", path = "optd/optd/core" } async-graphql = { version = "7.2.1", features = ["dynamic-schema"] } +async-graphql-value = "7.2.1" bincode = { version = "2", features = ["serde"] } async-graphql-poem = "7.2.1" dynamic-graphql = "0.10.1" diff --git a/python/tests/test_base_install/test_graphql/test_server_flags.py b/python/tests/test_base_install/test_graphql/test_server_flags.py index 7019bf5688..3ca466981b 100644 --- a/python/tests/test_base_install/test_graphql/test_server_flags.py +++ b/python/tests/test_base_install/test_graphql/test_server_flags.py @@ -262,15 +262,18 @@ def test_disable_lists_all_resolvers(): def test_disable_lists_page_still_works(): - """Even with `disable_lists=True`, `page` queries still succeed.""" + """Even with `disable_lists=True`, every `page` resolver still succeeds.""" work_dir = tempfile.mkdtemp() with GraphServer(work_dir, disable_lists=True).start(): client = RaphtoryClient(SERVER_URL) make_graph(client) - result = client.query( - '{ graph(path: "g") { nodes { page(limit: 10) { name } } } }' - ) - assert len(result["graph"]["nodes"]["page"]) == 3 + for name, query in PAGE_QUERIES: + try: + client.query(query) + except Exception as e: + raise AssertionError( + f"{name} unexpectedly failed while lists are disabled: {e}" + ) def test_max_page_size_all_resolvers(): @@ -289,15 +292,18 @@ def test_max_page_size_all_resolvers(): def test_max_page_size_under_cap_works(): - """Pages at or below max_page_size still succeed.""" + 
"""With max_page_size=51, the same PAGE_QUERIES (all using limit=50) all succeed.""" work_dir = tempfile.mkdtemp() - with GraphServer(work_dir, max_page_size=2).start(): + with GraphServer(work_dir, max_page_size=51).start(): client = RaphtoryClient(SERVER_URL) make_graph(client) - result = client.query( - '{ graph(path: "g") { nodes { page(limit: 2) { name } } } }' - ) - assert len(result["graph"]["nodes"]["page"]) == 2 + for name, query in PAGE_QUERIES: + try: + client.query(query) + except Exception as e: + raise AssertionError( + f"{name} unexpectedly failed under max_page_size=51: {e}" + ) def test_disable_batching(): diff --git a/raphtory-graphql/Cargo.toml b/raphtory-graphql/Cargo.toml index 081ea82f76..bedfaba452 100644 --- a/raphtory-graphql/Cargo.toml +++ b/raphtory-graphql/Cargo.toml @@ -31,6 +31,7 @@ once_cell = { workspace = true } poem = { workspace = true } tokio = { workspace = true } async-graphql = { workspace = true, features = ["apollo_tracing"] } +async-graphql-value = { workspace = true } dynamic-graphql = { workspace = true } async-graphql-poem = { workspace = true } futures-util = { workspace = true } diff --git a/raphtory-graphql/src/collection_guard.rs b/raphtory-graphql/src/collection_guard.rs new file mode 100644 index 0000000000..3f6e6247e2 --- /dev/null +++ b/raphtory-graphql/src/collection_guard.rs @@ -0,0 +1,301 @@ +use crate::config::concurrency_config::ConcurrencyConfig; +use async_graphql::{ + async_trait, + extensions::{Extension, ExtensionContext, ExtensionFactory, NextParseQuery}, + parser::types::{ExecutableDocument, Field, Selection, SelectionSet, VariableDefinition}, + Name, Positioned, ServerError, ServerResult, Variables, +}; +use async_graphql_value::{ConstValue, Value}; +use std::{collections::HashSet, sync::Arc}; + +const LIST_DISABLED_ERROR: &str = + "Bulk list endpoints are disabled on this server. 
Use `page` instead."; + +/// Enforces `concurrency.disable_lists` and `concurrency.max_page_size` at parse time +/// by walking the `ExecutableDocument` and rejecting any `list`/`listRev` field (when +/// lists are disabled) or any `page`/`pageRev` field whose `limit` argument exceeds +/// the configured maximum. +pub struct CollectionGuard { + disable_lists: bool, + max_page_size: Option, +} + +impl CollectionGuard { + /// Returns `None` when neither guard is active — avoids installing an extension + /// that would only no-op. + pub fn from_config(config: &ConcurrencyConfig) -> Option { + if !config.disable_lists && config.max_page_size.is_none() { + return None; + } + Some(Self { + disable_lists: config.disable_lists, + max_page_size: config.max_page_size, + }) + } +} + +impl ExtensionFactory for CollectionGuard { + fn create(&self) -> Arc { + Arc::new(CollectionGuardExtension { + disable_lists: self.disable_lists, + max_page_size: self.max_page_size, + }) + } +} + +struct CollectionGuardExtension { + disable_lists: bool, + max_page_size: Option, +} + +#[async_trait::async_trait] +impl Extension for CollectionGuardExtension { + async fn parse_query( + &self, + ctx: &ExtensionContext<'_>, + query: &str, + variables: &Variables, + next: NextParseQuery<'_>, + ) -> ServerResult { + let doc = next.run(ctx, query, variables).await?; + for (_, op) in doc.operations.iter() { + let resolver = VariableResolver::new(&op.node.variable_definitions, variables); + let mut visited = HashSet::new(); + self.walk(&op.node.selection_set.node, &doc, &resolver, &mut visited)?; + } + Ok(doc) + } +} + +impl CollectionGuardExtension { + fn walk<'a>( + &self, + set: &'a SelectionSet, + doc: &'a ExecutableDocument, + resolver: &VariableResolver<'_>, + visited: &mut HashSet<&'a str>, + ) -> ServerResult<()> { + for item in &set.items { + match &item.node { + Selection::Field(field) => { + let field_node = &field.node; + let name = field_node.name.node.as_str(); + let pos = field.pos; + 
match name { + "list" | "listRev" if self.disable_lists => { + return Err(ServerError::new(LIST_DISABLED_ERROR, Some(pos))); + } + "page" | "pageRev" => { + if let Some(max) = self.max_page_size { + if let Some(limit) = field_limit(field_node, resolver) { + if limit > max { + return Err(ServerError::new( + format!( + "page limit {limit} exceeds the maximum allowed page size {max}" + ), + Some(pos), + )); + } + } + } + } + _ => {} + } + self.walk(&field_node.selection_set.node, doc, resolver, visited)?; + } + Selection::InlineFragment(frag) => { + self.walk(&frag.node.selection_set.node, doc, resolver, visited)?; + } + Selection::FragmentSpread(spread) => { + let fragment_name = spread.node.fragment_name.node.as_str(); + if !visited.insert(fragment_name) { + continue; + } + if let Some(def) = doc.fragments.get(&spread.node.fragment_name.node) { + self.walk(&def.node.selection_set.node, doc, resolver, visited)?; + } + } + } + } + Ok(()) + } +} + +fn field_limit(field: &Field, resolver: &VariableResolver<'_>) -> Option { + let (_, value) = field + .arguments + .iter() + .find(|(n, _)| n.node.as_str() == "limit")?; + match &value.node { + Value::Number(n) => n.as_u64().map(|v| v as usize), + Value::Variable(name) => match resolver.resolve(name)? { + ConstValue::Number(n) => n.as_u64().map(|v| v as usize), + _ => None, + }, + _ => None, + } +} + +/// Resolves a variable by name, falling back to the operation's declared default value +/// when the client omitted it. Stays scoped to a single operation because defaults are +/// per-operation. 
+struct VariableResolver<'a> { + variables: &'a Variables, + defaults: Vec<(&'a Name, &'a ConstValue)>, +} + +impl<'a> VariableResolver<'a> { + fn new( + definitions: &'a [Positioned], + variables: &'a Variables, + ) -> Self { + let defaults = definitions + .iter() + .filter_map(|def| { + def.node + .default_value + .as_ref() + .map(|v| (&def.node.name.node, &v.node)) + }) + .collect(); + Self { variables, defaults } + } + + fn resolve(&self, name: &Name) -> Option<&ConstValue> { + if let Some(value) = self.variables.get(name) { + return Some(value); + } + self.defaults + .iter() + .find_map(|(n, v)| (*n == name).then_some(*v)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use async_graphql::parser::parse_query; + + fn run( + disable_lists: bool, + max_page_size: Option, + query: &str, + variables: Variables, + ) -> Result<(), String> { + let ext = CollectionGuardExtension { + disable_lists, + max_page_size, + }; + let doc = parse_query(query).map_err(|e| e.to_string())?; + for (_, op) in doc.operations.iter() { + let resolver = VariableResolver::new(&op.node.variable_definitions, &variables); + let mut visited = HashSet::new(); + ext.walk(&op.node.selection_set.node, &doc, &resolver, &mut visited) + .map_err(|e| e.message)?; + } + Ok(()) + } + + #[test] + fn rejects_list_when_disabled() { + let err = run(true, None, "{ foo { list { bar } } }", Variables::default()).unwrap_err(); + assert!(err.contains("Bulk list endpoints are disabled")); + } + + #[test] + fn rejects_list_rev_when_disabled() { + let err = run(true, None, "{ foo { listRev { bar } } }", Variables::default()).unwrap_err(); + assert!(err.contains("Bulk list endpoints are disabled")); + } + + #[test] + fn allows_list_when_not_disabled() { + run(false, None, "{ foo { list { bar } } }", Variables::default()).unwrap(); + } + + #[test] + fn rejects_page_over_max() { + let err = run( + false, + Some(10), + "{ foo { page(limit: 50) { bar } } }", + Variables::default(), + ) + .unwrap_err(); + 
assert!(err.contains("page limit 50 exceeds the maximum allowed page size 10")); + } + + #[test] + fn allows_page_under_max() { + run( + false, + Some(50), + "{ foo { page(limit: 10) { bar } } }", + Variables::default(), + ) + .unwrap(); + } + + #[test] + fn resolves_limit_from_provided_variable() { + let vars = Variables::from_json(serde_json::json!({ "n": 100 })); + let err = run( + false, + Some(10), + "query ($n: Int!) { foo { page(limit: $n) { bar } } }", + vars, + ) + .unwrap_err(); + assert!(err.contains("page limit 100 exceeds")); + } + + #[test] + fn resolves_limit_from_variable_default() { + let err = run( + false, + Some(10), + "query ($n: Int = 100) { foo { page(limit: $n) { bar } } }", + Variables::default(), + ) + .unwrap_err(); + assert!(err.contains("page limit 100 exceeds")); + } + + #[test] + fn walks_into_inline_fragments() { + let err = run( + true, + None, + "{ foo { ... on Foo { list { bar } } } }", + Variables::default(), + ) + .unwrap_err(); + assert!(err.contains("Bulk list endpoints are disabled")); + } + + #[test] + fn walks_into_fragment_spreads() { + let err = run( + true, + None, + "fragment F on Foo { list { bar } } { foo { ...F } }", + Variables::default(), + ) + .unwrap_err(); + assert!(err.contains("Bulk list endpoints are disabled")); + } + + #[test] + fn handles_cyclic_fragments_without_looping() { + // Cycle is spec-invalid but arrives here before async-graphql's validation; the + // visited set must prevent infinite recursion. 
+ let err = run( + true, + None, + "fragment A on T { ...B list { x } } fragment B on T { ...A } { root { ...A } }", + Variables::default(), + ) + .unwrap_err(); + assert!(err.contains("Bulk list endpoints are disabled")); + } +} diff --git a/raphtory-graphql/src/lib.rs b/raphtory-graphql/src/lib.rs index e4b50b87bb..ecc6691d8a 100644 --- a/raphtory-graphql/src/lib.rs +++ b/raphtory-graphql/src/lib.rs @@ -5,6 +5,7 @@ use std::sync::Arc; mod auth; pub mod client; +mod collection_guard; pub mod data; mod graph; pub mod model; diff --git a/raphtory-graphql/src/model/graph/collection.rs b/raphtory-graphql/src/model/graph/collection.rs index 21e1673239..aa345e984a 100644 --- a/raphtory-graphql/src/model/graph/collection.rs +++ b/raphtory-graphql/src/model/graph/collection.rs @@ -1,42 +1,10 @@ -use crate::{config::concurrency_config::ConcurrencyConfig, rayon::blocking_compute}; -use async_graphql::{Context, Error, Result}; +use crate::rayon::blocking_compute; use dynamic_graphql::{ internal::{OutputTypeName, ResolveOwned, TypeName}, ResolvedObject, ResolvedObjectFields, }; use std::{borrow::Cow, sync::Arc}; -/// Returns an error when `concurrency.disable_lists` is set. Called from every `list` -/// resolver on paginated collections. -pub(crate) fn check_list_allowed(ctx: &Context<'_>) -> Result<()> { - if ctx - .data_opt::() - .map(|cfg| cfg.disable_lists) - .unwrap_or(false) - { - return Err(Error::new( - "Bulk list endpoints are disabled on this server. Use `page` instead.", - )); - } - Ok(()) -} - -/// Returns an error when `limit` exceeds `concurrency.max_page_size`. Called from every -/// `page` resolver on paginated collections. 
-pub(crate) fn check_page_limit(ctx: &Context<'_>, limit: usize) -> Result<()> { - if let Some(max) = ctx - .data_opt::() - .and_then(|cfg| cfg.max_page_size) - { - if limit > max { - return Err(Error::new(format!( - "page limit {limit} exceeds the maximum allowed page size {max}" - ))); - } - } - Ok(()) -} - /// Collection of items #[derive(ResolvedObject, Clone)] #[graphql(get_type_name = true)] @@ -79,26 +47,18 @@ where T: for<'a> ResolveOwned<'a>, { /// Returns a list of collection objects. - async fn list(&self, ctx: &Context<'_>) -> Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.items.to_vec()).await) + blocking_compute(move || self_clone.items.to_vec()).await } /// Fetch one page with a number of items up to a specified limit, optionally offset by a specified amount. The page_index sets the number of pages to skip (defaults to 0). /// /// For example, if page(5, 2, 1) is called, a page with 5 items, offset by 11 items (2 pages of 5 + 1), /// will be returned. - async fn page( - &self, - ctx: &Context<'_>, - limit: usize, - offset: Option, - page_index: Option, - ) -> Result> { - check_page_limit(ctx, limit)?; + async fn page(&self, limit: usize, offset: Option, page_index: Option) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .items @@ -108,7 +68,7 @@ where .cloned() .collect() }) - .await) + .await } /// Returns a count of collection objects. 
diff --git a/raphtory-graphql/src/model/graph/edges.rs b/raphtory-graphql/src/model/graph/edges.rs index e393c327e4..f83d21c67d 100644 --- a/raphtory-graphql/src/model/graph/edges.rs +++ b/raphtory-graphql/src/model/graph/edges.rs @@ -1,7 +1,6 @@ use crate::{ model::{ graph::{ - collection::{check_list_allowed, check_page_limit}, edge::GqlEdge, filtering::EdgesViewCollection, timeindex::{GqlEventTime, GqlTimeInput}, @@ -12,7 +11,6 @@ use crate::{ }, rayon::blocking_compute, }; -use async_graphql::Context; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use itertools::Itertools; use raphtory::{ @@ -346,25 +344,22 @@ impl GqlEdges { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone.iter().skip(start).take(limit).collect() }) - .await) + .await } /// Returns a list of all objects in the current selection of the collection. You should filter the collection first then call list. 
- async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.iter().collect()).await) + blocking_compute(move || self_clone.iter().collect()).await } /// Returns a filtered view that applies to list down the chain diff --git a/raphtory-graphql/src/model/graph/history.rs b/raphtory-graphql/src/model/graph/history.rs index 7e66f5db17..58c15c34d3 100644 --- a/raphtory-graphql/src/model/graph/history.rs +++ b/raphtory-graphql/src/model/graph/history.rs @@ -1,11 +1,8 @@ use crate::{ - model::graph::{ - collection::{check_list_allowed, check_page_limit}, - timeindex::{dt_format_str_is_valid, GqlEventTime}, - }, + model::graph::timeindex::{dt_format_str_is_valid, GqlEventTime}, rayon::blocking_compute, }; -use async_graphql::{Context, Error}; +use async_graphql::Error; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use raphtory::db::api::view::history::{ History, HistoryDateTime, HistoryEventId, HistoryTimestamp, InternalHistoryOps, Intervals, @@ -53,20 +50,15 @@ impl GqlHistory { } /// List all time entries present in this history. - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.history.iter().map(|t| t.into()).collect()).await) + blocking_compute(move || self_clone.history.iter().map(|t| t.into()).collect()).await } /// List all time entries present in this history in reverse order. 
- async fn list_rev(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list_rev(&self) -> Vec { let self_clone = self.clone(); - Ok( - blocking_compute(move || self_clone.history.iter_rev().map(|t| t.into()).collect()) - .await, - ) + blocking_compute(move || self_clone.history.iter_rev().map(|t| t.into()).collect()).await } /// Fetch one page of EventTime entries with a number of items up to a specified limit, @@ -76,14 +68,12 @@ impl GqlHistory { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history @@ -93,7 +83,7 @@ impl GqlHistory { .map(|t| t.into()) .collect() }) - .await) + .await } /// Fetch one page of EventTime entries with a number of items up to a specified limit, @@ -103,14 +93,12 @@ impl GqlHistory { /// will be returned. async fn page_rev( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history @@ -120,7 +108,7 @@ impl GqlHistory { .map(|t| t.into()) .collect() }) - .await) + .await } /// Returns True if the history is empty. @@ -188,17 +176,15 @@ pub struct GqlHistoryTimestamp { #[ResolvedObjectFields] impl GqlHistoryTimestamp { /// List all timestamps. 
- async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.history_t.collect()).await) + blocking_compute(move || self_clone.history_t.collect()).await } /// List all timestamps in reverse order. - async fn list_rev(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list_rev(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.history_t.collect_rev()).await) + blocking_compute(move || self_clone.history_t.collect_rev()).await } /// Fetch one page of timestamps with a number of items up to a specified limit, optionally offset by a specified amount. @@ -208,14 +194,12 @@ impl GqlHistoryTimestamp { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history_t @@ -224,7 +208,7 @@ impl GqlHistoryTimestamp { .take(limit) .collect() }) - .await) + .await } /// Fetch one page of timestamps in reverse order with a number of items up to a specified limit, @@ -234,14 +218,12 @@ impl GqlHistoryTimestamp { /// will be returned. 
async fn page_rev( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history_t @@ -250,7 +232,7 @@ impl GqlHistoryTimestamp { .take(limit) .collect() }) - .await) + .await } } @@ -267,12 +249,7 @@ impl GqlHistoryDateTime { /// List all datetimes formatted as strings. /// If filter_broken is set to True, time conversion errors will be ignored. If set to False, a TimeError /// will be raised on time conversion error. Defaults to False. - async fn list( - &self, - ctx: &Context<'_>, - filter_broken: Option, - ) -> Result, Error> { - check_list_allowed(ctx)?; + async fn list(&self, filter_broken: Option) -> Result, Error> { let self_clone = self.clone(); blocking_compute(move || { let fmt_string = self_clone.format_string.as_deref().unwrap_or("%+"); // %+ is RFC 3339 @@ -303,12 +280,7 @@ impl GqlHistoryDateTime { /// List all datetimes formatted as strings in reverse chronological order. /// If filter_broken is set to True, time conversion errors will be ignored. If set to False, a TimeError /// will be raised on time conversion error. Defaults to False. - async fn list_rev( - &self, - ctx: &Context<'_>, - filter_broken: Option, - ) -> Result, Error> { - check_list_allowed(ctx)?; + async fn list_rev(&self, filter_broken: Option) -> Result, Error> { let self_clone = self.clone(); blocking_compute(move || { let fmt_string = self_clone.format_string.as_deref().unwrap_or("%+"); // %+ is RFC 3339 @@ -345,13 +317,11 @@ impl GqlHistoryDateTime { /// will be returned. 
async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, filter_broken: Option, ) -> Result, Error> { - check_page_limit(ctx, limit)?; let self_clone = self.clone(); blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); @@ -392,13 +362,11 @@ impl GqlHistoryDateTime { /// will be returned. async fn page_rev( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, filter_broken: Option, ) -> Result, Error> { - check_page_limit(ctx, limit)?; let self_clone = self.clone(); blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); @@ -441,31 +409,29 @@ pub struct GqlHistoryEventId { #[ResolvedObjectFields] impl GqlHistoryEventId { /// List event ids. - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { self_clone .history_s .iter() .map(|s: usize| s as u64) .collect() }) - .await) + .await } /// List event ids in reverse order. - async fn list_rev(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list_rev(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { self_clone .history_s .iter_rev() .map(|s: usize| s as u64) .collect() }) - .await) + .await } /// Fetch one page of event ids with a number of items up to a specified limit, @@ -475,14 +441,12 @@ impl GqlHistoryEventId { /// will be returned. 
async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history_s @@ -492,7 +456,7 @@ impl GqlHistoryEventId { .map(|s: usize| s as u64) .collect() }) - .await) + .await } /// Fetch one page of event ids in reverse chronological order with a number of items up to a specified limit, @@ -502,14 +466,12 @@ impl GqlHistoryEventId { /// will be returned. async fn page_rev( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .history_s @@ -519,7 +481,7 @@ impl GqlHistoryEventId { .map(|s: usize| s as u64) .collect() }) - .await) + .await } } @@ -533,17 +495,15 @@ pub struct GqlIntervals { #[ResolvedObjectFields] impl GqlIntervals { /// List time intervals between consecutive timestamps in milliseconds. - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.intervals.collect()).await) + blocking_compute(move || self_clone.intervals.collect()).await } /// List millisecond time intervals between consecutive timestamps in reverse order. 
- async fn list_rev(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list_rev(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.intervals.collect_rev()).await) + blocking_compute(move || self_clone.intervals.collect_rev()).await } /// Fetch one page of intervals between consecutive timestamps with a number of items up to a specified limit, @@ -553,14 +513,12 @@ impl GqlIntervals { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .intervals @@ -569,7 +527,7 @@ impl GqlIntervals { .take(limit) .collect() }) - .await) + .await } /// Fetch one page of intervals between consecutive timestamps in reverse order with a number of items up to a specified limit, @@ -579,14 +537,12 @@ impl GqlIntervals { /// will be returned. async fn page_rev( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .intervals @@ -595,7 +551,7 @@ impl GqlIntervals { .take(limit) .collect() }) - .await) + .await } /// Compute the mean interval between consecutive timestamps. Returns None if fewer than 1 timestamp. 
diff --git a/raphtory-graphql/src/model/graph/nodes.rs b/raphtory-graphql/src/model/graph/nodes.rs index 99c6021f0b..82fa5a5077 100644 --- a/raphtory-graphql/src/model/graph/nodes.rs +++ b/raphtory-graphql/src/model/graph/nodes.rs @@ -1,7 +1,6 @@ use crate::{ model::{ graph::{ - collection::{check_list_allowed, check_page_limit}, filtering::{GqlNodeFilter, NodesViewCollection}, node::GqlNode, timeindex::{GqlEventTime, GqlTimeInput}, @@ -12,7 +11,6 @@ use crate::{ }, rayon::blocking_compute, }; -use async_graphql::{Context, Result}; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use itertools::Itertools; use raphtory::{ @@ -332,24 +330,21 @@ impl GqlNodes { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone.iter().skip(start).take(limit).collect() }) - .await) + .await } - async fn list(&self, ctx: &Context<'_>) -> Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.iter().collect()).await) + blocking_compute(move || self_clone.iter().collect()).await } /// Returns a view of the node ids. 
diff --git a/raphtory-graphql/src/model/graph/path_from_node.rs b/raphtory-graphql/src/model/graph/path_from_node.rs index 83a193f83f..e4562fea23 100644 --- a/raphtory-graphql/src/model/graph/path_from_node.rs +++ b/raphtory-graphql/src/model/graph/path_from_node.rs @@ -1,6 +1,5 @@ use crate::{ model::graph::{ - collection::{check_list_allowed, check_page_limit}, filtering::{GqlNodeFilter, PathFromNodeViewCollection}, node::GqlNode, timeindex::{GqlEventTime, GqlTimeInput}, @@ -9,7 +8,6 @@ use crate::{ }, rayon::blocking_compute, }; -use async_graphql::Context; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use raphtory::{ core::utils::time::TryIntoInterval, @@ -207,24 +205,21 @@ impl GqlPathFromNode { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone.iter().skip(start).take(limit).collect() }) - .await) + .await } - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.iter().collect()).await) + blocking_compute(move || self_clone.iter().collect()).await } /// Returns the node ids. 
diff --git a/raphtory-graphql/src/model/graph/windowset.rs b/raphtory-graphql/src/model/graph/windowset.rs index da1fd6311e..ca1ad04454 100644 --- a/raphtory-graphql/src/model/graph/windowset.rs +++ b/raphtory-graphql/src/model/graph/windowset.rs @@ -1,6 +1,5 @@ use crate::{ model::graph::{ - collection::{check_list_allowed, check_page_limit}, edge::GqlEdge, edges::GqlEdges, graph::GqlGraph, @@ -11,7 +10,6 @@ use crate::{ paths::ExistingGraphFolder, rayon::blocking_compute, }; -use async_graphql::Context; use dynamic_graphql::{ResolvedObject, ResolvedObjectFields}; use raphtory::db::{ api::{ @@ -48,14 +46,12 @@ impl GqlGraphWindowSet { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -65,20 +61,19 @@ impl GqlGraphWindowSet { .map(|g| GqlGraph::new(self_clone.path.clone(), g)) .collect() }) - .await) + .await } - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { self_clone .ws .clone() .map(|g| GqlGraph::new(self_clone.path.clone(), g)) .collect() }) - .await) + .await } } @@ -107,14 +102,12 @@ impl GqlNodeWindowSet { /// will be returned. 
async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -124,13 +117,12 @@ impl GqlNodeWindowSet { .map(|n| n.into()) .collect() }) - .await) + .await } - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.ws.clone().map(|n| n.into()).collect()).await) + blocking_compute(move || self_clone.ws.clone().map(|n| n.into()).collect()).await } } @@ -161,14 +153,12 @@ impl GqlNodesWindowSet { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -178,16 +168,12 @@ impl GqlNodesWindowSet { .map(|n| GqlNodes::new(n)) .collect() }) - .await) + .await } - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok( - blocking_compute(move || self_clone.ws.clone().map(|n| GqlNodes::new(n)).collect()) - .await, - ) + blocking_compute(move || self_clone.ws.clone().map(|n| GqlNodes::new(n)).collect()).await } } @@ -216,14 +202,12 @@ impl GqlPathFromNodeWindowSet { /// will be returned. 
async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -233,20 +217,19 @@ impl GqlPathFromNodeWindowSet { .map(|n| GqlPathFromNode::new(n)) .collect() }) - .await) + .await } - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { self_clone .ws .clone() .map(|n| GqlPathFromNode::new(n)) .collect() }) - .await) + .await } } @@ -275,14 +258,12 @@ impl GqlEdgeWindowSet { /// will be returned. async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -292,13 +273,12 @@ impl GqlEdgeWindowSet { .map(|e| e.into()) .collect() }) - .await) + .await } - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || self_clone.ws.clone().map(|e| e.into()).collect()).await) + blocking_compute(move || self_clone.ws.clone().map(|e| e.into()).collect()).await } } @@ -327,14 +307,12 @@ impl GqlEdgesWindowSet { /// will be returned. 
async fn page( &self, - ctx: &Context<'_>, limit: usize, offset: Option, page_index: Option, - ) -> async_graphql::Result> { - check_page_limit(ctx, limit)?; + ) -> Vec { let self_clone = self.clone(); - Ok(blocking_compute(move || { + blocking_compute(move || { let start = page_index.unwrap_or(0) * limit + offset.unwrap_or(0); self_clone .ws @@ -344,15 +322,11 @@ impl GqlEdgesWindowSet { .map(|e| GqlEdges::new(e)) .collect() }) - .await) + .await } - async fn list(&self, ctx: &Context<'_>) -> async_graphql::Result> { - check_list_allowed(ctx)?; + async fn list(&self) -> Vec { let self_clone = self.clone(); - Ok( - blocking_compute(move || self_clone.ws.clone().map(|e| GqlEdges::new(e)).collect()) - .await, - ) + blocking_compute(move || self_clone.ws.clone().map(|e| GqlEdges::new(e)).collect()).await } } diff --git a/raphtory-graphql/src/server.rs b/raphtory-graphql/src/server.rs index 17b65732be..cddd660ecd 100644 --- a/raphtory-graphql/src/server.rs +++ b/raphtory-graphql/src/server.rs @@ -1,5 +1,6 @@ use crate::{ auth::{AuthenticatedGraphQL, MutationAuth}, + collection_guard::CollectionGuard, config::app_config::{load_config, AppConfig}, data::Data, model::{ @@ -250,8 +251,10 @@ impl GraphServer { let schema_cfg = &self.config.schema; let mut schema_builder = App::create_schema() .data(self.data.clone()) - .data(self.config.concurrency.clone()) .extension(MutationAuth); + if let Some(guard) = CollectionGuard::from_config(&self.config.concurrency) { + schema_builder = schema_builder.extension(guard); + } if let Some(depth) = schema_cfg.max_query_depth { schema_builder = schema_builder.limit_depth(depth); } From d486fe190612b4a635db82c544390534f150a1d9 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Mon, 20 Apr 2026 14:50:50 +0000 Subject: [PATCH 11/11] chore: apply tidy-public auto-fixes --- raphtory-graphql/src/collection_guard.rs | 26 ++++++++++++++----- raphtory-graphql/src/model/graph/windowset.rs | 6 +---- 2 files changed, 20 insertions(+), 12 
deletions(-) diff --git a/raphtory-graphql/src/collection_guard.rs b/raphtory-graphql/src/collection_guard.rs index 3f6e6247e2..d5dd9cc6e1 100644 --- a/raphtory-graphql/src/collection_guard.rs +++ b/raphtory-graphql/src/collection_guard.rs @@ -145,10 +145,7 @@ struct VariableResolver<'a> { } impl<'a> VariableResolver<'a> { - fn new( - definitions: &'a [Positioned], - variables: &'a Variables, - ) -> Self { + fn new(definitions: &'a [Positioned], variables: &'a Variables) -> Self { let defaults = definitions .iter() .filter_map(|def| { @@ -158,7 +155,10 @@ impl<'a> VariableResolver<'a> { .map(|v| (&def.node.name.node, &v.node)) }) .collect(); - Self { variables, defaults } + Self { + variables, + defaults, + } } fn resolve(&self, name: &Name) -> Option<&ConstValue> { @@ -204,13 +204,25 @@ mod tests { #[test] fn rejects_list_rev_when_disabled() { - let err = run(true, None, "{ foo { listRev { bar } } }", Variables::default()).unwrap_err(); + let err = run( + true, + None, + "{ foo { listRev { bar } } }", + Variables::default(), + ) + .unwrap_err(); assert!(err.contains("Bulk list endpoints are disabled")); } #[test] fn allows_list_when_not_disabled() { - run(false, None, "{ foo { list { bar } } }", Variables::default()).unwrap(); + run( + false, + None, + "{ foo { list { bar } } }", + Variables::default(), + ) + .unwrap(); } #[test] diff --git a/raphtory-graphql/src/model/graph/windowset.rs b/raphtory-graphql/src/model/graph/windowset.rs index ca1ad04454..32b1501029 100644 --- a/raphtory-graphql/src/model/graph/windowset.rs +++ b/raphtory-graphql/src/model/graph/windowset.rs @@ -1,10 +1,6 @@ use crate::{ model::graph::{ - edge::GqlEdge, - edges::GqlEdges, - graph::GqlGraph, - node::GqlNode, - nodes::GqlNodes, + edge::GqlEdge, edges::GqlEdges, graph::GqlGraph, node::GqlNode, nodes::GqlNodes, path_from_node::GqlPathFromNode, }, paths::ExistingGraphFolder,