diff --git a/db4-storage/src/lib.rs b/db4-storage/src/lib.rs index 174444c2fb..623737957a 100644 --- a/db4-storage/src/lib.rs +++ b/db4-storage/src/lib.rs @@ -54,6 +54,7 @@ pub type GS

= GraphPropSegmentView

; pub type Layer

= GraphStore, ES

, GS

, P>; pub type Wal = ::Wal; +pub type ControlFile = ::ControlFile; pub type Config = ::Config; pub type GIDResolver = MappingResolver; diff --git a/db4-storage/src/pages/mod.rs b/db4-storage/src/pages/mod.rs index 04f2994159..dcc1c07b6f 100644 --- a/db4-storage/src/pages/mod.rs +++ b/db4-storage/src/pages/mod.rs @@ -3,7 +3,12 @@ use crate::{ api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps}, error::StorageError, pages::{edge_store::ReadLockedEdgeStorage, node_store::ReadLockedNodeStorage}, - persist::{config::ConfigOps, strategy::PersistenceStrategy}, + persist::{ + config::ConfigOps, + control_file::{ControlFileOps, DBState}, + strategy::PersistenceStrategy, + }, + properties::props_meta_writer::PropsMetaWriter, segments::{edge::segment::MemEdgeSegment, node::segment::MemNodeSegment}, state::StateIndex, wal::{GraphWalOps, WalOps}, @@ -349,32 +354,38 @@ impl< > Drop for GraphStore { fn drop(&mut self) { + let wal = self.ext.wal(); + let control_file = self.ext.control_file(); + match self.flush() { Ok(_) => { - let wal = self.ext.wal(); - - // INVARIANTS: - // 1. No new writes can occur since we are in a drop. - // 2. flush() has persisted all the segments to disk. - // - // Thus, we can safely discard all records with LSN <= latest_lsn_on_disk - // by rotating the WAL. - let latest_lsn_on_disk = wal.next_lsn() - 1; - - if let Err(e) = wal.rotate(latest_lsn_on_disk) { - eprintln!("Failed to rotate WAL in drop: {}", e); + // Log a checkpoint record in the WAL, indicating that the DB was shutdown + // with all the segments flushed to disk. + // On startup, recovery is skipped since there are no pending writes to replay. + let checkpoint_lsn = match wal.log_shutdown_checkpoint() { + Ok(lsn) => lsn, + Err(err) => { + eprintln!("Failed to log shutdown checkpoint in drop: {err}"); + return; + } + }; + + // Flush up to the end of the WAL stream. + let flush_lsn = wal.position(); + + if let Err(err) = wal.flush(flush_lsn) { + eprintln!("Failed to flush checkpoint record in drop: {err}"); + return; } - // FIXME: If the process crashes here after rotation, we lose the - // checkpoint record. Write next LSN to a separate file before rotation. + // Record the checkpoint and shutdown state and write control file to disk. + control_file.set_checkpoint(checkpoint_lsn); + control_file.set_db_state(DBState::Shutdown); - // Log a checkpoint record so we can restore the next LSN after reload. - let checkpoint_lsn = wal - .log_checkpoint(latest_lsn_on_disk) - .expect("Failed to log checkpoint in drop"); - - wal.flush(checkpoint_lsn) - .expect("Failed to flush checkpoint record in drop"); + if let Err(err) = control_file.save() { + eprintln!("Failed to save control file in drop: {err}"); + return; + } } Err(err) => { eprintln!("Failed to flush storage in drop: {err}") diff --git a/db4-storage/src/persist/config.rs b/db4-storage/src/persist/config.rs index 94eef349df..80435eaa16 100644 --- a/db4-storage/src/persist/config.rs +++ b/db4-storage/src/persist/config.rs @@ -9,7 +9,8 @@ use tracing::error; pub const DEFAULT_MAX_PAGE_LEN_NODES: u32 = 131_072; // 2^17 pub const DEFAULT_MAX_PAGE_LEN_EDGES: u32 = 1_048_576; // 2^20 -pub const CONFIG_FILE: &str = "config.json"; + +const CONFIG_FILE_NAME: &str = "config.json"; pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized { fn max_node_page_len(&self) -> u32; @@ -25,14 +26,14 @@ pub trait ConfigOps: Serialize + DeserializeOwned + Args + Sized { fn with_node_types(&self, node_types: impl IntoIterator>) -> Self; fn load_from_dir(dir: &Path) -> Result { - let config_file = dir.join(CONFIG_FILE); + let config_file = dir.join(CONFIG_FILE_NAME); let config_file = std::fs::File::open(config_file)?; let config = serde_json::from_reader(config_file)?; Ok(config) } fn save_to_dir(&self, dir: &Path) -> Result<(), StorageError> { - let config_file = dir.join(CONFIG_FILE); + let config_file = dir.join(CONFIG_FILE_NAME); let config_file = std::fs::File::create(&config_file)?; serde_json::to_writer_pretty(config_file, self)?; Ok(()) diff --git a/db4-storage/src/persist/control_file.rs b/db4-storage/src/persist/control_file.rs new file mode 100644 index 0000000000..9c8c942884 --- /dev/null +++ b/db4-storage/src/persist/control_file.rs @@ -0,0 +1,53 @@ +use crate::{error::StorageError, wal::LSN}; +use serde::{Deserialize, Serialize}; +use std::path::Path; + +#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)] +pub enum DBState { + Running, + Shutdown, + CrashRecovery, + NotSupported, +} + +// Starting value for `last_checkpoint` in the control file. +pub const LAST_CHECKPOINT_INIT: LSN = 0; + +pub trait ControlFileOps: Sized { + fn load(dir: &Path) -> Result; + + fn save(&self) -> Result<(), StorageError>; + + fn db_state(&self) -> DBState; + + fn last_checkpoint(&self) -> LSN; + + fn set_db_state(&self, state: DBState); + + fn set_checkpoint(&self, lsn: LSN); +} + +#[derive(Debug, Clone)] +pub struct NoControlFile; + +impl ControlFileOps for NoControlFile { + fn load(_dir: &Path) -> Result { + Ok(NoControlFile) + } + + fn save(&self) -> Result<(), StorageError> { + Ok(()) + } + + fn db_state(&self) -> DBState { + DBState::NotSupported + } + + fn last_checkpoint(&self) -> LSN { + 0 + } + + fn set_db_state(&self, state: DBState) {} + + fn set_checkpoint(&self, lsn: LSN) {} +} diff --git a/db4-storage/src/persist/mod.rs b/db4-storage/src/persist/mod.rs index 43275c62a7..7609d5b63e 100644 --- a/db4-storage/src/persist/mod.rs +++ b/db4-storage/src/persist/mod.rs @@ -1,2 +1,3 @@ pub mod config; +pub mod control_file; pub mod strategy; diff --git a/db4-storage/src/persist/strategy.rs b/db4-storage/src/persist/strategy.rs index 5f1f7aad07..f733365aab 100644 --- a/db4-storage/src/persist/strategy.rs +++ b/db4-storage/src/persist/strategy.rs @@ -1,7 +1,10 @@ use crate::{ api::{edges::EdgeSegmentOps, graph_props::GraphPropSegmentOps, nodes::NodeSegmentOps}, error::StorageError, - persist::config::{BaseConfig, ConfigOps}, + persist::{ + config::{BaseConfig, ConfigOps}, + control_file::{ControlFileOps, NoControlFile}, + }, segments::{ edge::segment::{EdgeSegmentView, MemEdgeSegment}, graph_prop::{GraphPropSegmentView, segment::MemGraphPropSegment}, @@ -25,6 +28,7 @@ pub trait PersistenceStrategy: Debug + Clone + Send + Sync + 'static { type GS: GraphPropSegmentOps; type Wal: WalOps + GraphWalOps; type Config: ConfigOps; + type ControlFile: ControlFileOps; fn new(config: Self::Config, graph_dir: Option<&Path>) -> Result; @@ -38,6 +42,8 @@ pub trait PersistenceStrategy: Debug + Clone + Send + Sync + 'static { fn wal(&self) -> &Self::Wal; + fn control_file(&self) -> &Self::ControlFile; + /// Called after every write and checks memory limits to decide if a flush is needed fn persist_node_segment>( &self, @@ -81,6 +87,7 @@ pub struct NoOpStrategy { config: BaseConfig, memory_tracker: Arc, wal: NoWal, + control_file: NoControlFile, } impl PersistenceStrategy for NoOpStrategy { @@ -89,12 +96,14 @@ impl PersistenceStrategy for NoOpStrategy { type GS = GraphPropSegmentView; type Wal = NoWal; type Config = BaseConfig; + type ControlFile = NoControlFile; fn new(config: BaseConfig, _graph_dir: Option<&Path>) -> Result { Ok(Self { config, - memory_tracker: Arc::new(AtomicUsize::new(0)), wal: NoWal, + control_file: NoControlFile, + memory_tracker: Arc::new(AtomicUsize::new(0)), }) } @@ -118,6 +127,10 @@ impl PersistenceStrategy for NoOpStrategy { &self.wal } + fn control_file(&self) -> &Self::ControlFile { + &self.control_file + } + fn persist_node_segment>( &self, _node_page: &Self::NS, diff --git a/db4-storage/src/wal/entry.rs b/db4-storage/src/wal/entry.rs index 18d33a116b..002723af7d 100644 --- a/db4-storage/src/wal/entry.rs +++ b/db4-storage/src/wal/entry.rs @@ -103,15 +103,31 @@ impl GraphWalOps for NoWal { Ok(0) } - fn log_checkpoint(&self, _lsn: LSN) -> Result { + fn log_checkpoint(&self, _redo: LSN) -> Result { Ok(0) } - fn replay_iter(&self) -> impl Iterator> { - std::iter::empty() + fn log_shutdown_checkpoint(&self) -> Result { + Ok(0) + } + + fn read_checkpoint(&self, _lsn: LSN) -> Result { + Err(StorageError::GenericFailure( + "read_checkpoint is not supported for NoWAL".to_string(), + )) } - fn replay_to_graph(&self, _graph: &mut G) -> Result<(), StorageError> { + fn read_shutdown_checkpoint(&self, _lsn: LSN) -> Result { + Err(StorageError::GenericFailure( + "read_shutdown_checkpoint is not supported for NoWAL".to_string(), + )) + } + + fn replay_to_graph( + &self, + _graph: &mut G, + _start: LSN, + ) -> Result { panic!("NoWAL does not support replay") } } diff --git a/db4-storage/src/wal/mod.rs b/db4-storage/src/wal/mod.rs index 9545bdfc33..e933504121 100644 --- a/db4-storage/src/wal/mod.rs +++ b/db4-storage/src/wal/mod.rs @@ -4,7 +4,6 @@ use raphtory_core::{ entities::{EID, GID, VID}, storage::timeindex::EventTime, }; -use std::path::Path; pub mod entry; pub mod no_wal; @@ -14,17 +13,6 @@ pub type TransactionID = u64; /// Core Wal methods. pub trait WalOps { - type Config; - - fn new(dir: Option<&Path>, config: Self::Config) -> Result - where - Self: Sized; - - /// Loads an existing WAL file from the given directory in append mode. - fn load(dir: Option<&Path>, config: Self::Config) -> Result - where - Self: Sized; - /// Appends data to the WAL and returns the assigned LSN. fn append(&self, data: &[u8]) -> Result; @@ -32,18 +20,18 @@ pub trait WalOps { /// Returns immediately if the given LSN is already flushed to disk. fn flush(&self, lsn: LSN) -> Result<(), StorageError>; - /// Rotates the underlying WAL file. - /// All records with LSN > `cutoff_lsn` are copied to the new WAL file. - fn rotate(&self, cutoff_lsn: LSN) -> Result<(), StorageError>; + /// Reads the WAL record at the given LSN. + /// Returns `Ok(None)` if there is no record at that LSN. + fn read(&self, lsn: LSN) -> Result, StorageError>; - /// Returns an iterator over the entries in the wal. - fn replay(&self) -> impl Iterator>; + /// Returns an iterator over the entries in the wal, starting from the given LSN. + fn replay(&self, start: LSN) -> impl Iterator>; - /// Returns true if there are entries in the WAL file on disk. - fn has_entries(&self) -> Result; + /// Returns the current position in the WAL stream. + fn position(&self) -> LSN; - /// Returns the LSN that will be assigned to the next appended record. - fn next_lsn(&self) -> LSN; + /// Sets the position in the WAL stream. + fn set_position(&self, lsn: LSN) -> Result<(), StorageError>; } #[derive(Debug)] @@ -52,16 +40,16 @@ pub struct ReplayRecord { data: Vec, - /// The raw bytes of the WAL entry stored on disk, including CRC data. - raw_bytes: Vec, + /// LSN immediately after this record in the WAL stream. + next_lsn: LSN, } impl ReplayRecord { - pub fn new(lsn: LSN, data: Vec, raw_bytes: Vec) -> Self { + pub fn new(lsn: LSN, data: Vec, next_lsn: LSN) -> Self { Self { lsn, data, - raw_bytes, + next_lsn, } } @@ -69,12 +57,13 @@ impl ReplayRecord { self.lsn } - pub fn data(&self) -> &[u8] { - &self.data + /// Returns the LSN immediately following this record in the WAL stream. + pub fn next_lsn(&self) -> LSN { + self.next_lsn } - pub fn raw_bytes(&self) -> &[u8] { - &self.raw_bytes + pub fn data(&self) -> &[u8] { + &self.data } } @@ -158,16 +147,28 @@ pub trait GraphWalOps { props: Vec<(&str, usize, Prop)>, ) -> Result; - /// Logs a checkpoint record, indicating that all Wal operations upto and including - /// `lsn` has been persisted to disk. - fn log_checkpoint(&self, lsn: LSN) -> Result; + /// Logs a checkpoint indicating that all LSN < `redo` are persisted. + /// On recovery, replay will start from `redo` in the WAL stream. + fn log_checkpoint(&self, redo: LSN) -> Result; + + /// Logs a shutdown checkpoint indicating a clean shutdown with all writes persisted. + fn log_shutdown_checkpoint(&self) -> Result; + + /// Reads and decodes the WAL entry at the given LSN and validates that it is a checkpoint. + /// Returns the checkpoint redo LSN, denoting where replay should start from. + fn read_checkpoint(&self, lsn: LSN) -> Result; - /// Returns an iterator over the entries in the wal. - fn replay_iter(&self) -> impl Iterator>; + /// Reads and decodes the WAL entry at the given LSN and validates that it is a shutdown checkpoint. + /// Returns the LSN immediately after this record, marking the end of the WAL stream. + fn read_shutdown_checkpoint(&self, lsn: LSN) -> Result; - /// Replays and applies all the entries in the wal to the given graph. - /// Subsequent appends to the WAL will start from the LSN of the last replayed entry. - fn replay_to_graph(&self, graph: &mut G) -> Result<(), StorageError>; + /// Replays and applies all the entries in the wal to the given graph, starting from the given LSN. + /// Returns the LSN immediately after the last entry in the WAL stream on success. + fn replay_to_graph( + &self, + graph: &mut G, + start: LSN, + ) -> Result; } /// Trait for defining callbacks for replaying from wal. diff --git a/db4-storage/src/wal/no_wal.rs b/db4-storage/src/wal/no_wal.rs index 8d5b865bde..7f563080ca 100644 --- a/db4-storage/src/wal/no_wal.rs +++ b/db4-storage/src/wal/no_wal.rs @@ -1,5 +1,3 @@ -use std::path::Path; - use crate::{ error::StorageError, wal::{LSN, ReplayRecord, WalOps}, @@ -11,16 +9,6 @@ use crate::{ pub struct NoWal; impl WalOps for NoWal { - type Config = (); - - fn new(_dir: Option<&Path>, _config: ()) -> Result { - Ok(Self) - } - - fn load(_dir: Option<&Path>, _config: ()) -> Result { - Ok(Self) - } - fn append(&self, _data: &[u8]) -> Result { Ok(0) } @@ -29,20 +17,24 @@ impl WalOps for NoWal { Ok(()) } - fn rotate(&self, _cutoff_lsn: LSN) -> Result<(), StorageError> { - Ok(()) - } - - fn replay(&self) -> impl Iterator> { + fn replay(&self, _start: LSN) -> impl Iterator> { let error = "Recovery is not supported for NoWAL"; std::iter::once(Err(StorageError::GenericFailure(error.to_string()))) } - fn has_entries(&self) -> Result { - Ok(false) + fn read(&self, _lsn: LSN) -> Result, StorageError> { + Err(StorageError::GenericFailure( + "read is not supported for NoWAL".to_string(), + )) + } + + fn position(&self) -> LSN { + 0 } - fn next_lsn(&self) -> LSN { - 1 + fn set_position(&self, _lsn: LSN) -> Result<(), StorageError> { + Err(StorageError::GenericFailure( + "set_position is not supported for NoWAL".to_string(), + )) } } diff --git a/raphtory-storage/src/mutation/durability_ops.rs b/raphtory-storage/src/durability_ops.rs similarity index 62% rename from raphtory-storage/src/mutation/durability_ops.rs rename to raphtory-storage/src/durability_ops.rs index 0a2205b5a8..663aa63164 100644 --- a/raphtory-storage/src/mutation/durability_ops.rs +++ b/raphtory-storage/src/durability_ops.rs @@ -1,11 +1,13 @@ use crate::{graph::graph::GraphStorage, mutation::MutationError}; -use storage::{transaction::TransactionManager, Wal}; +use storage::{transaction::TransactionManager, ControlFile, Wal}; -/// Accessor methods for transactions and write-ahead logging. +/// Accessor methods for supporting durability. pub trait DurabilityOps { fn transaction_manager(&self) -> Result<&TransactionManager, MutationError>; fn wal(&self) -> Result<&Wal, MutationError>; + + fn control_file(&self) -> Result<&ControlFile, MutationError>; } impl DurabilityOps for GraphStorage { @@ -16,4 +18,8 @@ impl DurabilityOps for GraphStorage { fn wal(&self) -> Result<&Wal, MutationError> { self.mutable()?.wal() } + + fn control_file(&self) -> Result<&ControlFile, MutationError> { + self.mutable()?.control_file() + } } diff --git a/raphtory-storage/src/lib.rs b/raphtory-storage/src/lib.rs index 8fba0f0625..5e9308d61d 100644 --- a/raphtory-storage/src/lib.rs +++ b/raphtory-storage/src/lib.rs @@ -1,4 +1,6 @@ pub mod core_ops; +pub mod durability_ops; pub mod graph; pub mod layer_ops; pub mod mutation; +pub mod recovery_ops; diff --git a/raphtory-storage/src/mutation/addition_ops_ext.rs b/raphtory-storage/src/mutation/addition_ops_ext.rs index ce0383f6a5..30c589b426 100644 --- a/raphtory-storage/src/mutation/addition_ops_ext.rs +++ b/raphtory-storage/src/mutation/addition_ops_ext.rs @@ -1,7 +1,10 @@ -use crate::mutation::{ - addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock, SessionAdditionOps}, +use crate::{ durability_ops::DurabilityOps, - MutationError, NodeWriterT, + mutation::{ + addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock, SessionAdditionOps}, + MutationError, NodeWriterT, + }, + recovery_ops::RecoveryOps, }; use db4_graph::{TemporalGraph, WriteLockedGraph}; use raphtory_api::core::{ @@ -36,7 +39,7 @@ use storage::{ resolver::{GIDResolverOps, Initialiser, MaybeInit}, transaction::TransactionManager, wal::LSN, - Extension, LocalPOS, Wal, ES, GS, NS, + ControlFile, Extension, LocalPOS, Wal, ES, GS, NS, }; pub struct AtomicAddEdge<'a, EXT> @@ -754,4 +757,10 @@ impl DurabilityOps for TemporalGraph { fn wal(&self) -> Result<&Wal, MutationError> { Ok(&self.extension().wal()) } + + fn control_file(&self) -> Result<&ControlFile, MutationError> { + Ok(&self.extension().control_file()) + } } + +impl RecoveryOps for TemporalGraph {} diff --git a/raphtory-storage/src/mutation/mod.rs b/raphtory-storage/src/mutation/mod.rs index 57bb85bcb3..7b8b5579fb 100644 --- a/raphtory-storage/src/mutation/mod.rs +++ b/raphtory-storage/src/mutation/mod.rs @@ -32,7 +32,6 @@ use thiserror::Error; pub mod addition_ops; pub mod addition_ops_ext; -pub mod durability_ops; pub mod property_addition_ops; pub type NodeWriterT<'a> = NodeWriter<'a, RwLockWriteGuard<'a, MemNodeSegment>, NS>; diff --git a/raphtory-storage/src/recovery_ops.rs b/raphtory-storage/src/recovery_ops.rs new file mode 100644 index 0000000000..1f94473a9a --- /dev/null +++ b/raphtory-storage/src/recovery_ops.rs @@ -0,0 +1,58 @@ +use storage::{ + persist::control_file::{ControlFileOps, DBState, LAST_CHECKPOINT_INIT}, + wal::{GraphWalOps, WalOps}, +}; + +use crate::{ + durability_ops::DurabilityOps, + mutation::{addition_ops::InternalAdditionOps, MutationError}, +}; + +pub trait RecoveryOps: DurabilityOps + InternalAdditionOps { + /// Recover from a crash if needed by replaying updates from the WAL. + fn run_recovery(&self) -> Result<(), MutationError> { + let wal = self.wal()?; + let control_file = self.control_file()?; + + match control_file.db_state() { + DBState::Shutdown => { + let checkpoint_lsn = control_file.last_checkpoint(); + let end_of_wal_lsn = wal.read_shutdown_checkpoint(checkpoint_lsn)?; + + // LSN after the shutdown checkpoint points to the end of WAL stream. + // Set this as the next LSN for future writes. + wal.set_position(end_of_wal_lsn)?; + } + DBState::Running | DBState::CrashRecovery => { + let checkpoint_lsn = control_file.last_checkpoint(); + + let redo_lsn = if checkpoint_lsn == LAST_CHECKPOINT_INIT { + // No successful checkpoint has been written yet, + // replay from the start of the WAL stream. + 0 + } else { + wal.read_checkpoint(checkpoint_lsn)? + }; + + // Set db state to indicate that recovery is in progress. + control_file.set_db_state(DBState::CrashRecovery); + control_file.save()?; + + let mut write_locked_graph = self.write_lock()?; + let end_of_wal_lsn = wal.replay_to_graph(&mut write_locked_graph, redo_lsn)?; + + // Set the next LSN for future writes to the end of the WAL stream. + wal.set_position(end_of_wal_lsn)?; + } + DBState::NotSupported => { + // Recovery is not supported, skip. + } + } + + // Always set db state to Running after recovery completes. + control_file.set_db_state(DBState::Running); + control_file.save()?; + + Ok(()) + } +} diff --git a/raphtory/src/db/api/mutation/addition_ops.rs b/raphtory/src/db/api/mutation/addition_ops.rs index 76d16b9360..5fc16b820f 100644 --- a/raphtory/src/db/api/mutation/addition_ops.rs +++ b/raphtory/src/db/api/mutation/addition_ops.rs @@ -16,10 +16,12 @@ use raphtory_api::core::{ }, utils::time::{IntoTimeWithFormat, TryIntoInputTime}, }; -use raphtory_storage::mutation::{ - addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock}, +use raphtory_storage::{ durability_ops::DurabilityOps, - MutationError, + mutation::{ + addition_ops::{EdgeWriteLock, InternalAdditionOps, NodeWriteLock}, + MutationError, + }, }; use storage::wal::{GraphWalOps, WalOps}; diff --git a/raphtory/src/db/api/mutation/deletion_ops.rs b/raphtory/src/db/api/mutation/deletion_ops.rs index 683991e4b3..2293e8c084 100644 --- a/raphtory/src/db/api/mutation/deletion_ops.rs +++ b/raphtory/src/db/api/mutation/deletion_ops.rs @@ -10,9 +10,9 @@ use crate::{ errors::{into_graph_err, GraphError}, }; use raphtory_api::core::{entities::edges::edge_ref::EdgeRef, utils::time::IntoTimeWithFormat}; -use raphtory_storage::mutation::{ - addition_ops::{EdgeWriteLock, InternalAdditionOps}, +use raphtory_storage::{ durability_ops::DurabilityOps, + mutation::addition_ops::{EdgeWriteLock, InternalAdditionOps}, }; use storage::wal::{GraphWalOps, WalOps}; diff --git a/raphtory/src/db/api/mutation/property_addition_ops.rs b/raphtory/src/db/api/mutation/property_addition_ops.rs index 7f475fdb68..e0c143ad0b 100644 --- a/raphtory/src/db/api/mutation/property_addition_ops.rs +++ b/raphtory/src/db/api/mutation/property_addition_ops.rs @@ -5,9 +5,9 @@ use crate::{ use raphtory_api::core::entities::properties::prop::Prop; use raphtory_storage::{ core_ops::CoreGraphOps, + durability_ops::DurabilityOps, mutation::{ - addition_ops::InternalAdditionOps, durability_ops::DurabilityOps, - property_addition_ops::InternalPropertyAdditionOps, + addition_ops::InternalAdditionOps, property_addition_ops::InternalPropertyAdditionOps, }, }; use storage::wal::{GraphWalOps, WalOps}; diff --git a/raphtory/src/db/api/storage/storage.rs b/raphtory/src/db/api/storage/storage.rs index 09d6a79084..6aba8cfb92 100644 --- a/raphtory/src/db/api/storage/storage.rs +++ b/raphtory/src/db/api/storage/storage.rs @@ -24,17 +24,17 @@ use raphtory_storage::{ mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps, SessionAdditionOps}, addition_ops_ext::{AtomicAddEdge, AtomicAddNode, UnlockedSession}, - durability_ops::DurabilityOps, property_addition_ops::InternalPropertyAdditionOps, EdgeWriterT, GraphPropWriterT, NodeWriterT, }, + recovery_ops::RecoveryOps, }; use std::{ fmt::{Display, Formatter}, path::Path, sync::Arc, }; -use storage::wal::{GraphWalOps, WalOps, LSN}; +use storage::wal::LSN; #[cfg(feature = "search")] use { @@ -139,13 +139,9 @@ impl Storage { fn load_with_extension(path: &Path, ext: Extension) -> Result { let temporal_graph = TemporalGraph::load(path, ext)?; - let wal = temporal_graph.wal()?; - // Replay any pending writes from the WAL. - if wal.has_entries()? { - let mut write_locked_graph = temporal_graph.write_lock()?; - wal.replay_to_graph(&mut write_locked_graph)?; - } + // Run crash recovery if needed. + temporal_graph.run_recovery()?; Ok(Self { graph: GraphStorage::Unlocked(Arc::new(temporal_graph)), @@ -157,12 +153,14 @@ impl Storage { pub fn load(path: impl AsRef) -> Result { let path = path.as_ref(); let ext = Extension::load(path)?; + Self::load_with_extension(path, ext) } pub fn load_with_config(path: impl AsRef, config: Config) -> Result { let path = path.as_ref(); let ext = Extension::load_with_config(path, config)?; + Self::load_with_extension(path, ext) } diff --git a/raphtory/src/db/graph/edge.rs b/raphtory/src/db/graph/edge.rs index 7cdb8498f3..e77ceae9bf 100644 --- a/raphtory/src/db/graph/edge.rs +++ b/raphtory/src/db/graph/edge.rs @@ -41,10 +41,10 @@ use raphtory_core::entities::{ nodes::node_ref::{AsNodeRef, NodeRef}, }; use raphtory_storage::{ + durability_ops::DurabilityOps, graph::edges::edge_storage_ops::EdgeStorageOps, mutation::{ addition_ops::{EdgeWriteLock, InternalAdditionOps}, - durability_ops::DurabilityOps, property_addition_ops::InternalPropertyAdditionOps, }, }; diff --git a/raphtory/src/db/graph/node.rs b/raphtory/src/db/graph/node.rs index 0636f3e72b..087d4ee53a 100644 --- a/raphtory/src/db/graph/node.rs +++ b/raphtory/src/db/graph/node.rs @@ -38,10 +38,10 @@ use raphtory_api::core::{ }; use raphtory_storage::{ core_ops::CoreGraphOps, + durability_ops::DurabilityOps, graph::graph::GraphStorage, mutation::{ addition_ops::{InternalAdditionOps, NodeWriteLock}, - durability_ops::DurabilityOps, MutationError, }, }; diff --git a/raphtory/src/serialise/graph_folder.rs b/raphtory/src/serialise/graph_folder.rs index 27b905606b..45a5d5cbfd 100644 --- a/raphtory/src/serialise/graph_folder.rs +++ b/raphtory/src/serialise/graph_folder.rs @@ -268,12 +268,19 @@ pub trait GraphPaths { } else { fs::create_dir_all(self.root())? } - let meta_path = self.relative_data_path()?; - fs::create_dir(self.root().join(&meta_path))?; + + // Create the data folder and have the root metadata file point to it. + let data_path = self.relative_data_path()?; + fs::create_dir(self.root().join(&data_path))?; fs::write( self.root_meta_path(), - serde_json::to_string(&RelativePath { path: meta_path })?, + serde_json::to_string(&RelativePath { path: data_path })?, )?; + + // Create the graph folder inside the data folder. + let graph_path = self.graph_path()?; + fs::create_dir(&graph_path)?; + Ok(()) } }