From d38bf46856d0461dbdffe18ea6dcdb733927c6d2 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Wed, 3 Jun 2026 16:31:41 -0400 Subject: [PATCH 01/24] First try at sqlite-less --- simple/stats/db.py | 99 +++++++++++++++++++++++++++++++ simple/stats/runner.py | 54 +++-------------- simple/tests/stats/runner_test.py | 51 ++++++++++++++++ 3 files changed, 159 insertions(+), 45 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index e1aa30393..f1ede5472 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -40,6 +40,7 @@ from stats.data import STAT_VAR_GROUP from stats.data import STATISTICAL_VARIABLE from stats.data import Triple +from stats.data import strip_namespace from util.filesystem import create_store from util.filesystem import Dir from util.filesystem import File @@ -335,6 +336,104 @@ def _add_triple(self, triple: Triple): node.add_triple(triple) +class JsonLdStreamDb(Db): + """A DB implementation that streams triples and observations directly to JSON-LD shards on GCS/Disk, bypassing SQLite.""" + + def __init__(self, output_dir, import_names, nodes) -> None: + from datetime import timezone + from stats.jsonld_exporter import DCID_URL + self.output_dir = output_dir + self.import_names = import_names + self.nodes = nodes + + # Generate unique folder name based on import name and timestamp + import_name = None + if isinstance(import_names, list): + if import_names == [constants.ALL_IMPORTS]: + import_name = constants.ALL_IMPORTS + else: + import_name = "_".join(import_names) + + self.import_name = import_name or nodes.config.data.get( + "importName") or "default_import_name" + if self.import_name and "/" in self.import_name: + self.import_name = self.import_name.replace("/", "_") + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f") + unique_dir_name = f"{self.import_name}_{timestamp}" + self.jsonld_dir = output_dir.open_dir("jsonld").open_dir(unique_dir_name) + + self.obs_shard_index = 0 + self.node_shard_index = 0 + self.ns_map = {"dcid": DCID_URL} + + def insert_observations(self, observations_df: pd.DataFrame, input_file: File): + from stats.jsonld_exporter import DCID_URL, _add_observation_to_graph, write_shard + + logging.info("Streaming %s observations to JSON-LD shards in %s", + len(observations_df), self.jsonld_dir.full_path()) + + prov_urls = {} + for prov in self.nodes.provenances.values(): + prov_id = strip_namespace(prov.id) + prov_urls[prov_id] = prov.url + prov_urls[prov.id] = prov.url + + records = observations_df.to_records(index=False).tolist() + + chunk_size = 10000 + DCID = Namespace(DCID_URL) + + for i in range(0, len(records), chunk_size): + chunk = records[i:i + chunk_size] + g = Graph() + g.bind("dcid", DCID) + + for row in chunk: + _add_observation_to_graph(g, row, DCID, prov_urls) + + write_shard(g, self.obs_shard_index, self.jsonld_dir, self.ns_map, + prefix="observation") + self.obs_shard_index += 1 + + def insert_triples(self, triples: list[Triple]): + from stats.jsonld_exporter import DCID_URL, expand_id, write_shard + + logging.info("Streaming %s triples to JSON-LD shards in %s", + len(triples), self.jsonld_dir.full_path()) + + chunk_size = 10000 + DCID = Namespace(DCID_URL) + + for i in range(0, len(triples), chunk_size): + chunk = triples[i:i + chunk_size] + g = Graph() + g.bind("dcid", DCID) + + for row in chunk: + sub = expand_id(row.subject_id) + p = expand_id(row.predicate) + if row.object_id: + o = expand_id(row.object_id) + else: + o = Literal(row.object_value) + + if row.predicate == 'typeOf': + g.add((sub, RDF.type, o)) + else: + g.add((sub, p, o)) + + write_shard(g, self.node_shard_index, self.jsonld_dir, self.ns_map, + prefix="node") + self.node_shard_index += 1 + + def commit(self): + pass + + def commit_and_close(self): + pass + + class SqlDb(Db): """Class to insert triples and observations into a SQL DB.""" diff --git a/simple/stats/runner.py b/simple/stats/runner.py index 37f891cf5..63e910648 100644 --- a/simple/stats/runner.py +++ b/simple/stats/runner.py @@ -41,6 +41,7 @@ from stats.db import get_sqlite_path_from_env from stats.db import ImportStatus from stats.db import TYPE_CLOUD_SQL +from stats.db import JsonLdStreamDb from stats.db_cache import get_db_cache_from_env from stats.db_transfer import transfer_sqlite_to_cloud_sql from stats.entities_importer import EntitiesImporter @@ -159,7 +160,9 @@ def __init__( _check_not_overlapping(input_store, output_store) self.all_stores.append(output_store) self.output_dir = output_store.as_dir() - self.nl_dir = self.output_dir.open_dir(constants.NL_DIR_NAME) + self.nl_dir = None + if self.mode != RunMode.DCP_BRIDGE: + self.nl_dir = self.output_dir.open_dir(constants.NL_DIR_NAME) self.process_dir = self.output_dir.open_dir(constants.PROCESS_DIR_NAME) # Reporter. @@ -712,58 +715,19 @@ def _create_importer(self, input_file: File) -> Importer: f"Unsupported import type: {import_type} ({input_file.full_path()})") def _run_imports_and_export_jsonld(self): - # Force local SQLite DB for staging data in dcpbridge mode - logging.info("Forcing local SQLite DB for staging data in dcpbridge mode") - sqlite_file = self.output_dir.open_file("staging.db") - db_cfg = create_sqlite_config(sqlite_file) - self.db = create_and_update_db(db_cfg) - - # Clear tables if needed - self.db.maybe_clear_before_import() + logging.info("Initializing JsonLdStreamDb to stream JSON-LD directly to GCS/Disk") + self.db = JsonLdStreamDb(self.output_dir, self.import_names, self.nodes) # Run data imports (CSV and MCF) self._run_all_data_imports() - # Generate triples from nodes + # Generate triples from nodes and write directly triples = self.nodes.triples() self.db.insert_triples(triples) - # Generate SVG hierarchy - self._generate_svg_hierarchy() - - # Generate NL artifacts - self._generate_nl_artifacts() - - # Write import info - self.db.insert_import_info(status=ImportStatus.SUCCESS) - - # Export to JSON-LD - jsonld_dir = self.output_dir.open_dir("jsonld") - - # Create a unique subfolder based on import name and timestamp for parallel runs - import_name = None - import_names = self.import_names - if isinstance(import_names, list): - if import_names == [constants.ALL_IMPORTS]: - import_name = constants.ALL_IMPORTS - else: - import_name = "_".join(import_names) - - # TODO(gmechali): Remove the fallbacks. - import_name = import_name or self.config.data.get( - "importName") or "default_import_name" - if import_name and "/" in import_name: - import_name = import_name.replace("/", "_") - - timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f") - unique_dir_name = f"{import_name}_{timestamp}" - unique_jsonld_dir = jsonld_dir.open_dir(unique_dir_name) - - self.db.commit() - export_to_jsonld(self.db, unique_jsonld_dir) - # Auto-trigger workflow if output is on GCS - output_path = unique_jsonld_dir.full_path() + output_path = self.db.jsonld_dir.full_path() + import_name = self.db.import_name if os.getenv("INGESTION_WORKFLOW_NAME") and output_path.startswith("gs://"): gcs_pattern = f"{output_path.rstrip('/')}/*.jsonld" trigger_ingestion_workflow(gcs_pattern, import_name) diff --git a/simple/tests/stats/runner_test.py b/simple/tests/stats/runner_test.py index 7a8638702..1aca65a3f 100644 --- a/simple/tests/stats/runner_test.py +++ b/simple/tests/stats/runner_test.py @@ -235,6 +235,57 @@ def test_with_redis_db_cache_schema_update(self): # Redis cache should NOT be cleared in schema update mode. self.assertEqual(1, len(fake_redis.keys("*"))) + def test_dcp_bridge(self): + self.maxDiff = None + with tempfile.TemporaryDirectory() as temp_dir: + input_dir = os.path.join(_INPUT_DIR, "input_dir_driven") + dc_client.get_property_of_entities = mock.MagicMock(return_value={}) + + Runner( + config_file_path=None, + input_dir_path=input_dir, + output_dir_path=temp_dir, + mode=RunMode.DCP_BRIDGE, + ).run() + + # Verify that NO SQLite database file is created + db_path = os.path.join(temp_dir, "datacommons.db") + self.assertFalse(os.path.exists(db_path)) + + # Verify that NO nl directory is created (since GCS/local embeddings are stripped/disabled) + nl_dir = os.path.join(temp_dir, "nl") + self.assertFalse(os.path.exists(nl_dir)) + + # Verify that a jsonld directory is created + jsonld_dir = os.path.join(temp_dir, "jsonld") + self.assertTrue(os.path.exists(jsonld_dir)) + + # Find the subdirectory inside jsonld/ + subdirs = os.listdir(jsonld_dir) + # There should be exactly 1 folder in jsonld/ + self.assertEqual(len(subdirs), 1) + timestamped_dir = os.path.join(jsonld_dir, subdirs[0]) + self.assertTrue(os.path.isdir(timestamped_dir)) + + # Ensure the timestamped directory has files + shard_files = os.listdir(timestamped_dir) + self.assertGreater(len(shard_files), 0) + + # Check that we have both node and observation shard files + node_shards = [f for f in shard_files if f.startswith("node-")] + obs_shards = [f for f in shard_files if f.startswith("observation-")] + + self.assertGreater(len(node_shards), 0) + self.assertGreater(len(obs_shards), 0) + + # Verify that files are valid JSON-LD + for filename in shard_files: + filepath = os.path.join(timestamped_dir, filename) + self.assertTrue(filename.endswith(".jsonld")) + with open(filepath, "r") as f: + data = json.load(f) + self.assertTrue(isinstance(data, (dict, list))) + def test_read_configs_from_subdirs(self): with tempfile.TemporaryDirectory() as temp_dir: # Create subdirectories From 87a2509b7a48d33133fc8772a70a5c6831288673 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Wed, 3 Jun 2026 17:20:07 -0400 Subject: [PATCH 02/24] Multi threaded --- simple/stats/db.py | 94 ++++++++++++++++++++++++++++++++-------------- 1 file changed, 66 insertions(+), 28 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index f1ede5472..df16219ba 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -336,6 +336,52 @@ def _add_triple(self, triple: Triple): node.add_triple(triple) +def _write_observation_shard(args): + chunk, shard_index, jsonld_dir_path, ns_map, prov_urls = args + from rdflib import Graph, Namespace + from stats.jsonld_exporter import DCID_URL, _add_observation_to_graph, write_shard + from util.filesystem import create_store + + DCID = Namespace(DCID_URL) + g = Graph() + g.bind("dcid", DCID) + + for row in chunk: + _add_observation_to_graph(g, row, DCID, prov_urls) + + with create_store(jsonld_dir_path) as store: + output_dir = store.as_dir() + write_shard(g, shard_index, output_dir, ns_map, prefix="observation") + + +def _write_node_shard(args): + chunk, shard_index, jsonld_dir_path, ns_map = args + from rdflib import Graph, Namespace, RDF, Literal + from stats.jsonld_exporter import DCID_URL, expand_id, write_shard + from util.filesystem import create_store + + DCID = Namespace(DCID_URL) + g = Graph() + g.bind("dcid", DCID) + + for row in chunk: + sub = expand_id(row.subject_id) + p = expand_id(row.predicate) + if row.object_id: + o = expand_id(row.object_id) + else: + o = Literal(row.object_value) + + if row.predicate == 'typeOf': + g.add((sub, RDF.type, o)) + else: + g.add((sub, p, o)) + + with create_store(jsonld_dir_path) as store: + output_dir = store.as_dir() + write_shard(g, shard_index, output_dir, ns_map, prefix="node") + + class JsonLdStreamDb(Db): """A DB implementation that streams triples and observations directly to JSON-LD shards on GCS/Disk, bypassing SQLite.""" @@ -368,7 +414,7 @@ def __init__(self, output_dir, import_names, nodes) -> None: self.ns_map = {"dcid": DCID_URL} def insert_observations(self, observations_df: pd.DataFrame, input_file: File): - from stats.jsonld_exporter import DCID_URL, _add_observation_to_graph, write_shard + import multiprocessing logging.info("Streaming %s observations to JSON-LD shards in %s", len(observations_df), self.jsonld_dir.full_path()) @@ -382,50 +428,42 @@ def insert_observations(self, observations_df: pd.DataFrame, input_file: File): records = observations_df.to_records(index=False).tolist() chunk_size = 10000 - DCID = Namespace(DCID_URL) + jsonld_dir_path = self.jsonld_dir.full_path() + args_list = [] for i in range(0, len(records), chunk_size): chunk = records[i:i + chunk_size] - g = Graph() - g.bind("dcid", DCID) + args_list.append((chunk, self.obs_shard_index, jsonld_dir_path, self.ns_map, prov_urls)) + self.obs_shard_index += 1 - for row in chunk: - _add_observation_to_graph(g, row, DCID, prov_urls) + num_processes = min(multiprocessing.cpu_count(), 8) + logging.info("Starting observations export with %d processes for %d chunks", + num_processes, len(args_list)) - write_shard(g, self.obs_shard_index, self.jsonld_dir, self.ns_map, - prefix="observation") - self.obs_shard_index += 1 + with multiprocessing.Pool(processes=num_processes) as pool: + pool.map(_write_observation_shard, args_list) def insert_triples(self, triples: list[Triple]): - from stats.jsonld_exporter import DCID_URL, expand_id, write_shard + import multiprocessing logging.info("Streaming %s triples to JSON-LD shards in %s", len(triples), self.jsonld_dir.full_path()) chunk_size = 10000 - DCID = Namespace(DCID_URL) + jsonld_dir_path = self.jsonld_dir.full_path() + args_list = [] for i in range(0, len(triples), chunk_size): chunk = triples[i:i + chunk_size] - g = Graph() - g.bind("dcid", DCID) - - for row in chunk: - sub = expand_id(row.subject_id) - p = expand_id(row.predicate) - if row.object_id: - o = expand_id(row.object_id) - else: - o = Literal(row.object_value) + args_list.append((chunk, self.node_shard_index, jsonld_dir_path, self.ns_map)) + self.node_shard_index += 1 - if row.predicate == 'typeOf': - g.add((sub, RDF.type, o)) - else: - g.add((sub, p, o)) + num_processes = min(multiprocessing.cpu_count(), 8) + logging.info("Starting nodes export with %d processes for %d chunks", + num_processes, len(args_list)) - write_shard(g, self.node_shard_index, self.jsonld_dir, self.ns_map, - prefix="node") - self.node_shard_index += 1 + with multiprocessing.Pool(processes=num_processes) as pool: + pool.map(_write_node_shard, args_list) def commit(self): pass From e7436d832138399059a52228935dfa85b7ceeb7e Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 10:31:30 -0400 Subject: [PATCH 03/24] More speedu p --- simple/stats/db.py | 102 +++++++++++++++++++++++---------------------- 1 file changed, 53 insertions(+), 49 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index df16219ba..38c62e28d 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -412,64 +412,68 @@ def __init__(self, output_dir, import_names, nodes) -> None: self.obs_shard_index = 0 self.node_shard_index = 0 self.ns_map = {"dcid": DCID_URL} + self._obs_dfs = [] + self._triples = [] def insert_observations(self, observations_df: pd.DataFrame, input_file: File): - import multiprocessing - - logging.info("Streaming %s observations to JSON-LD shards in %s", - len(observations_df), self.jsonld_dir.full_path()) - - prov_urls = {} - for prov in self.nodes.provenances.values(): - prov_id = strip_namespace(prov.id) - prov_urls[prov_id] = prov.url - prov_urls[prov.id] = prov.url - - records = observations_df.to_records(index=False).tolist() - - chunk_size = 10000 - jsonld_dir_path = self.jsonld_dir.full_path() - - args_list = [] - for i in range(0, len(records), chunk_size): - chunk = records[i:i + chunk_size] - args_list.append((chunk, self.obs_shard_index, jsonld_dir_path, self.ns_map, prov_urls)) - self.obs_shard_index += 1 - - num_processes = min(multiprocessing.cpu_count(), 8) - logging.info("Starting observations export with %d processes for %d chunks", - num_processes, len(args_list)) - - with multiprocessing.Pool(processes=num_processes) as pool: - pool.map(_write_observation_shard, args_list) + if not observations_df.empty: + self._obs_dfs.append(observations_df) def insert_triples(self, triples: list[Triple]): - import multiprocessing - - logging.info("Streaming %s triples to JSON-LD shards in %s", - len(triples), self.jsonld_dir.full_path()) - - chunk_size = 10000 - jsonld_dir_path = self.jsonld_dir.full_path() - - args_list = [] - for i in range(0, len(triples), chunk_size): - chunk = triples[i:i + chunk_size] - args_list.append((chunk, self.node_shard_index, jsonld_dir_path, self.ns_map)) - self.node_shard_index += 1 - - num_processes = min(multiprocessing.cpu_count(), 8) - logging.info("Starting nodes export with %d processes for %d chunks", - num_processes, len(args_list)) - - with multiprocessing.Pool(processes=num_processes) as pool: - pool.map(_write_node_shard, args_list) + if triples: + self._triples.extend(triples) def commit(self): pass def commit_and_close(self): - pass + import multiprocessing + num_processes = min(multiprocessing.cpu_count(), 8) + + # Export observations in parallel + if self._obs_dfs: + logging.info("Combining and exporting buffered observations to JSON-LD shards in %s", + self.jsonld_dir.full_path()) + combined_obs_df = pd.concat(self._obs_dfs, ignore_index=True) + records = combined_obs_df.to_records(index=False).tolist() + + prov_urls = {} + for prov in self.nodes.provenances.values(): + prov_id = strip_namespace(prov.id) + prov_urls[prov_id] = prov.url + prov_urls[prov.id] = prov.url + + chunk_size = 10000 + jsonld_dir_path = self.jsonld_dir.full_path() + + obs_args = [] + for i in range(0, len(records), chunk_size): + chunk = records[i:i + chunk_size] + obs_args.append((chunk, self.obs_shard_index, jsonld_dir_path, self.ns_map, prov_urls)) + self.obs_shard_index += 1 + + logging.info("Starting observations export with %d processes for %d chunks", + num_processes, len(obs_args)) + with multiprocessing.Pool(processes=num_processes) as pool: + pool.map(_write_observation_shard, obs_args) + + # Export triples in parallel + if self._triples: + logging.info("Exporting buffered %d triples to JSON-LD shards in %s", + len(self._triples), self.jsonld_dir.full_path()) + chunk_size = 10000 + jsonld_dir_path = self.jsonld_dir.full_path() + + node_args = [] + for i in range(0, len(self._triples), chunk_size): + chunk = self._triples[i:i + chunk_size] + node_args.append((chunk, self.node_shard_index, jsonld_dir_path, self.ns_map)) + self.node_shard_index += 1 + + logging.info("Starting nodes export with %d processes for %d chunks", + num_processes, len(node_args)) + with multiprocessing.Pool(processes=num_processes) as pool: + pool.map(_write_node_shard, node_args) class SqlDb(Db): From db0877d74acbbab6107f6731713c16fdcf74e3e0 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 10:47:39 -0400 Subject: [PATCH 04/24] Logging buffering --- simple/stats/logger.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/simple/stats/logger.py b/simple/stats/logger.py index c15c1a8a5..f85a254aa 100644 --- a/simple/stats/logger.py +++ b/simple/stats/logger.py @@ -37,6 +37,12 @@ def initialize_logger(): logging.root.removeHandler(handler) # Initialize logging + try: + sys.stdout.reconfigure(line_buffering=True) + sys.stderr.reconfigure(line_buffering=True) + except AttributeError: + # In some test environments sys.stdout may not support reconfigure + pass logger = logging.getLogger() logger.setLevel(log_level) handler = logging.StreamHandler(sys.stdout) From bd4865e156499398f8cc4b367965c326df900d78 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 10:56:04 -0400 Subject: [PATCH 05/24] Threadit --- simple/stats/db.py | 8 ++++++-- simple/stats/nodes.py | 22 ++++++++++++++++++++++ simple/stats/reporter.py | 12 ++++++++---- simple/stats/runner.py | 24 ++++++++++++++++++++---- 4 files changed, 56 insertions(+), 10 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index 38c62e28d..58869ed56 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -412,16 +412,20 @@ def __init__(self, output_dir, import_names, nodes) -> None: self.obs_shard_index = 0 self.node_shard_index = 0 self.ns_map = {"dcid": DCID_URL} + import threading + self.lock = threading.Lock() self._obs_dfs = [] self._triples = [] def insert_observations(self, observations_df: pd.DataFrame, input_file: File): if not observations_df.empty: - self._obs_dfs.append(observations_df) + with self.lock: + self._obs_dfs.append(observations_df) def insert_triples(self, triples: list[Triple]): if triples: - self._triples.extend(triples) + with self.lock: + self._triples.extend(triples) def commit(self): pass diff --git a/simple/stats/nodes.py b/simple/stats/nodes.py index df31d1259..fd636bd8b 100644 --- a/simple/stats/nodes.py +++ b/simple/stats/nodes.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from functools import wraps import re import pandas as pd @@ -52,9 +53,20 @@ url="custom-import") +def thread_safe(func): + """Decorator to make a method thread-safe using the object's reentrant lock.""" + @wraps(func) + def wrapper(self, *args, **kwargs): + with self.lock: + return func(self, *args, **kwargs) + return wrapper + + class Nodes: def __init__(self, config: Config) -> None: + import threading + self.lock = threading.RLock() self.config = config # Custom namespace self._custom_id_namespace = self.config.custom_id_namespace() @@ -122,10 +134,12 @@ def _source_id(self, source_cfg: Source | None) -> str: return source.id + @thread_safe def provenance(self, input_file: File) -> Provenance: prov_name = self.config.provenance_name(input_file) return self.provenances.get(prov_name, _DEFAULT_PROVENANCE) + @thread_safe def variable(self, sv_column_name: str, input_file: File) -> StatVar: if not sv_column_name in self.variables: var_cfg = self.config.variable(sv_column_name) @@ -142,6 +156,7 @@ def variable(self, sv_column_name: str, input_file: File) -> StatVar: return self._add_provenance(self.variables[sv_column_name], self.provenance(input_file)) + @thread_safe def property(self, property_column_name: str) -> Property: if not property_column_name in self.properties: self.properties[property_column_name] = Property( @@ -149,6 +164,7 @@ def property(self, property_column_name: str) -> Property: return self.properties[property_column_name] + @thread_safe def event_type(self, event_type_name: str, input_file: File) -> EventType: if not event_type_name in self.event_types: event_type_cfg = self.config.event(event_type_name) @@ -160,6 +176,7 @@ def event_type(self, event_type_name: str, input_file: File) -> EventType: return self.event_types[event_type_name].add_provenance( self.provenance(input_file)) + @thread_safe def entity_type(self, entity_type_name: str, input_file: File) -> EntityType: if not entity_type_name in self.entity_types: entity_type_cfg = self.config.entity(entity_type_name) @@ -227,6 +244,7 @@ def _entity_type_id(self, entity_type_name: str) -> str: self._entity_type_generated_id_count += 1 return f"{_CUSTOM_ENTITY_TYPE_ID_PREFIX}{self._entity_type_generated_id_count}" + @thread_safe def group(self, group_path: str) -> StatVarGroup | None: if not group_path: return self._default_custom_group() @@ -257,14 +275,17 @@ def _default_custom_group(self) -> StatVarGroup: self.groups[_DEFAULT_CUSTOM_GROUP_PATH] = svg return self.groups[_DEFAULT_CUSTOM_GROUP_PATH] + @thread_safe def entity_with_type(self, entity_dcid: str, entity_type: str): if entity_dcid not in self.entities: self.entities[entity_dcid] = Entity(entity_dcid, entity_type) + @thread_safe def entities_with_type(self, entity_dcids: list[str], entity_type: str): for entity_dcid in entity_dcids: self.entity_with_type(entity_dcid, entity_type) + @thread_safe def entities_with_types(self, dcid2type: dict[str, str]): """ Adds each dcid2type mapping to the list of entities with their types. @@ -273,6 +294,7 @@ def entities_with_types(self, dcid2type: dict[str, str]): for entity_dcid, entity_type in dcid2type.items(): self.entity_with_type(entity_dcid, entity_type) + @thread_safe def triples(self, triples_file: File | None = None) -> list[Triple]: triples: list[Triple] = [] for source in self.sources.values(): diff --git a/simple/stats/reporter.py b/simple/stats/reporter.py index 51232dd38..363f20f6b 100644 --- a/simple/stats/reporter.py +++ b/simple/stats/reporter.py @@ -44,6 +44,8 @@ class ImportReporter: """ def __init__(self, report_file: File) -> None: + import threading + self.lock = threading.RLock() self.status = Status.NOT_STARTED self.start_time = None self.last_update = datetime.now() @@ -58,8 +60,9 @@ def _report(func): @wraps(func) def wrapper(self, *args, **kwargs): - result = func(self, *args, **kwargs) - ImportReporter.save(self) + with self.lock: + result = func(self, *args, **kwargs) + ImportReporter.save(self) return result return wrapper @@ -87,8 +90,9 @@ def get_file_reporter(self, import_file: File): return self.file_reporters_by_full_path[import_file.full_path()] def recompute_progress(self): - self._compute_all_done() - self.save() + with self.lock: + self._compute_all_done() + self.save() def _compute_all_done(self): if self._all_file_imports(Status.SUCCESS): diff --git a/simple/stats/runner.py b/simple/stats/runner.py index 63e910648..ed8afce68 100644 --- a/simple/stats/runner.py +++ b/simple/stats/runner.py @@ -643,10 +643,26 @@ def _run_all_data_imports(self): self.reporter.report_started(import_files=list(input_csv_files + input_mcf_files)) - for input_csv_file in input_csv_files: - self._run_single_import(input_csv_file) - for input_mcf_file in input_mcf_files: - self._run_single_mcf_import(input_mcf_file) + if self.mode == RunMode.DCP_BRIDGE: + import concurrent.futures + num_threads = min(32, (len(input_csv_files) + len(input_mcf_files)) or 1) + logging.info("Starting parallel ingestion of data files with %d threads", num_threads) + + with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + futures = [] + for input_csv_file in input_csv_files: + futures.append(executor.submit(self._run_single_import, input_csv_file)) + for input_mcf_file in input_mcf_files: + futures.append(executor.submit(self._run_single_mcf_import, input_mcf_file)) + + # Wait for all files to be processed and propagate any exception + for future in concurrent.futures.as_completed(futures): + future.result() + else: + for input_csv_file in input_csv_files: + self._run_single_import(input_csv_file) + for input_mcf_file in input_mcf_files: + self._run_single_mcf_import(input_mcf_file) def _run_single_import(self, input_file: File): logging.info("Importing file: %s", input_file) From 4a7bfad3a76fe8ab6f53ae37d5cc6d976065e9f1 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 11:26:04 -0400 Subject: [PATCH 06/24] more parallelization --- simple/stats/db.py | 121 ++++++++++++++++++++++++++++++++++++--------- 1 file changed, 99 insertions(+), 22 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index 58869ed56..95a8bc612 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -338,20 +338,96 @@ def _add_triple(self, triple: Triple): def _write_observation_shard(args): chunk, shard_index, jsonld_dir_path, ns_map, prov_urls = args - from rdflib import Graph, Namespace - from stats.jsonld_exporter import DCID_URL, _add_observation_to_graph, write_shard from util.filesystem import create_store + import hashlib + import json + import logging - DCID = Namespace(DCID_URL) - g = Graph() - g.bind("dcid", DCID) - + def _uri_ref(val): + if not val: + return None + if val.startswith("http://") or val.startswith("https://"): + return {"@id": val} + if val.startswith("dcid:"): + return {"@id": val} + return {"@id": f"dcid:{val.lstrip('/')}"} + + graph_list = [] + for row in chunk: - _add_observation_to_graph(g, row, DCID, prov_urls) + entity, variable, date, value, provenance, unit, scaling_factor, mmethod, period, props = row + + key = f"{entity}_{variable}_{date}_{provenance}_{unit}_{mmethod}_{period}" + obs_hash = hashlib.sha256(key.encode('utf-8')).hexdigest() + + obs_obj = { + "@id": f"dcid:obs_{obs_hash}", + "@type": "dcid:StatVarObservation", + "dcid:observationAbout": _uri_ref(entity), + "dcid:variableMeasured": _uri_ref(variable) + } + + # Format date + try: + if str(date).isdigit(): + obs_obj["dcid:observationDate"] = int(date) + else: + try: + obs_obj["dcid:observationDate"] = float(date) + except ValueError: + obs_obj["dcid:observationDate"] = str(date) + except Exception: + obs_obj["dcid:observationDate"] = str(date) + + # Format value + try: + if '.' in str(value): + obs_obj["dcid:value"] = float(value) + else: + obs_obj["dcid:value"] = int(value) + except ValueError: + obs_obj["dcid:value"] = value + + if provenance: + obs_obj["dcid:provenance"] = _uri_ref(provenance) + if provenance in prov_urls and prov_urls[provenance]: + obs_obj["dcid:provenanceUrl"] = prov_urls[provenance] + if unit: + obs_obj["dcid:unit"] = _uri_ref(unit) + if scaling_factor: + try: + if '.' in str(scaling_factor): + obs_obj["dcid:scalingFactor"] = float(scaling_factor) + else: + obs_obj["dcid:scalingFactor"] = int(scaling_factor) + except ValueError: + obs_obj["dcid:scalingFactor"] = scaling_factor + if mmethod: + obs_obj["dcid:measurementMethod"] = _uri_ref(mmethod) + if period: + obs_obj["dcid:observationPeriod"] = period + + if props: + try: + props_dict = json.loads(props) + for k, v in props_dict.items(): + prop_key = f"dcid:{k}" if not k.startswith("dcid:") and not k.startswith("http") else k + obs_obj[prop_key] = v + except Exception: + pass + + graph_list.append(obs_obj) + + compacted_jsonld = { + "@context": ns_map, + "@graph": graph_list + } + shard_name = f"observation-{shard_index:05d}.jsonld" with create_store(jsonld_dir_path) as store: output_dir = store.as_dir() - write_shard(g, shard_index, output_dir, ns_map, prefix="observation") + output_dir.open_file(shard_name).write(json.dumps(compacted_jsonld, indent=4)) + logging.info(f"Saved JSON-LD shard to {shard_name}") def _write_node_shard(args): @@ -434,9 +510,12 @@ def commit_and_close(self): import multiprocessing num_processes = min(multiprocessing.cpu_count(), 8) - # Export observations in parallel + obs_args = [] + node_args = [] + + # Prepare observations chunks if self._obs_dfs: - logging.info("Combining and exporting buffered observations to JSON-LD shards in %s", + logging.info("Combining and preparing buffered observations to JSON-LD shards in %s", self.jsonld_dir.full_path()) combined_obs_df = pd.concat(self._obs_dfs, ignore_index=True) records = combined_obs_df.to_records(index=False).tolist() @@ -450,34 +529,32 @@ def commit_and_close(self): chunk_size = 10000 jsonld_dir_path = self.jsonld_dir.full_path() - obs_args = [] for i in range(0, len(records), chunk_size): chunk = records[i:i + chunk_size] obs_args.append((chunk, self.obs_shard_index, jsonld_dir_path, self.ns_map, prov_urls)) self.obs_shard_index += 1 - logging.info("Starting observations export with %d processes for %d chunks", - num_processes, len(obs_args)) - with multiprocessing.Pool(processes=num_processes) as pool: - pool.map(_write_observation_shard, obs_args) - - # Export triples in parallel + # Prepare triples chunks if self._triples: - logging.info("Exporting buffered %d triples to JSON-LD shards in %s", + logging.info("Preparing buffered %d triples to JSON-LD shards in %s", len(self._triples), self.jsonld_dir.full_path()) chunk_size = 10000 jsonld_dir_path = self.jsonld_dir.full_path() - node_args = [] for i in range(0, len(self._triples), chunk_size): chunk = self._triples[i:i + chunk_size] node_args.append((chunk, self.node_shard_index, jsonld_dir_path, self.ns_map)) self.node_shard_index += 1 - logging.info("Starting nodes export with %d processes for %d chunks", - num_processes, len(node_args)) + # Execute exports in parallel using a single Pool + if obs_args or node_args: + logging.info("Starting JSON-LD export with %d processes (observations: %d chunks, nodes: %d chunks)", + num_processes, len(obs_args), len(node_args)) with multiprocessing.Pool(processes=num_processes) as pool: - pool.map(_write_node_shard, node_args) + if obs_args: + pool.map(_write_observation_shard, obs_args) + if node_args: + pool.map(_write_node_shard, node_args) class SqlDb(Db): From fc01fc8f022407b059c513fb6e1672c3b8c7ebd0 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 11:31:14 -0400 Subject: [PATCH 07/24] fix timing for workflow trigger --- simple/stats/runner.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/simple/stats/runner.py b/simple/stats/runner.py index ed8afce68..00af6b83e 100644 --- a/simple/stats/runner.py +++ b/simple/stats/runner.py @@ -172,6 +172,7 @@ def __init__( self.nodes = Nodes(self.config) self.db = None self.db_cache = None + self.trigger_workflow_info = None def run(self): # Check if blue-green is enabled @@ -216,6 +217,11 @@ def run(self): store.close() logging.info("File storage closed.") + # Auto-trigger workflow now that all data is guaranteed to be exported and written to GCS + if self.trigger_workflow_info: + gcs_pattern, import_name = self.trigger_workflow_info + trigger_ingestion_workflow(gcs_pattern, import_name) + except Exception as e: logging.exception("Error updating stats") self.reporter.report_failure(error=str(e)) @@ -746,7 +752,7 @@ def _run_imports_and_export_jsonld(self): import_name = self.db.import_name if os.getenv("INGESTION_WORKFLOW_NAME") and output_path.startswith("gs://"): gcs_pattern = f"{output_path.rstrip('/')}/*.jsonld" - trigger_ingestion_workflow(gcs_pattern, import_name) + self.trigger_workflow_info = (gcs_pattern, import_name) else: logging.info( "Output is local or workflow is missing, skipping auto-trigger of ingestion workflow. Please upload files to GCS and trigger manually." From 6a1dbec3d801722b7a2f6098c0e1c03f39bea7c0 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 11:34:09 -0400 Subject: [PATCH 08/24] Export faster --- simple/stats/db.py | 114 +++++++++++++++++++++++++++------------------ 1 file changed, 69 insertions(+), 45 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index 95a8bc612..2c7e3fd35 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -508,53 +508,77 @@ def commit(self): def commit_and_close(self): import multiprocessing + import tempfile + import os + import concurrent.futures + from util.filesystem import create_store + num_processes = min(multiprocessing.cpu_count(), 8) - obs_args = [] - node_args = [] - - # Prepare observations chunks - if self._obs_dfs: - logging.info("Combining and preparing buffered observations to JSON-LD shards in %s", - self.jsonld_dir.full_path()) - combined_obs_df = pd.concat(self._obs_dfs, ignore_index=True) - records = combined_obs_df.to_records(index=False).tolist() - - prov_urls = {} - for prov in self.nodes.provenances.values(): - prov_id = strip_namespace(prov.id) - prov_urls[prov_id] = prov.url - prov_urls[prov.id] = prov.url - - chunk_size = 10000 - jsonld_dir_path = self.jsonld_dir.full_path() - - for i in range(0, len(records), chunk_size): - chunk = records[i:i + chunk_size] - obs_args.append((chunk, self.obs_shard_index, jsonld_dir_path, self.ns_map, prov_urls)) - self.obs_shard_index += 1 - - # Prepare triples chunks - if self._triples: - logging.info("Preparing buffered %d triples to JSON-LD shards in %s", - len(self._triples), self.jsonld_dir.full_path()) - chunk_size = 10000 - jsonld_dir_path = self.jsonld_dir.full_path() - - for i in range(0, len(self._triples), chunk_size): - chunk = self._triples[i:i + chunk_size] - node_args.append((chunk, self.node_shard_index, jsonld_dir_path, self.ns_map)) - self.node_shard_index += 1 - - # Execute exports in parallel using a single Pool - if obs_args or node_args: - logging.info("Starting JSON-LD export with %d processes (observations: %d chunks, nodes: %d chunks)", - num_processes, len(obs_args), len(node_args)) - with multiprocessing.Pool(processes=num_processes) as pool: - if obs_args: - pool.map(_write_observation_shard, obs_args) - if node_args: - pool.map(_write_node_shard, node_args) + with tempfile.TemporaryDirectory() as temp_local_dir: + logging.info("Using local temporary directory for export buffering: %s", temp_local_dir) + + obs_args = [] + node_args = [] + + # Prepare observations chunks + if self._obs_dfs: + logging.info("Combining and preparing buffered observations to JSON-LD shards") + combined_obs_df = pd.concat(self._obs_dfs, ignore_index=True) + records = combined_obs_df.to_records(index=False).tolist() + + prov_urls = {} + for prov in self.nodes.provenances.values(): + prov_id = strip_namespace(prov.id) + prov_urls[prov_id] = prov.url + prov_urls[prov.id] = prov.url + + chunk_size = 10000 + + for i in range(0, len(records), chunk_size): + chunk = records[i:i + chunk_size] + obs_args.append((chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls)) + self.obs_shard_index += 1 + + # Prepare triples chunks + if self._triples: + logging.info("Preparing buffered %d triples to JSON-LD shards", + len(self._triples)) + chunk_size = 10000 + + for i in range(0, len(self._triples), chunk_size): + chunk = self._triples[i:i + chunk_size] + node_args.append((chunk, self.node_shard_index, temp_local_dir, self.ns_map)) + self.node_shard_index += 1 + + # Execute exports in parallel locally (very fast, no GCS latency) + if obs_args or node_args: + logging.info("Starting JSON-LD local export with %d processes (observations: %d chunks, nodes: %d chunks)", + num_processes, len(obs_args), len(node_args)) + with multiprocessing.Pool(processes=num_processes) as pool: + if obs_args: + pool.map(_write_observation_shard, obs_args) + if node_args: + pool.map(_write_node_shard, node_args) + + # Bulk upload all generated JSON-LD shards from local temp dir to self.jsonld_dir + files_to_upload = sorted(os.listdir(temp_local_dir)) + if files_to_upload: + logging.info("Bulk uploading %d JSON-LD shards to target directory %s in parallel", + len(files_to_upload), self.jsonld_dir.full_path()) + + local_store = create_store(temp_local_dir).as_dir() + target_store = self.jsonld_dir + + def _upload_fn(filename): + content = local_store.open_file(filename).read() + target_store.open_file(filename).write(content) + + # Use 32 threads for high concurrency GCS uploads + with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor: + list(executor.map(_upload_fn, files_to_upload)) + + logging.info("Bulk upload of JSON-LD shards completed successfully.") class SqlDb(Db): From 97b2e8156e6cccd17285cbbcd0033dd3ba2bee82 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 11:58:50 -0400 Subject: [PATCH 09/24] Adds a global counter --- simple/stats/runner.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/simple/stats/runner.py b/simple/stats/runner.py index 00af6b83e..3f6cc0735 100644 --- a/simple/stats/runner.py +++ b/simple/stats/runner.py @@ -649,6 +649,11 @@ def _run_all_data_imports(self): self.reporter.report_started(import_files=list(input_csv_files + input_mcf_files)) + import threading + self._completed_files_count = 0 + self._total_files_count = len(input_csv_files) + len(input_mcf_files) + self._counter_lock = threading.Lock() + if self.mode == RunMode.DCP_BRIDGE: import concurrent.futures num_threads = min(32, (len(input_csv_files) + len(input_mcf_files)) or 1) @@ -671,11 +676,17 @@ def _run_all_data_imports(self): self._run_single_mcf_import(input_mcf_file) def _run_single_import(self, input_file: File): - logging.info("Importing file: %s", input_file) + with self._counter_lock: + self._completed_files_count += 1 + current_count = self._completed_files_count + logging.info("[%d/%d] Importing file: %s", current_count, self._total_files_count, input_file) self._create_importer(input_file).do_import() def _run_single_mcf_import(self, input_mcf_file: File): - logging.info("Importing MCF file: %s", input_mcf_file) + with self._counter_lock: + self._completed_files_count += 1 + current_count = self._completed_files_count + logging.info("[%d/%d] Importing MCF file: %s", current_count, self._total_files_count, input_mcf_file) self._create_mcf_importer(input_mcf_file, self.output_dir, self.mode == RunMode.MAIN_DC).do_import() From 3bae59d77d72d820aa838df343cf59518493fb40 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 12:12:35 -0400 Subject: [PATCH 10/24] Adds gardbage collection to help performance, and prevent ooms --- simple/stats/db.py | 70 +++++++++++++++++++++++++++------------------- 1 file changed, 41 insertions(+), 29 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index 2c7e3fd35..8c861b29c 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -510,6 +510,7 @@ def commit_and_close(self): import multiprocessing import tempfile import os + import gc import concurrent.futures from util.filesystem import create_store @@ -518,14 +519,10 @@ def commit_and_close(self): with tempfile.TemporaryDirectory() as temp_local_dir: logging.info("Using local temporary directory for export buffering: %s", temp_local_dir) - obs_args = [] - node_args = [] - - # Prepare observations chunks - if self._obs_dfs: - logging.info("Combining and preparing buffered observations to JSON-LD shards") - combined_obs_df = pd.concat(self._obs_dfs, ignore_index=True) - records = combined_obs_df.to_records(index=False).tolist() + # 1. Observations streaming generator + def _obs_chunk_generator(): + chunk_size = 10000 + current_chunk = [] prov_urls = {} for prov in self.nodes.provenances.values(): @@ -533,33 +530,48 @@ def commit_and_close(self): prov_urls[prov_id] = prov.url prov_urls[prov.id] = prov.url - chunk_size = 10000 - - for i in range(0, len(records), chunk_size): - chunk = records[i:i + chunk_size] - obs_args.append((chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls)) + # Pop DataFrames one-by-one to release their memory immediately + while self._obs_dfs: + df = self._obs_dfs.pop(0) + records = df.to_records(index=False).tolist() + + # Explicitly delete DataFrame reference and collect memory + del df + gc.collect() + + for record in records: + current_chunk.append(record) + if len(current_chunk) == chunk_size: + yield (current_chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) + self.obs_shard_index += 1 + current_chunk = [] + + if current_chunk: + yield (current_chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) self.obs_shard_index += 1 - # Prepare triples chunks - if self._triples: - logging.info("Preparing buffered %d triples to JSON-LD shards", - len(self._triples)) + # 2. Triples streaming generator + def _node_chunk_generator(): chunk_size = 10000 - - for i in range(0, len(self._triples), chunk_size): - chunk = self._triples[i:i + chunk_size] - node_args.append((chunk, self.node_shard_index, temp_local_dir, self.ns_map)) + while self._triples: + chunk = self._triples[:chunk_size] + del self._triples[:chunk_size] + gc.collect() + yield (chunk, self.node_shard_index, temp_local_dir, self.ns_map) self.node_shard_index += 1 - # Execute exports in parallel locally (very fast, no GCS latency) - if obs_args or node_args: - logging.info("Starting JSON-LD local export with %d processes (observations: %d chunks, nodes: %d chunks)", - num_processes, len(obs_args), len(node_args)) + # Execute exports in parallel streaming format (flat memory usage) + if self._obs_dfs or self._triples: + logging.info("Starting JSON-LD local export with %d processes in streaming mode", num_processes) with multiprocessing.Pool(processes=num_processes) as pool: - if obs_args: - pool.map(_write_observation_shard, obs_args) - if node_args: - pool.map(_write_node_shard, node_args) + if self._obs_dfs: + logging.info("Streaming observations export...") + for _ in pool.imap(_write_observation_shard, _obs_chunk_generator()): + pass + if self._triples: + logging.info("Streaming triples export...") + for _ in pool.imap(_write_node_shard, _node_chunk_generator()): + pass # Bulk upload all generated JSON-LD shards from local temp dir to self.jsonld_dir files_to_upload = sorted(os.listdir(temp_local_dir)) From 8044e945da256c4fe6840e41e1a7a2293b209877 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 12:13:32 -0400 Subject: [PATCH 11/24] Bypass rdflib for speed. --- simple/stats/db.py | 78 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/simple/stats/db.py b/simple/stats/db.py index 8c861b29c..e081d9a17 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -431,6 +431,84 @@ def _uri_ref(val): def _write_node_shard(args): + import os + fast_export = os.getenv("FAST_NODE_EXPORT", "true").lower() in ("true", "1", "yes") + if fast_export: + _write_node_shard_fast(args) + else: + _write_node_shard_rdflib(args) + + +def _write_node_shard_fast(args): + chunk, shard_index, jsonld_dir_path, ns_map = args + from util.filesystem import create_store + import json + import logging + + def _uri_ref(val): + if not val: + return None + if val.startswith("http://") or val.startswith("https://"): + return {"@id": val} + if val.startswith("dcid:"): + return {"@id": val} + return {"@id": f"dcid:{val.lstrip('/')}"} + + subjects = {} + for row in chunk: + sub_id = row.subject_id + if sub_id not in subjects: + subjects[sub_id] = { + "@id": f"dcid:{sub_id.lstrip('/')}" if not sub_id.startswith("http") and not sub_id.startswith("dcid:") else sub_id + } + + pred = row.predicate + pred_key = f"dcid:{pred}" if not pred.startswith("dcid:") and not pred.startswith("http") else pred + + if pred == "typeOf": + pred_key = "@type" + + if row.object_id: + val = _uri_ref(row.object_id) + else: + obj_val = row.object_value + try: + if '.' in str(obj_val): + val = float(obj_val) + else: + val = int(obj_val) + except ValueError: + val = str(obj_val) + + if pred_key == "@type": + val_str = val["@id"] if isinstance(val, dict) and "@id" in val else str(val) + subjects[sub_id]["@type"] = val_str + else: + if pred_key in subjects[sub_id]: + existing = subjects[sub_id][pred_key] + if isinstance(existing, list): + existing.append(val) + else: + subjects[sub_id][pred_key] = [existing, val] + else: + subjects[sub_id][pred_key] = val + + # Sort by @id to match rdflib output order + graph_list = sorted(list(subjects.values()), key=lambda x: x["@id"]) + + compacted_jsonld = { + "@context": ns_map, + "@graph": graph_list + } + + shard_name = f"node-{shard_index:05d}.jsonld" + with create_store(jsonld_dir_path) as store: + output_dir = store.as_dir() + output_dir.open_file(shard_name).write(json.dumps(compacted_jsonld, indent=4)) + logging.info(f"Saved JSON-LD shard to {shard_name} (fast path)") + + +def _write_node_shard_rdflib(args): chunk, shard_index, jsonld_dir_path, ns_map = args from rdflib import Graph, Namespace, RDF, Literal from stats.jsonld_exporter import DCID_URL, expand_id, write_shard From 15eff19c4ddfa06ae61c6444f4c8730b76e0e019 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 12:43:16 -0400 Subject: [PATCH 12/24] Remove gccollect --- simple/stats/db.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index e081d9a17..e87af24b2 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -615,7 +615,6 @@ def _obs_chunk_generator(): # Explicitly delete DataFrame reference and collect memory del df - gc.collect() for record in records: current_chunk.append(record) @@ -634,7 +633,6 @@ def _node_chunk_generator(): while self._triples: chunk = self._triples[:chunk_size] del self._triples[:chunk_size] - gc.collect() yield (chunk, self.node_shard_index, temp_local_dir, self.ns_map) self.node_shard_index += 1 From a6eb2a541087cc4c3a7aeb8f6e31e620448315b5 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 12:50:32 -0400 Subject: [PATCH 13/24] Use native GCS --- simple/stats/db.py | 43 ++++++++++++++++++++++++++++++++----------- simple/stats/main.py | 4 ++++ 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index e87af24b2..e1921ff23 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -652,19 +652,40 @@ def _node_chunk_generator(): # Bulk upload all generated JSON-LD shards from local temp dir to self.jsonld_dir files_to_upload = sorted(os.listdir(temp_local_dir)) if files_to_upload: + target_path = self.jsonld_dir.full_path() logging.info("Bulk uploading %d JSON-LD shards to target directory %s in parallel", - len(files_to_upload), self.jsonld_dir.full_path()) + len(files_to_upload), target_path) - local_store = create_store(temp_local_dir).as_dir() - target_store = self.jsonld_dir - - def _upload_fn(filename): - content = local_store.open_file(filename).read() - target_store.open_file(filename).write(content) - - # Use 32 threads for high concurrency GCS uploads - with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor: - list(executor.map(_upload_fn, files_to_upload)) + if target_path.startswith("gs://"): + # Native GCS upload for high-speed multi-threaded execution + from google.cloud import storage + + parts = target_path[5:].split("/", 1) + bucket_name = parts[0] + blob_prefix = parts[1].rstrip("/") if len(parts) > 1 else "" + + client = storage.Client() + bucket = client.bucket(bucket_name) + + def _upload_gcs(filename): + local_file_path = os.path.join(temp_local_dir, filename) + blob_key = f"{blob_prefix}/{filename}" if blob_prefix else filename + blob = bucket.blob(blob_key) + blob.upload_from_filename(local_file_path) + + with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor: + list(executor.map(_upload_gcs, files_to_upload)) + else: + # Local fallback (e.g. unit tests) + local_store = create_store(temp_local_dir).as_dir() + target_store = self.jsonld_dir + + def _upload_local(filename): + content = local_store.open_file(filename).read() + target_store.open_file(filename).write(content) + + with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor: + list(executor.map(_upload_local, files_to_upload)) logging.info("Bulk upload of JSON-LD shards completed successfully.") diff --git a/simple/stats/main.py b/simple/stats/main.py index 1eef195c4..9c3773ba7 100644 --- a/simple/stats/main.py +++ b/simple/stats/main.py @@ -55,6 +55,10 @@ def _run(): + # Configure requests adapter default pool size to support parallel GCS uploads + import requests.adapters + requests.adapters.DEFAULT_POOLSIZE = 32 + initialize_logger() logging.info("Starting stats data importer job in mode: %s", FLAGS.mode) From 60133453bb61ca35c40c0b6432dc75373be3a802 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 13:30:00 -0400 Subject: [PATCH 14/24] More performance fix --- simple/stats/db.py | 32 ++++++++++---------------------- 1 file changed, 10 insertions(+), 22 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index e1921ff23..789ce1860 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -568,13 +568,14 @@ def __init__(self, output_dir, import_names, nodes) -> None: self.ns_map = {"dcid": DCID_URL} import threading self.lock = threading.Lock() - self._obs_dfs = [] + self._obs_records = [] self._triples = [] def insert_observations(self, observations_df: pd.DataFrame, input_file: File): if not observations_df.empty: + records = observations_df.to_records(index=False).tolist() with self.lock: - self._obs_dfs.append(observations_df) + self._obs_records.extend(records) def insert_triples(self, triples: list[Triple]): if triples: @@ -600,7 +601,6 @@ def commit_and_close(self): # 1. Observations streaming generator def _obs_chunk_generator(): chunk_size = 10000 - current_chunk = [] prov_urls = {} for prov in self.nodes.provenances.values(): @@ -608,23 +608,11 @@ def _obs_chunk_generator(): prov_urls[prov_id] = prov.url prov_urls[prov.id] = prov.url - # Pop DataFrames one-by-one to release their memory immediately - while self._obs_dfs: - df = self._obs_dfs.pop(0) - records = df.to_records(index=False).tolist() - - # Explicitly delete DataFrame reference and collect memory - del df - - for record in records: - current_chunk.append(record) - if len(current_chunk) == chunk_size: - yield (current_chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) - self.obs_shard_index += 1 - current_chunk = [] - - if current_chunk: - yield (current_chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) + while self._obs_records: + chunk = self._obs_records[:chunk_size] + del self._obs_records[:chunk_size] + gc.collect() # Release memory chunk + yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) self.obs_shard_index += 1 # 2. Triples streaming generator @@ -637,10 +625,10 @@ def _node_chunk_generator(): self.node_shard_index += 1 # Execute exports in parallel streaming format (flat memory usage) - if self._obs_dfs or self._triples: + if self._obs_records or self._triples: logging.info("Starting JSON-LD local export with %d processes in streaming mode", num_processes) with multiprocessing.Pool(processes=num_processes) as pool: - if self._obs_dfs: + if self._obs_records: logging.info("Streaming observations export...") for _ in pool.imap(_write_observation_shard, _obs_chunk_generator()): pass From 85a305e83f3feb4b008bdb616b7a3765f83fa422 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 15:49:46 -0400 Subject: [PATCH 15/24] Minor cleanup --- simple/stats/db.py | 170 ++++++++++++++++++++++++--------------------- 1 file changed, 92 insertions(+), 78 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index 789ce1860..bf6c2f6b5 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -12,10 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures from dataclasses import dataclass from datetime import datetime +from datetime import timezone from enum import auto from enum import Enum +import gc +import multiprocessing +import tempfile +import threading import json import logging import os @@ -536,11 +542,16 @@ def _write_node_shard_rdflib(args): write_shard(g, shard_index, output_dir, ns_map, prefix="node") +# Constants for JsonLdStreamDb configurations +_CHUNK_SIZE = 10000 +_UPLOAD_CONCURRENCY = 32 +_EXPORT_PROCESSES_MAX = 8 + + class JsonLdStreamDb(Db): """A DB implementation that streams triples and observations directly to JSON-LD shards on GCS/Disk, bypassing SQLite.""" def __init__(self, output_dir, import_names, nodes) -> None: - from datetime import timezone from stats.jsonld_exporter import DCID_URL self.output_dir = output_dir self.import_names = import_names @@ -566,7 +577,6 @@ def __init__(self, output_dir, import_names, nodes) -> None: self.obs_shard_index = 0 self.node_shard_index = 0 self.ns_map = {"dcid": DCID_URL} - import threading self.lock = threading.Lock() self._obs_records = [] self._triples = [] @@ -586,96 +596,100 @@ def commit(self): pass def commit_and_close(self): - import multiprocessing - import tempfile - import os - import gc - import concurrent.futures - from util.filesystem import create_store - - num_processes = min(multiprocessing.cpu_count(), 8) + num_processes = min(multiprocessing.cpu_count(), _EXPORT_PROCESSES_MAX) with tempfile.TemporaryDirectory() as temp_local_dir: logging.info("Using local temporary directory for export buffering: %s", temp_local_dir) - # 1. Observations streaming generator - def _obs_chunk_generator(): - chunk_size = 10000 - - prov_urls = {} - for prov in self.nodes.provenances.values(): - prov_id = strip_namespace(prov.id) - prov_urls[prov_id] = prov.url - prov_urls[prov.id] = prov.url - - while self._obs_records: - chunk = self._obs_records[:chunk_size] - del self._obs_records[:chunk_size] - gc.collect() # Release memory chunk - yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) - self.obs_shard_index += 1 - - # 2. Triples streaming generator - def _node_chunk_generator(): - chunk_size = 10000 - while self._triples: - chunk = self._triples[:chunk_size] - del self._triples[:chunk_size] - yield (chunk, self.node_shard_index, temp_local_dir, self.ns_map) - self.node_shard_index += 1 - - # Execute exports in parallel streaming format (flat memory usage) if self._obs_records or self._triples: logging.info("Starting JSON-LD local export with %d processes in streaming mode", num_processes) with multiprocessing.Pool(processes=num_processes) as pool: if self._obs_records: logging.info("Streaming observations export...") - for _ in pool.imap(_write_observation_shard, _obs_chunk_generator()): + obs_gen = self._generate_observation_chunks(temp_local_dir) + for _ in pool.imap(_write_observation_shard, obs_gen): pass + if self._triples: logging.info("Streaming triples export...") - for _ in pool.imap(_write_node_shard, _node_chunk_generator()): + node_gen = self._generate_node_chunks(temp_local_dir) + for _ in pool.imap(_write_node_shard, node_gen): pass - # Bulk upload all generated JSON-LD shards from local temp dir to self.jsonld_dir - files_to_upload = sorted(os.listdir(temp_local_dir)) - if files_to_upload: - target_path = self.jsonld_dir.full_path() - logging.info("Bulk uploading %d JSON-LD shards to target directory %s in parallel", - len(files_to_upload), target_path) - - if target_path.startswith("gs://"): - # Native GCS upload for high-speed multi-threaded execution - from google.cloud import storage - - parts = target_path[5:].split("/", 1) - bucket_name = parts[0] - blob_prefix = parts[1].rstrip("/") if len(parts) > 1 else "" - - client = storage.Client() - bucket = client.bucket(bucket_name) - - def _upload_gcs(filename): - local_file_path = os.path.join(temp_local_dir, filename) - blob_key = f"{blob_prefix}/{filename}" if blob_prefix else filename - blob = bucket.blob(blob_key) - blob.upload_from_filename(local_file_path) - - with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor: - list(executor.map(_upload_gcs, files_to_upload)) - else: - # Local fallback (e.g. unit tests) - local_store = create_store(temp_local_dir).as_dir() - target_store = self.jsonld_dir - - def _upload_local(filename): - content = local_store.open_file(filename).read() - target_store.open_file(filename).write(content) - - with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor: - list(executor.map(_upload_local, files_to_upload)) - - logging.info("Bulk upload of JSON-LD shards completed successfully.") + self._upload_shards(temp_local_dir) + + def _generate_observation_chunks(self, temp_local_dir: str): + """Generates observation chunks of size _CHUNK_SIZE, cleaning memory dynamically.""" + prov_urls = {} + for prov in self.nodes.provenances.values(): + prov_id = strip_namespace(prov.id) + prov_urls[prov_id] = prov.url + prov_urls[prov.id] = prov.url + + while self._obs_records: + chunk = self._obs_records[:_CHUNK_SIZE] + del self._obs_records[:_CHUNK_SIZE] + gc.collect() # Release memory chunk + yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) + self.obs_shard_index += 1 + + def _generate_node_chunks(self, temp_local_dir: str): + """Generates node chunks of size _CHUNK_SIZE.""" + while self._triples: + chunk = self._triples[:_CHUNK_SIZE] + del self._triples[:_CHUNK_SIZE] + yield (chunk, self.node_shard_index, temp_local_dir, self.ns_map) + self.node_shard_index += 1 + + def _upload_shards(self, temp_local_dir: str): + """Uploads files in temp_local_dir to jsonld_dir, optimizing for GCS via native SDK.""" + files_to_upload = sorted(os.listdir(temp_local_dir)) + if not files_to_upload: + return + + target_path = self.jsonld_dir.full_path() + logging.info("Bulk uploading %d JSON-LD shards to target directory %s in parallel", + len(files_to_upload), target_path) + + if target_path.startswith("gs://"): + self._upload_shards_gcs(temp_local_dir, files_to_upload, target_path) + else: + self._upload_shards_local(temp_local_dir, files_to_upload) + + logging.info("Bulk upload of JSON-LD shards completed successfully.") + + def _upload_shards_gcs(self, temp_local_dir: str, files: list[str], target_path: str): + """Performs concurrent GCS uploads using native google-cloud-storage client.""" + from google.cloud import storage + + # Parse bucket and blob prefix + parts = target_path[5:].split("/", 1) + bucket_name = parts[0] + blob_prefix = parts[1].rstrip("/") if len(parts) > 1 else "" + + client = storage.Client() + bucket = client.bucket(bucket_name) + + def _upload_single(filename: str): + local_file_path = os.path.join(temp_local_dir, filename) + blob_key = f"{blob_prefix}/{filename}" if blob_prefix else filename + blob = bucket.blob(blob_key) + blob.upload_from_filename(local_file_path) + + with concurrent.futures.ThreadPoolExecutor(max_workers=_UPLOAD_CONCURRENCY) as executor: + list(executor.map(_upload_single, files)) + + def _upload_shards_local(self, temp_local_dir: str, files: list[str]): + """Performs concurrent local file copy (for test environments).""" + local_store = create_store(temp_local_dir).as_dir() + target_store = self.jsonld_dir + + def _copy_single(filename: str): + content = local_store.open_file(filename).read() + target_store.open_file(filename).write(content) + + with concurrent.futures.ThreadPoolExecutor(max_workers=_UPLOAD_CONCURRENCY) as executor: + list(executor.map(_copy_single, files)) class SqlDb(Db): From e420a5a4c44a4a751bdf12725f80fd8ca15086a7 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 15:50:00 -0400 Subject: [PATCH 16/24] Lint --- simple/stats/db.py | 102 ++++++++++++++++++++++++----------------- simple/stats/nodes.py | 4 +- simple/stats/runner.py | 23 ++++++---- 3 files changed, 78 insertions(+), 51 deletions(-) diff --git a/simple/stats/db.py b/simple/stats/db.py index bf6c2f6b5..1887fc255 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -19,13 +19,13 @@ from enum import auto from enum import Enum import gc -import multiprocessing -import tempfile -import threading import json import logging +import multiprocessing import os import sqlite3 +import tempfile +import threading from typing import Any from google.cloud.sql.connector.connector import Connector @@ -45,8 +45,8 @@ from stats.data import McfNode from stats.data import STAT_VAR_GROUP from stats.data import STATISTICAL_VARIABLE -from stats.data import Triple from stats.data import strip_namespace +from stats.data import Triple from util.filesystem import create_store from util.filesystem import Dir from util.filesystem import File @@ -344,11 +344,12 @@ def _add_triple(self, triple: Triple): def _write_observation_shard(args): chunk, shard_index, jsonld_dir_path, ns_map, prov_urls = args - from util.filesystem import create_store import hashlib import json import logging + from util.filesystem import create_store + def _uri_ref(val): if not val: return None @@ -359,20 +360,20 @@ def _uri_ref(val): return {"@id": f"dcid:{val.lstrip('/')}"} graph_list = [] - + for row in chunk: entity, variable, date, value, provenance, unit, scaling_factor, mmethod, period, props = row - + key = f"{entity}_{variable}_{date}_{provenance}_{unit}_{mmethod}_{period}" obs_hash = hashlib.sha256(key.encode('utf-8')).hexdigest() - + obs_obj = { - "@id": f"dcid:obs_{obs_hash}", - "@type": "dcid:StatVarObservation", - "dcid:observationAbout": _uri_ref(entity), - "dcid:variableMeasured": _uri_ref(variable) + "@id": f"dcid:obs_{obs_hash}", + "@type": "dcid:StatVarObservation", + "dcid:observationAbout": _uri_ref(entity), + "dcid:variableMeasured": _uri_ref(variable) } - + # Format date try: if str(date).isdigit(): @@ -417,28 +418,28 @@ def _uri_ref(val): try: props_dict = json.loads(props) for k, v in props_dict.items(): - prop_key = f"dcid:{k}" if not k.startswith("dcid:") and not k.startswith("http") else k + prop_key = f"dcid:{k}" if not k.startswith( + "dcid:") and not k.startswith("http") else k obs_obj[prop_key] = v except Exception: pass graph_list.append(obs_obj) - compacted_jsonld = { - "@context": ns_map, - "@graph": graph_list - } + compacted_jsonld = {"@context": ns_map, "@graph": graph_list} shard_name = f"observation-{shard_index:05d}.jsonld" with create_store(jsonld_dir_path) as store: output_dir = store.as_dir() - output_dir.open_file(shard_name).write(json.dumps(compacted_jsonld, indent=4)) + output_dir.open_file(shard_name).write( + json.dumps(compacted_jsonld, indent=4)) logging.info(f"Saved JSON-LD shard to {shard_name}") def _write_node_shard(args): import os - fast_export = os.getenv("FAST_NODE_EXPORT", "true").lower() in ("true", "1", "yes") + fast_export = os.getenv("FAST_NODE_EXPORT", + "true").lower() in ("true", "1", "yes") if fast_export: _write_node_shard_fast(args) else: @@ -447,10 +448,11 @@ def _write_node_shard(args): def _write_node_shard_fast(args): chunk, shard_index, jsonld_dir_path, ns_map = args - from util.filesystem import create_store import json import logging + from util.filesystem import create_store + def _uri_ref(val): if not val: return None @@ -465,12 +467,15 @@ def _uri_ref(val): sub_id = row.subject_id if sub_id not in subjects: subjects[sub_id] = { - "@id": f"dcid:{sub_id.lstrip('/')}" if not sub_id.startswith("http") and not sub_id.startswith("dcid:") else sub_id + "@id": + f"dcid:{sub_id.lstrip('/')}" if not sub_id.startswith("http") and + not sub_id.startswith("dcid:") else sub_id } - + pred = row.predicate - pred_key = f"dcid:{pred}" if not pred.startswith("dcid:") and not pred.startswith("http") else pred - + pred_key = f"dcid:{pred}" if not pred.startswith( + "dcid:") and not pred.startswith("http") else pred + if pred == "typeOf": pred_key = "@type" @@ -487,7 +492,8 @@ def _uri_ref(val): val = str(obj_val) if pred_key == "@type": - val_str = val["@id"] if isinstance(val, dict) and "@id" in val else str(val) + val_str = val["@id"] if isinstance(val, + dict) and "@id" in val else str(val) subjects[sub_id]["@type"] = val_str else: if pred_key in subjects[sub_id]: @@ -502,22 +508,25 @@ def _uri_ref(val): # Sort by @id to match rdflib output order graph_list = sorted(list(subjects.values()), key=lambda x: x["@id"]) - compacted_jsonld = { - "@context": ns_map, - "@graph": graph_list - } + compacted_jsonld = {"@context": ns_map, "@graph": graph_list} shard_name = f"node-{shard_index:05d}.jsonld" with create_store(jsonld_dir_path) as store: output_dir = store.as_dir() - output_dir.open_file(shard_name).write(json.dumps(compacted_jsonld, indent=4)) + output_dir.open_file(shard_name).write( + json.dumps(compacted_jsonld, indent=4)) logging.info(f"Saved JSON-LD shard to {shard_name} (fast path)") def _write_node_shard_rdflib(args): chunk, shard_index, jsonld_dir_path, ns_map = args - from rdflib import Graph, Namespace, RDF, Literal - from stats.jsonld_exporter import DCID_URL, expand_id, write_shard + from rdflib import Graph + from rdflib import Literal + from rdflib import Namespace + from rdflib import RDF + from stats.jsonld_exporter import DCID_URL + from stats.jsonld_exporter import expand_id + from stats.jsonld_exporter import write_shard from util.filesystem import create_store DCID = Namespace(DCID_URL) @@ -581,7 +590,8 @@ def __init__(self, output_dir, import_names, nodes) -> None: self._obs_records = [] self._triples = [] - def insert_observations(self, observations_df: pd.DataFrame, input_file: File): + def insert_observations(self, observations_df: pd.DataFrame, + input_file: File): if not observations_df.empty: records = observations_df.to_records(index=False).tolist() with self.lock: @@ -599,10 +609,13 @@ def commit_and_close(self): num_processes = min(multiprocessing.cpu_count(), _EXPORT_PROCESSES_MAX) with tempfile.TemporaryDirectory() as temp_local_dir: - logging.info("Using local temporary directory for export buffering: %s", temp_local_dir) + logging.info("Using local temporary directory for export buffering: %s", + temp_local_dir) if self._obs_records or self._triples: - logging.info("Starting JSON-LD local export with %d processes in streaming mode", num_processes) + logging.info( + "Starting JSON-LD local export with %d processes in streaming mode", + num_processes) with multiprocessing.Pool(processes=num_processes) as pool: if self._obs_records: logging.info("Streaming observations export...") @@ -630,7 +643,8 @@ def _generate_observation_chunks(self, temp_local_dir: str): chunk = self._obs_records[:_CHUNK_SIZE] del self._obs_records[:_CHUNK_SIZE] gc.collect() # Release memory chunk - yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) + yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map, + prov_urls) self.obs_shard_index += 1 def _generate_node_chunks(self, temp_local_dir: str): @@ -648,8 +662,9 @@ def _upload_shards(self, temp_local_dir: str): return target_path = self.jsonld_dir.full_path() - logging.info("Bulk uploading %d JSON-LD shards to target directory %s in parallel", - len(files_to_upload), target_path) + logging.info( + "Bulk uploading %d JSON-LD shards to target directory %s in parallel", + len(files_to_upload), target_path) if target_path.startswith("gs://"): self._upload_shards_gcs(temp_local_dir, files_to_upload, target_path) @@ -658,7 +673,8 @@ def _upload_shards(self, temp_local_dir: str): logging.info("Bulk upload of JSON-LD shards completed successfully.") - def _upload_shards_gcs(self, temp_local_dir: str, files: list[str], target_path: str): + def _upload_shards_gcs(self, temp_local_dir: str, files: list[str], + target_path: str): """Performs concurrent GCS uploads using native google-cloud-storage client.""" from google.cloud import storage @@ -676,7 +692,8 @@ def _upload_single(filename: str): blob = bucket.blob(blob_key) blob.upload_from_filename(local_file_path) - with concurrent.futures.ThreadPoolExecutor(max_workers=_UPLOAD_CONCURRENCY) as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=_UPLOAD_CONCURRENCY) as executor: list(executor.map(_upload_single, files)) def _upload_shards_local(self, temp_local_dir: str, files: list[str]): @@ -688,7 +705,8 @@ def _copy_single(filename: str): content = local_store.open_file(filename).read() target_store.open_file(filename).write(content) - with concurrent.futures.ThreadPoolExecutor(max_workers=_UPLOAD_CONCURRENCY) as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=_UPLOAD_CONCURRENCY) as executor: list(executor.map(_copy_single, files)) diff --git a/simple/stats/nodes.py b/simple/stats/nodes.py index fd636bd8b..fb5015b8c 100644 --- a/simple/stats/nodes.py +++ b/simple/stats/nodes.py @@ -12,8 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from functools import wraps +import logging import re import pandas as pd @@ -55,10 +55,12 @@ def thread_safe(func): """Decorator to make a method thread-safe using the object's reentrant lock.""" + @wraps(func) def wrapper(self, *args, **kwargs): with self.lock: return func(self, *args, **kwargs) + return wrapper diff --git a/simple/stats/runner.py b/simple/stats/runner.py index 3f6cc0735..a5610d6e9 100644 --- a/simple/stats/runner.py +++ b/simple/stats/runner.py @@ -40,8 +40,8 @@ from stats.db import get_datacommons_platform_config_from_env from stats.db import get_sqlite_path_from_env from stats.db import ImportStatus -from stats.db import TYPE_CLOUD_SQL from stats.db import JsonLdStreamDb +from stats.db import TYPE_CLOUD_SQL from stats.db_cache import get_db_cache_from_env from stats.db_transfer import transfer_sqlite_to_cloud_sql from stats.entities_importer import EntitiesImporter @@ -657,14 +657,18 @@ def _run_all_data_imports(self): if self.mode == RunMode.DCP_BRIDGE: import concurrent.futures num_threads = min(32, (len(input_csv_files) + len(input_mcf_files)) or 1) - logging.info("Starting parallel ingestion of data files with %d threads", num_threads) + logging.info("Starting parallel ingestion of data files with %d threads", + num_threads) - with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor: + with concurrent.futures.ThreadPoolExecutor( + max_workers=num_threads) as executor: futures = [] for input_csv_file in input_csv_files: - futures.append(executor.submit(self._run_single_import, input_csv_file)) + futures.append( + executor.submit(self._run_single_import, input_csv_file)) for input_mcf_file in input_mcf_files: - futures.append(executor.submit(self._run_single_mcf_import, input_mcf_file)) + futures.append( + executor.submit(self._run_single_mcf_import, input_mcf_file)) # Wait for all files to be processed and propagate any exception for future in concurrent.futures.as_completed(futures): @@ -679,14 +683,16 @@ def _run_single_import(self, input_file: File): with self._counter_lock: self._completed_files_count += 1 current_count = self._completed_files_count - logging.info("[%d/%d] Importing file: %s", current_count, self._total_files_count, input_file) + logging.info("[%d/%d] Importing file: %s", current_count, + self._total_files_count, input_file) self._create_importer(input_file).do_import() def _run_single_mcf_import(self, input_mcf_file: File): with self._counter_lock: self._completed_files_count += 1 current_count = self._completed_files_count - logging.info("[%d/%d] Importing MCF file: %s", current_count, self._total_files_count, input_mcf_file) + logging.info("[%d/%d] Importing MCF file: %s", current_count, + self._total_files_count, input_mcf_file) self._create_mcf_importer(input_mcf_file, self.output_dir, self.mode == RunMode.MAIN_DC).do_import() @@ -748,7 +754,8 @@ def _create_importer(self, input_file: File) -> Importer: f"Unsupported import type: {import_type} ({input_file.full_path()})") def _run_imports_and_export_jsonld(self): - logging.info("Initializing JsonLdStreamDb to stream JSON-LD directly to GCS/Disk") + logging.info( + "Initializing JsonLdStreamDb to stream JSON-LD directly to GCS/Disk") self.db = JsonLdStreamDb(self.output_dir, self.import_names, self.nodes) # Run data imports (CSV and MCF) From 40ba5a401781a301dc4fe4a1e88916dc611a6d8c Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 16:24:01 -0400 Subject: [PATCH 17/24] Major cleanup on the code --- simple/stats/config.py | 2 +- simple/stats/db.py | 368 ------------------ simple/stats/jsonld_stream_db.py | 389 ++++++++++++++++++++ simple/stats/main.py | 2 +- simple/stats/nodes.py | 2 +- simple/stats/reporter.py | 9 +- simple/stats/runner.py | 100 ++--- simple/tests/stats/jsonld_stream_db_test.py | 297 +++++++++++++++ 8 files changed, 745 insertions(+), 424 deletions(-) create mode 100644 simple/stats/jsonld_stream_db.py create mode 100644 simple/tests/stats/jsonld_stream_db_test.py diff --git a/simple/stats/config.py b/simple/stats/config.py index 36baa5d0b..a4b3b4423 100644 --- a/simple/stats/config.py +++ b/simple/stats/config.py @@ -242,7 +242,7 @@ def custom_svg_prefix(self) -> str: - If explicitly set via 'customSvgPrefix', return it. - Else if 'customIdNamespace' is explicitly provided in config, derive as f"{namespace}/g/" where namespace is the validated value of - customIdNamespace. + customIdNamespace. - Else fall back to the built-in default (e.g., 'c/g/'). """ from stats import schema_constants as sc diff --git a/simple/stats/db.py b/simple/stats/db.py index 1887fc255..0ee00f793 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -342,374 +342,6 @@ def _add_triple(self, triple: Triple): node.add_triple(triple) -def _write_observation_shard(args): - chunk, shard_index, jsonld_dir_path, ns_map, prov_urls = args - import hashlib - import json - import logging - - from util.filesystem import create_store - - def _uri_ref(val): - if not val: - return None - if val.startswith("http://") or val.startswith("https://"): - return {"@id": val} - if val.startswith("dcid:"): - return {"@id": val} - return {"@id": f"dcid:{val.lstrip('/')}"} - - graph_list = [] - - for row in chunk: - entity, variable, date, value, provenance, unit, scaling_factor, mmethod, period, props = row - - key = f"{entity}_{variable}_{date}_{provenance}_{unit}_{mmethod}_{period}" - obs_hash = hashlib.sha256(key.encode('utf-8')).hexdigest() - - obs_obj = { - "@id": f"dcid:obs_{obs_hash}", - "@type": "dcid:StatVarObservation", - "dcid:observationAbout": _uri_ref(entity), - "dcid:variableMeasured": _uri_ref(variable) - } - - # Format date - try: - if str(date).isdigit(): - obs_obj["dcid:observationDate"] = int(date) - else: - try: - obs_obj["dcid:observationDate"] = float(date) - except ValueError: - obs_obj["dcid:observationDate"] = str(date) - except Exception: - obs_obj["dcid:observationDate"] = str(date) - - # Format value - try: - if '.' in str(value): - obs_obj["dcid:value"] = float(value) - else: - obs_obj["dcid:value"] = int(value) - except ValueError: - obs_obj["dcid:value"] = value - - if provenance: - obs_obj["dcid:provenance"] = _uri_ref(provenance) - if provenance in prov_urls and prov_urls[provenance]: - obs_obj["dcid:provenanceUrl"] = prov_urls[provenance] - if unit: - obs_obj["dcid:unit"] = _uri_ref(unit) - if scaling_factor: - try: - if '.' in str(scaling_factor): - obs_obj["dcid:scalingFactor"] = float(scaling_factor) - else: - obs_obj["dcid:scalingFactor"] = int(scaling_factor) - except ValueError: - obs_obj["dcid:scalingFactor"] = scaling_factor - if mmethod: - obs_obj["dcid:measurementMethod"] = _uri_ref(mmethod) - if period: - obs_obj["dcid:observationPeriod"] = period - - if props: - try: - props_dict = json.loads(props) - for k, v in props_dict.items(): - prop_key = f"dcid:{k}" if not k.startswith( - "dcid:") and not k.startswith("http") else k - obs_obj[prop_key] = v - except Exception: - pass - - graph_list.append(obs_obj) - - compacted_jsonld = {"@context": ns_map, "@graph": graph_list} - - shard_name = f"observation-{shard_index:05d}.jsonld" - with create_store(jsonld_dir_path) as store: - output_dir = store.as_dir() - output_dir.open_file(shard_name).write( - json.dumps(compacted_jsonld, indent=4)) - logging.info(f"Saved JSON-LD shard to {shard_name}") - - -def _write_node_shard(args): - import os - fast_export = os.getenv("FAST_NODE_EXPORT", - "true").lower() in ("true", "1", "yes") - if fast_export: - _write_node_shard_fast(args) - else: - _write_node_shard_rdflib(args) - - -def _write_node_shard_fast(args): - chunk, shard_index, jsonld_dir_path, ns_map = args - import json - import logging - - from util.filesystem import create_store - - def _uri_ref(val): - if not val: - return None - if val.startswith("http://") or val.startswith("https://"): - return {"@id": val} - if val.startswith("dcid:"): - return {"@id": val} - return {"@id": f"dcid:{val.lstrip('/')}"} - - subjects = {} - for row in chunk: - sub_id = row.subject_id - if sub_id not in subjects: - subjects[sub_id] = { - "@id": - f"dcid:{sub_id.lstrip('/')}" if not sub_id.startswith("http") and - not sub_id.startswith("dcid:") else sub_id - } - - pred = row.predicate - pred_key = f"dcid:{pred}" if not pred.startswith( - "dcid:") and not pred.startswith("http") else pred - - if pred == "typeOf": - pred_key = "@type" - - if row.object_id: - val = _uri_ref(row.object_id) - else: - obj_val = row.object_value - try: - if '.' in str(obj_val): - val = float(obj_val) - else: - val = int(obj_val) - except ValueError: - val = str(obj_val) - - if pred_key == "@type": - val_str = val["@id"] if isinstance(val, - dict) and "@id" in val else str(val) - subjects[sub_id]["@type"] = val_str - else: - if pred_key in subjects[sub_id]: - existing = subjects[sub_id][pred_key] - if isinstance(existing, list): - existing.append(val) - else: - subjects[sub_id][pred_key] = [existing, val] - else: - subjects[sub_id][pred_key] = val - - # Sort by @id to match rdflib output order - graph_list = sorted(list(subjects.values()), key=lambda x: x["@id"]) - - compacted_jsonld = {"@context": ns_map, "@graph": graph_list} - - shard_name = f"node-{shard_index:05d}.jsonld" - with create_store(jsonld_dir_path) as store: - output_dir = store.as_dir() - output_dir.open_file(shard_name).write( - json.dumps(compacted_jsonld, indent=4)) - logging.info(f"Saved JSON-LD shard to {shard_name} (fast path)") - - -def _write_node_shard_rdflib(args): - chunk, shard_index, jsonld_dir_path, ns_map = args - from rdflib import Graph - from rdflib import Literal - from rdflib import Namespace - from rdflib import RDF - from stats.jsonld_exporter import DCID_URL - from stats.jsonld_exporter import expand_id - from stats.jsonld_exporter import write_shard - from util.filesystem import create_store - - DCID = Namespace(DCID_URL) - g = Graph() - g.bind("dcid", DCID) - - for row in chunk: - sub = expand_id(row.subject_id) - p = expand_id(row.predicate) - if row.object_id: - o = expand_id(row.object_id) - else: - o = Literal(row.object_value) - - if row.predicate == 'typeOf': - g.add((sub, RDF.type, o)) - else: - g.add((sub, p, o)) - - with create_store(jsonld_dir_path) as store: - output_dir = store.as_dir() - write_shard(g, shard_index, output_dir, ns_map, prefix="node") - - -# Constants for JsonLdStreamDb configurations -_CHUNK_SIZE = 10000 -_UPLOAD_CONCURRENCY = 32 -_EXPORT_PROCESSES_MAX = 8 - - -class JsonLdStreamDb(Db): - """A DB implementation that streams triples and observations directly to JSON-LD shards on GCS/Disk, bypassing SQLite.""" - - def __init__(self, output_dir, import_names, nodes) -> None: - from stats.jsonld_exporter import DCID_URL - self.output_dir = output_dir - self.import_names = import_names - self.nodes = nodes - - # Generate unique folder name based on import name and timestamp - import_name = None - if isinstance(import_names, list): - if import_names == [constants.ALL_IMPORTS]: - import_name = constants.ALL_IMPORTS - else: - import_name = "_".join(import_names) - - self.import_name = import_name or nodes.config.data.get( - "importName") or "default_import_name" - if self.import_name and "/" in self.import_name: - self.import_name = self.import_name.replace("/", "_") - - timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f") - unique_dir_name = f"{self.import_name}_{timestamp}" - self.jsonld_dir = output_dir.open_dir("jsonld").open_dir(unique_dir_name) - - self.obs_shard_index = 0 - self.node_shard_index = 0 - self.ns_map = {"dcid": DCID_URL} - self.lock = threading.Lock() - self._obs_records = [] - self._triples = [] - - def insert_observations(self, observations_df: pd.DataFrame, - input_file: File): - if not observations_df.empty: - records = observations_df.to_records(index=False).tolist() - with self.lock: - self._obs_records.extend(records) - - def insert_triples(self, triples: list[Triple]): - if triples: - with self.lock: - self._triples.extend(triples) - - def commit(self): - pass - - def commit_and_close(self): - num_processes = min(multiprocessing.cpu_count(), _EXPORT_PROCESSES_MAX) - - with tempfile.TemporaryDirectory() as temp_local_dir: - logging.info("Using local temporary directory for export buffering: %s", - temp_local_dir) - - if self._obs_records or self._triples: - logging.info( - "Starting JSON-LD local export with %d processes in streaming mode", - num_processes) - with multiprocessing.Pool(processes=num_processes) as pool: - if self._obs_records: - logging.info("Streaming observations export...") - obs_gen = self._generate_observation_chunks(temp_local_dir) - for _ in pool.imap(_write_observation_shard, obs_gen): - pass - - if self._triples: - logging.info("Streaming triples export...") - node_gen = self._generate_node_chunks(temp_local_dir) - for _ in pool.imap(_write_node_shard, node_gen): - pass - - self._upload_shards(temp_local_dir) - - def _generate_observation_chunks(self, temp_local_dir: str): - """Generates observation chunks of size _CHUNK_SIZE, cleaning memory dynamically.""" - prov_urls = {} - for prov in self.nodes.provenances.values(): - prov_id = strip_namespace(prov.id) - prov_urls[prov_id] = prov.url - prov_urls[prov.id] = prov.url - - while self._obs_records: - chunk = self._obs_records[:_CHUNK_SIZE] - del self._obs_records[:_CHUNK_SIZE] - gc.collect() # Release memory chunk - yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map, - prov_urls) - self.obs_shard_index += 1 - - def _generate_node_chunks(self, temp_local_dir: str): - """Generates node chunks of size _CHUNK_SIZE.""" - while self._triples: - chunk = self._triples[:_CHUNK_SIZE] - del self._triples[:_CHUNK_SIZE] - yield (chunk, self.node_shard_index, temp_local_dir, self.ns_map) - self.node_shard_index += 1 - - def _upload_shards(self, temp_local_dir: str): - """Uploads files in temp_local_dir to jsonld_dir, optimizing for GCS via native SDK.""" - files_to_upload = sorted(os.listdir(temp_local_dir)) - if not files_to_upload: - return - - target_path = self.jsonld_dir.full_path() - logging.info( - "Bulk uploading %d JSON-LD shards to target directory %s in parallel", - len(files_to_upload), target_path) - - if target_path.startswith("gs://"): - self._upload_shards_gcs(temp_local_dir, files_to_upload, target_path) - else: - self._upload_shards_local(temp_local_dir, files_to_upload) - - logging.info("Bulk upload of JSON-LD shards completed successfully.") - - def _upload_shards_gcs(self, temp_local_dir: str, files: list[str], - target_path: str): - """Performs concurrent GCS uploads using native google-cloud-storage client.""" - from google.cloud import storage - - # Parse bucket and blob prefix - parts = target_path[5:].split("/", 1) - bucket_name = parts[0] - blob_prefix = parts[1].rstrip("/") if len(parts) > 1 else "" - - client = storage.Client() - bucket = client.bucket(bucket_name) - - def _upload_single(filename: str): - local_file_path = os.path.join(temp_local_dir, filename) - blob_key = f"{blob_prefix}/{filename}" if blob_prefix else filename - blob = bucket.blob(blob_key) - blob.upload_from_filename(local_file_path) - - with concurrent.futures.ThreadPoolExecutor( - max_workers=_UPLOAD_CONCURRENCY) as executor: - list(executor.map(_upload_single, files)) - - def _upload_shards_local(self, temp_local_dir: str, files: list[str]): - """Performs concurrent local file copy (for test environments).""" - local_store = create_store(temp_local_dir).as_dir() - target_store = self.jsonld_dir - - def _copy_single(filename: str): - content = local_store.open_file(filename).read() - target_store.open_file(filename).write(content) - - with concurrent.futures.ThreadPoolExecutor( - max_workers=_UPLOAD_CONCURRENCY) as executor: - list(executor.map(_copy_single, files)) - - class SqlDb(Db): """Class to insert triples and observations into a SQL DB.""" diff --git a/simple/stats/jsonld_stream_db.py b/simple/stats/jsonld_stream_db.py new file mode 100644 index 000000000..0870c7f91 --- /dev/null +++ b/simple/stats/jsonld_stream_db.py @@ -0,0 +1,389 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""A DB implementation that streams JSON-LD shards directly to GCS/Disk.""" + +import concurrent.futures +from datetime import datetime +from datetime import timezone +import gc +import hashlib +import json +import logging +import multiprocessing +import os +import requests +import tempfile +import threading + +from google.cloud import storage + +import pandas as pd +from rdflib import Graph +from rdflib import Literal +from rdflib import Namespace +from rdflib import RDF + +from stats import constants +from stats.data import Triple +from stats.data import strip_namespace +from stats.db import Db +from stats.jsonld_exporter import DCID_URL +from stats.jsonld_exporter import expand_id +from stats.jsonld_exporter import write_shard +from util.filesystem import create_store +from util.filesystem import Dir +from util.filesystem import File + +# Configuration Constants +_CHUNK_SIZE = 10000 +_UPLOAD_CONCURRENCY = 32 +_EXPORT_PROCESSES_MAX = 8 + + +def _uri_ref(val): + if not val: + return None + if val.startswith("http://") or val.startswith("https://"): + return {"@id": val} + if val.startswith("dcid:"): + return {"@id": val} + return {"@id": f"dcid:{val.lstrip('/')}"} + + +def _write_observation_shard(args): + chunk, shard_index, jsonld_dir_path, ns_map, prov_urls = args + graph_list = [] + + for row in chunk: + entity, variable, date, value, provenance, unit, scaling_factor, mmethod, period, props = row + + key = f"{entity}_{variable}_{date}_{provenance}_{unit}_{mmethod}_{period}" + obs_hash = hashlib.sha256(key.encode('utf-8')).hexdigest() + + obs_obj = { + "@id": f"dcid:obs_{obs_hash}", + "@type": "dcid:StatVarObservation", + "dcid:observationAbout": _uri_ref(entity), + "dcid:variableMeasured": _uri_ref(variable) + } + + # Format date + try: + if str(date).isdigit(): + obs_obj["dcid:observationDate"] = int(date) + else: + try: + obs_obj["dcid:observationDate"] = float(date) + except ValueError: + obs_obj["dcid:observationDate"] = str(date) + except Exception: + obs_obj["dcid:observationDate"] = str(date) + + # Format value + try: + if '.' in str(value): + obs_obj["dcid:value"] = float(value) + else: + obs_obj["dcid:value"] = int(value) + except ValueError: + obs_obj["dcid:value"] = value + + if provenance: + obs_obj["dcid:provenance"] = _uri_ref(provenance) + if provenance in prov_urls and prov_urls[provenance]: + obs_obj["dcid:provenanceUrl"] = prov_urls[provenance] + if unit: + obs_obj["dcid:unit"] = _uri_ref(unit) + if scaling_factor: + try: + if '.' in str(scaling_factor): + obs_obj["dcid:scalingFactor"] = float(scaling_factor) + else: + obs_obj["dcid:scalingFactor"] = int(scaling_factor) + except ValueError: + obs_obj["dcid:scalingFactor"] = scaling_factor + if mmethod: + obs_obj["dcid:measurementMethod"] = _uri_ref(mmethod) + if period: + obs_obj["dcid:observationPeriod"] = period + + if props: + try: + props_dict = json.loads(props) + for k, v in props_dict.items(): + prop_key = f"dcid:{k}" if not k.startswith( + "dcid:") and not k.startswith("http") else k + obs_obj[prop_key] = v + except Exception: + pass + + graph_list.append(obs_obj) + + compacted_jsonld = {"@context": ns_map, "@graph": graph_list} + + shard_name = f"observation-{shard_index:05d}.jsonld" + with create_store(jsonld_dir_path) as store: + output_dir = store.as_dir() + output_dir.open_file(shard_name).write( + json.dumps(compacted_jsonld, indent=4)) + logging.info(f"Saved JSON-LD shard to {shard_name}") + + +def _write_node_shard(args): + fast_export = os.getenv("FAST_NODE_EXPORT", + "true").lower() in ("true", "1", "yes") + if fast_export: + _write_node_shard_fast(args) + else: + _write_node_shard_rdflib(args) + + +def _write_node_shard_fast(args): + chunk, shard_index, jsonld_dir_path, ns_map = args + subjects = {} + + for row in chunk: + sub_id = row.subject_id + if sub_id not in subjects: + subjects[sub_id] = { + "@id": + f"dcid:{sub_id.lstrip('/')}" if not sub_id.startswith("http") and + not sub_id.startswith("dcid:") else sub_id + } + + pred = row.predicate + pred_key = f"dcid:{pred}" if not pred.startswith( + "dcid:") and not pred.startswith("http") else pred + + if pred == "typeOf": + pred_key = "@type" + + if row.object_id: + val = _uri_ref(row.object_id) + else: + obj_val = row.object_value + try: + if '.' in str(obj_val): + val = float(obj_val) + else: + val = int(obj_val) + except ValueError: + val = str(obj_val) + + if pred_key == "@type": + val_str = val["@id"] if isinstance(val, + dict) and "@id" in val else str(val) + subjects[sub_id]["@type"] = val_str + else: + if pred_key in subjects[sub_id]: + existing = subjects[sub_id][pred_key] + if isinstance(existing, list): + existing.append(val) + else: + subjects[sub_id][pred_key] = [existing, val] + else: + subjects[sub_id][pred_key] = val + + # Sort by @id to match rdflib output order + graph_list = sorted(list(subjects.values()), key=lambda x: x["@id"]) + compacted_jsonld = {"@context": ns_map, "@graph": graph_list} + + shard_name = f"node-{shard_index:05d}.jsonld" + with create_store(jsonld_dir_path) as store: + output_dir = store.as_dir() + output_dir.open_file(shard_name).write( + json.dumps(compacted_jsonld, indent=4)) + logging.info(f"Saved JSON-LD shard to {shard_name} (fast path)") + + +def _write_node_shard_rdflib(args): + chunk, shard_index, jsonld_dir_path, ns_map = args + DCID = Namespace(DCID_URL) + g = Graph() + g.bind("dcid", DCID) + + for row in chunk: + sub = expand_id(row.subject_id) + p = expand_id(row.predicate) + if row.object_id: + o = expand_id(row.object_id) + else: + o = Literal(row.object_value) + + if row.predicate == 'typeOf': + g.add((sub, RDF.type, o)) + else: + g.add((sub, p, o)) + + with create_store(jsonld_dir_path) as store: + output_dir = store.as_dir() + write_shard(g, shard_index, output_dir, ns_map, prefix="node") + + +class JsonLdStreamDb(Db): + """A DB implementation that streams triples and observations directly to JSON-LD shards on GCS/Disk, bypassing SQLite.""" + + def __init__(self, output_dir, import_names, nodes) -> None: + self.output_dir = output_dir + self.import_names = import_names + self.nodes = nodes + + # Generate unique folder name based on import name and timestamp + import_name = None + if isinstance(import_names, list): + if import_names == [constants.ALL_IMPORTS]: + import_name = constants.ALL_IMPORTS + else: + import_name = "_".join(import_names) + + self.import_name = import_name or nodes.config.data.get( + "importName") or "default_import_name" + if self.import_name and "/" in self.import_name: + self.import_name = self.import_name.replace("/", "_") + + timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S_%f") + unique_dir_name = f"{self.import_name}_{timestamp}" + self.jsonld_dir = output_dir.open_dir("jsonld").open_dir(unique_dir_name) + + self.obs_shard_index = 0 + self.node_shard_index = 0 + self.ns_map = {"dcid": DCID_URL} + self.lock = threading.Lock() + self._obs_records = [] + self._triples = [] + + def insert_observations(self, observations_df: pd.DataFrame, + input_file: File): + if not observations_df.empty: + records = observations_df.to_records(index=False).tolist() + with self.lock: + self._obs_records.extend(records) + + def insert_triples(self, triples: list[Triple]): + if triples: + with self.lock: + self._triples.extend(triples) + + def commit(self): + pass + + def commit_and_close(self): + num_processes = min(multiprocessing.cpu_count(), _EXPORT_PROCESSES_MAX) + + with tempfile.TemporaryDirectory() as temp_local_dir: + logging.info("Using local temporary directory for export buffering: %s", + temp_local_dir) + + if self._obs_records or self._triples: + logging.info( + "Starting JSON-LD local export with %d processes in streaming mode", + num_processes) + with multiprocessing.Pool(processes=num_processes) as pool: + if self._obs_records: + logging.info("Streaming observations export...") + obs_gen = self._generate_observation_chunks(temp_local_dir) + for _ in pool.imap(_write_observation_shard, obs_gen): + pass + + if self._triples: + logging.info("Streaming triples export...") + node_gen = self._generate_node_chunks(temp_local_dir) + for _ in pool.imap(_write_node_shard, node_gen): + pass + + self._upload_shards(temp_local_dir) + + def _generate_observation_chunks(self, temp_local_dir: str): + """Generates observation chunks of size _CHUNK_SIZE, cleaning memory dynamically.""" + prov_urls = {} + for prov in self.nodes.provenances.values(): + prov_id = strip_namespace(prov.id) + prov_urls[prov_id] = prov.url + prov_urls[prov.id] = prov.url + + while self._obs_records: + chunk = self._obs_records[:_CHUNK_SIZE] + del self._obs_records[:_CHUNK_SIZE] + yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map, + prov_urls) + self.obs_shard_index += 1 + + def _generate_node_chunks(self, temp_local_dir: str): + """Generates node chunks of size _CHUNK_SIZE.""" + while self._triples: + chunk = self._triples[:_CHUNK_SIZE] + del self._triples[:_CHUNK_SIZE] + yield (chunk, self.node_shard_index, temp_local_dir, self.ns_map) + self.node_shard_index += 1 + + def _upload_shards(self, temp_local_dir: str): + """Uploads files in temp_local_dir to jsonld_dir, optimizing for GCS via native SDK.""" + files_to_upload = sorted(os.listdir(temp_local_dir)) + if not files_to_upload: + return + + target_path = self.jsonld_dir.full_path() + logging.info( + "Bulk uploading %d JSON-LD shards to target directory %s in parallel", + len(files_to_upload), target_path) + + if target_path.startswith("gs://"): + self._upload_shards_gcs(temp_local_dir, files_to_upload, target_path) + else: + self._upload_shards_local(temp_local_dir, files_to_upload) + + logging.info("Bulk upload of JSON-LD shards completed successfully.") + + def _upload_shards_gcs(self, temp_local_dir: str, files: list[str], + target_path: str): + """Performs concurrent GCS uploads using native google-cloud-storage client.""" + # Parse bucket and blob prefix + parts = target_path[5:].split("/", 1) + bucket_name = parts[0] + blob_prefix = parts[1].rstrip("/") if len(parts) > 1 else "" + + client = storage.Client() + + # Configure connection pool size for concurrent GCS uploads + adapter = requests.adapters.HTTPAdapter( + pool_connections=_UPLOAD_CONCURRENCY, + pool_maxsize=_UPLOAD_CONCURRENCY + ) + client._http.mount("https://", adapter) + client._http.mount("http://", adapter) + + bucket = client.bucket(bucket_name) + + def _upload_single(filename: str): + local_file_path = os.path.join(temp_local_dir, filename) + blob_key = f"{blob_prefix}/{filename}" if blob_prefix else filename + blob = bucket.blob(blob_key) + blob.upload_from_filename(local_file_path) + + with concurrent.futures.ThreadPoolExecutor( + max_workers=_UPLOAD_CONCURRENCY) as executor: + list(executor.map(_upload_single, files)) + + def _upload_shards_local(self, temp_local_dir: str, files: list[str]): + """Performs concurrent local file copy (for test environments).""" + local_store = create_store(temp_local_dir).as_dir() + target_store = self.jsonld_dir + + def _copy_single(filename: str): + content = local_store.open_file(filename).read() + target_store.open_file(filename).write(content) + + with concurrent.futures.ThreadPoolExecutor( + max_workers=_UPLOAD_CONCURRENCY) as executor: + list(executor.map(_copy_single, files)) diff --git a/simple/stats/main.py b/simple/stats/main.py index 9c3773ba7..59273f190 100644 --- a/simple/stats/main.py +++ b/simple/stats/main.py @@ -14,6 +14,7 @@ import logging import os +import requests.adapters from absl import app from absl import flags @@ -56,7 +57,6 @@ def _run(): # Configure requests adapter default pool size to support parallel GCS uploads - import requests.adapters requests.adapters.DEFAULT_POOLSIZE = 32 initialize_logger() diff --git a/simple/stats/nodes.py b/simple/stats/nodes.py index fb5015b8c..5c6f49a84 100644 --- a/simple/stats/nodes.py +++ b/simple/stats/nodes.py @@ -15,6 +15,7 @@ from functools import wraps import logging import re +import threading import pandas as pd from stats.config import Config @@ -67,7 +68,6 @@ def wrapper(self, *args, **kwargs): class Nodes: def __init__(self, config: Config) -> None: - import threading self.lock = threading.RLock() self.config = config # Custom namespace diff --git a/simple/stats/reporter.py b/simple/stats/reporter.py index 363f20f6b..47e36afca 100644 --- a/simple/stats/reporter.py +++ b/simple/stats/reporter.py @@ -17,6 +17,7 @@ from enum import Enum from functools import wraps import json +import threading import time from util.filesystem import File @@ -44,7 +45,6 @@ class ImportReporter: """ def __init__(self, report_file: File) -> None: - import threading self.lock = threading.RLock() self.status = Status.NOT_STARTED self.start_time = None @@ -156,8 +156,9 @@ def _report(func): @wraps(func) def wrapper(self, *args, **kwargs): - result = func(self, *args, **kwargs) - FileImportReporter.report(self) + with self.parent.lock: + result = func(self, *args, **kwargs) + FileImportReporter.report(self) return result return wrapper @@ -173,7 +174,7 @@ def report_success(self): @_report def report_failure(self, error: str): - self.status = Status.SUCCESS + self.status = Status.FAILURE self.data["error"] = error def json(self) -> dict: diff --git a/simple/stats/runner.py b/simple/stats/runner.py index a5610d6e9..ba18811f5 100644 --- a/simple/stats/runner.py +++ b/simple/stats/runner.py @@ -12,11 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import concurrent.futures from datetime import datetime from datetime import timezone from enum import StrEnum import json import logging +import threading import os from typing import Optional @@ -40,8 +42,8 @@ from stats.db import get_datacommons_platform_config_from_env from stats.db import get_sqlite_path_from_env from stats.db import ImportStatus -from stats.db import JsonLdStreamDb from stats.db import TYPE_CLOUD_SQL +from stats.jsonld_stream_db import JsonLdStreamDb from stats.db_cache import get_db_cache_from_env from stats.db_transfer import transfer_sqlite_to_cloud_sql from stats.entities_importer import EntitiesImporter @@ -160,9 +162,6 @@ def __init__( _check_not_overlapping(input_store, output_store) self.all_stores.append(output_store) self.output_dir = output_store.as_dir() - self.nl_dir = None - if self.mode != RunMode.DCP_BRIDGE: - self.nl_dir = self.output_dir.open_dir(constants.NL_DIR_NAME) self.process_dir = self.output_dir.open_dir(constants.PROCESS_DIR_NAME) # Reporter. @@ -526,6 +525,7 @@ def _run_local_sqlite_build_import(self): logging.warning(f"Failed to cleanup local database: {e}") def _generate_nl_artifacts(self): + nl_dir = self.output_dir.open_dir(constants.NL_DIR_NAME) triples: list[Triple] = [] topic_triples = self.db.select_triples_by_subject_type(sc.TYPE_TOPIC) sv_triples = self.db.select_triples_by_subject_type( @@ -533,14 +533,14 @@ def _generate_nl_artifacts(self): triples = topic_triples + sv_triples # Generate sentences. - nl.generate_nl_sentences(triples, self.nl_dir) + nl.generate_nl_sentences(triples, nl_dir) # If generating topics, fetch svpg triples as well and generate topic cache if topic_triples: sv_peer_group_triples = self.db.select_triples_by_subject_type( sc.TYPE_STAT_VAR_PEER_GROUP) topic_cache_triples = topic_triples + sv_peer_group_triples - nl.generate_topic_cache(topic_cache_triples, self.nl_dir) + nl.generate_topic_cache(topic_cache_triples, nl_dir) def _generate_svg_hierarchy(self): if self.mode == RunMode.MAIN_DC: @@ -616,11 +616,9 @@ def _check_if_special_file(self, file: File) -> bool: return True return False - def _run_all_data_imports(self): + def _find_and_filter_input_files(self) -> tuple[list[File], list[File]]: + """Discovers, filters, sorts, and returns matched CSV and MCF files.""" input_files: list[File] = [] - input_csv_files: list[File] = [] - input_mcf_files: list[File] = [] - for input_store in self.input_stores: if input_store.isdir(): input_files.extend(input_store.as_dir().all_files( @@ -628,71 +626,75 @@ def _run_all_data_imports(self): else: input_files.append(input_store.as_file()) - for input_file in input_files: - if _ARCHIVES_DIR_NAME in input_file.path.split("/"): + csv_files: list[File] = [] + mcf_files: list[File] = [] + + for file in input_files: + if _ARCHIVES_DIR_NAME in file.path.split("/"): continue - if self._check_if_special_file(input_file): + if self._check_if_special_file(file): continue - if match(input_file, "*.csv"): - input_csv_files.append(input_file) - if match(input_file, "*.mcf"): - input_mcf_files.append(input_file) + if match(file, "*.csv"): + csv_files.append(file) + elif match(file, "*.mcf"): + mcf_files.append(file) - # Sort input files alphabetically. - input_csv_files.sort(key=lambda f: f.full_path()) - input_mcf_files.sort(key=lambda f: f.full_path()) + # Sort alphabetically to guarantee consistent order + csv_files.sort(key=lambda f: f.full_path()) + mcf_files.sort(key=lambda f: f.full_path()) + return csv_files, mcf_files - logging.info(f"Found {len(input_csv_files)} csv files to import") - logging.info(f"Found {len(input_mcf_files)} mcf files to import") + def _run_all_data_imports(self): + """Orchestrates file scanning, thread-pool configuration, and file ingestion.""" + csv_files, mcf_files = self._find_and_filter_input_files() + + logging.info("Found %d CSV files to import", len(csv_files)) + logging.info("Found %d MCF files to import", len(mcf_files)) logging.info("Matched files to process: %s", - [f.full_path() for f in input_csv_files + input_mcf_files]) + [f.full_path() for f in csv_files + mcf_files]) + + self.reporter.report_started(import_files=list(csv_files + mcf_files)) - self.reporter.report_started(import_files=list(input_csv_files + - input_mcf_files)) - import threading self._completed_files_count = 0 - self._total_files_count = len(input_csv_files) + len(input_mcf_files) + self._total_files_count = len(csv_files) + len(mcf_files) self._counter_lock = threading.Lock() if self.mode == RunMode.DCP_BRIDGE: - import concurrent.futures - num_threads = min(32, (len(input_csv_files) + len(input_mcf_files)) or 1) + num_threads = min(32, self._total_files_count or 1) logging.info("Starting parallel ingestion of data files with %d threads", num_threads) with concurrent.futures.ThreadPoolExecutor( max_workers=num_threads) as executor: futures = [] - for input_csv_file in input_csv_files: - futures.append( - executor.submit(self._run_single_import, input_csv_file)) - for input_mcf_file in input_mcf_files: - futures.append( - executor.submit(self._run_single_mcf_import, input_mcf_file)) - - # Wait for all files to be processed and propagate any exception + for file in csv_files: + futures.append(executor.submit(self._run_single_import, file)) + for file in mcf_files: + futures.append(executor.submit(self._run_single_mcf_import, file)) + + # Wait for completion and raise any thread exceptions for future in concurrent.futures.as_completed(futures): future.result() else: - for input_csv_file in input_csv_files: - self._run_single_import(input_csv_file) - for input_mcf_file in input_mcf_files: - self._run_single_mcf_import(input_mcf_file) + for file in csv_files: + self._run_single_import(file) + for file in mcf_files: + self._run_single_mcf_import(file) - def _run_single_import(self, input_file: File): + def _log_file_progress(self, file_prefix: str, file: File): + """Increments file progress counter thread-safely and logs standard progress line.""" with self._counter_lock: self._completed_files_count += 1 current_count = self._completed_files_count - logging.info("[%d/%d] Importing file: %s", current_count, - self._total_files_count, input_file) + logging.info("[%d/%d] %s: %s", current_count, self._total_files_count, + file_prefix, file) + + def _run_single_import(self, input_file: File): + self._log_file_progress("Importing CSV file", input_file) self._create_importer(input_file).do_import() def _run_single_mcf_import(self, input_mcf_file: File): - with self._counter_lock: - self._completed_files_count += 1 - current_count = self._completed_files_count - logging.info("[%d/%d] Importing MCF file: %s", current_count, - self._total_files_count, input_mcf_file) + self._log_file_progress("Importing MCF file", input_mcf_file) self._create_mcf_importer(input_mcf_file, self.output_dir, self.mode == RunMode.MAIN_DC).do_import() diff --git a/simple/tests/stats/jsonld_stream_db_test.py b/simple/tests/stats/jsonld_stream_db_test.py new file mode 100644 index 000000000..e0f8942a3 --- /dev/null +++ b/simple/tests/stats/jsonld_stream_db_test.py @@ -0,0 +1,297 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import tempfile +import unittest +from unittest import mock + +import pandas as pd + +from stats.data import Triple +from stats.jsonld_stream_db import JsonLdStreamDb +from util.filesystem import create_store + + +class TestJsonLdStreamDb(unittest.TestCase): + + def setUp(self): + self.mock_config = mock.MagicMock() + self.mock_config.custom_id_namespace.return_value = "custom" + self.mock_config.data = {"importName": "test_import"} + + self.mock_nodes = mock.MagicMock() + self.mock_nodes.config = self.mock_config + self.mock_nodes.provenances = {} + + def test_directory_creation(self): + with tempfile.TemporaryDirectory() as temp_dir: + temp_store = create_store(temp_dir) + + db = JsonLdStreamDb( + output_dir=temp_store.as_dir(), + import_names=["test_import"], + nodes=self.mock_nodes + ) + + # The jsonld folder should be created under the output dir + self.assertTrue(os.path.isdir(os.path.join(temp_dir, "jsonld"))) + + # The db path should contain test_import + self.assertEqual(db.import_name, "test_import") + self.assertTrue(db.jsonld_dir.full_path().startswith( + os.path.join(temp_dir, "jsonld", "test_import_"))) + + def test_insert_observations_and_triples(self): + with tempfile.TemporaryDirectory() as temp_dir: + temp_store = create_store(temp_dir) + db = JsonLdStreamDb( + output_dir=temp_store.as_dir(), + import_names=["test_import"], + nodes=self.mock_nodes + ) + + # Insert observations + df = pd.DataFrame([("e1", "v1", "2026", "100", "p1", "", "", "", "", "")], + columns=[ + "entity", "variable", "date", "value", "provenance", + "unit", "scaling_factor", "measurement_method", + "observation_period", "properties" + ]) + mock_file = mock.Mock() + db.insert_observations(df, mock_file) + self.assertEqual(len(db._obs_records), 1) + self.assertEqual(db._obs_records[0][0], "e1") + + # Insert triples + triples = [Triple("sub1", "pred1", object_value="val1")] + db.insert_triples(triples) + self.assertEqual(len(db._triples), 1) + + def test_commit_and_close_local(self): + with tempfile.TemporaryDirectory() as temp_dir: + temp_store = create_store(temp_dir) + db = JsonLdStreamDb( + output_dir=temp_store.as_dir(), + import_names=["test_import"], + nodes=self.mock_nodes + ) + + # Insert observations + df = pd.DataFrame([("e1", "v1", "2026", "100", "p1", "", "", "", "", "")], + columns=[ + "entity", "variable", "date", "value", "provenance", + "unit", "scaling_factor", "measurement_method", + "observation_period", "properties" + ]) + mock_file = mock.Mock() + db.insert_observations(df, mock_file) + + # Insert triples + triples = [Triple("sub1", "typeOf", object_id="StatisticalVariable")] + db.insert_triples(triples) + + db.commit_and_close() + + # Shards should be written directly to the target unique directory + target_dir_path = db.jsonld_dir.full_path() + obs_shard = os.path.join(target_dir_path, "observation-00000.jsonld") + node_shard = os.path.join(target_dir_path, "node-00000.jsonld") + + self.assertTrue(os.path.exists(obs_shard)) + self.assertTrue(os.path.exists(node_shard)) + + # Validate observation shard content + with open(obs_shard, "r") as f: + data = json.load(f) + self.assertIn("@graph", data) + graph = data["@graph"] + self.assertEqual(len(graph), 1) + self.assertEqual(graph[0]["dcid:observationAbout"]["@id"], "dcid:e1") + self.assertEqual(graph[0]["dcid:value"], 100) + + # Validate node shard content + with open(node_shard, "r") as f: + data = json.load(f) + self.assertIn("@graph", data) + graph = data["@graph"] + self.assertEqual(len(graph), 1) + self.assertEqual(graph[0]["@id"], "dcid:sub1") + self.assertEqual(graph[0]["@type"], "dcid:StatisticalVariable") + + @mock.patch("google.cloud.storage.Client") + def test_commit_and_close_gcs(self, mock_storage_client): + # Setup GCS mock + mock_client_instance = mock_storage_client.return_value + mock_bucket = mock_client_instance.bucket.return_value + mock_blob = mock_bucket.blob.return_value + + with tempfile.TemporaryDirectory() as temp_dir: + temp_store = create_store(temp_dir) + + # Mock the output dir as a GCS path + mock_output_dir = mock.MagicMock() + mock_output_dir.open_dir.return_value.open_dir.return_value.full_path.return_value = "gs://my-bucket/ingestion/test" + mock_output_dir.open_dir.return_value.open_dir.return_value.isdir.return_value = False + + db = JsonLdStreamDb( + output_dir=mock_output_dir, + import_names=["test_import"], + nodes=self.mock_nodes + ) + + # Insert observation + df = pd.DataFrame([("e1", "v1", "2026", "100", "p1", "", "", "", "", "")], + columns=[ + "entity", "variable", "date", "value", "provenance", + "unit", "scaling_factor", "measurement_method", + "observation_period", "properties" + ]) + mock_file = mock.Mock() + db.insert_observations(df, mock_file) + + db.commit_and_close() + + # Verify storage bucket call was made + mock_storage_client.assert_called_once() + mock_client_instance.bucket.assert_called_with("my-bucket") + + # Verify upload blob calls + mock_bucket.blob.assert_called_with("ingestion/test/observation-00000.jsonld") + mock_blob.upload_from_filename.assert_called_once() + + def test_node_fast_vs_rdflib_parity(self): + """Rigorous parity test: Compares fast path output with rdflib path output.""" + from stats.jsonld_stream_db import _write_node_shard_fast, _write_node_shard_rdflib + + complex_triples = [ + Triple(subject_id="sub1", predicate="typeOf", object_id="StatisticalVariable"), + Triple(subject_id="sub1", predicate="name", object_value="Test Node"), + # Multi-valued properties + Triple(subject_id="sub1", predicate="alternateName", object_value="Alias A"), + Triple(subject_id="sub1", predicate="alternateName", object_value="Alias B"), + # References vs Values + Triple(subject_id="sub1", predicate="memberOf", object_id="groupA"), + # Number types + Triple(subject_id="sub1", predicate="countValue", object_value=15.8), + Triple(subject_id="sub1", predicate="intValue", object_value=99), + # External URL predicate/object + Triple(subject_id="sub1", predicate="http://schema.org/url", object_id="https://example.org"), + ] + from stats.jsonld_exporter import DCID_URL + ns_map = {"dcid": DCID_URL} + + with tempfile.TemporaryDirectory() as temp_dir_fast, \ + tempfile.TemporaryDirectory() as temp_dir_rdflib: + + _write_node_shard_fast((complex_triples, 0, temp_dir_fast, ns_map)) + _write_node_shard_rdflib((complex_triples, 0, temp_dir_rdflib, ns_map)) + + fast_file = os.path.join(temp_dir_fast, "node-00000.jsonld") + rdflib_file = os.path.join(temp_dir_rdflib, "node-00000.jsonld") + + self.assertTrue(os.path.exists(fast_file)) + self.assertTrue(os.path.exists(rdflib_file)) + + with open(fast_file, "r") as f: + fast_json = json.load(f) + with open(rdflib_file, "r") as f: + rdflib_json = json.load(f) + + # Helper to normalize a JSON-LD graph for strict comparison + def normalize_graph(graph): + normalized = {} + for item in graph["@graph"]: + item_id = item["@id"] + normalized_item = {} + for k, v in item.items(): + if k == "@id": + continue + # If value is list, sort it to ensure order-independence + if isinstance(v, list): + sorted_v = sorted( + v, + key=lambda x: x["@id"] if isinstance(x, dict) and "@id" in x else str(x) + ) + normalized_item[k] = sorted_v + else: + normalized_item[k] = v + normalized[item_id] = normalized_item + return normalized + + self.assertEqual(normalize_graph(fast_json), normalize_graph(rdflib_json)) + + def test_observation_parsing_edge_cases(self): + """Rigorous data type & properties parsing check to ensure zero property loss.""" + from stats.jsonld_stream_db import _write_observation_shard + + # Custom properties as nested JSON + custom_props = json.dumps({ + "customIntProp": 42, + "dcid:customStrProp": "customVal", + "http://schema.org/url": "https://test-prop.org" + }) + + # Rows with edge-case numbers and strings + chunk = [ + # entity, variable, date, value, provenance, unit, scaling_factor, mmethod, period, props + ("country/ALB", "v1", "2026", "99", "p1", "unit1", "100", "m1", "P1Y", custom_props), + ("country/USA", "v1", "2026.5", "123.45", "p1", None, "10.5", None, None, None), + ("country/IND", "v1", "2026-06", "Unavailable", "p1", None, None, None, None, None), + ] + + ns_map = {"dcid": "https://datacommons.org/ontology/"} + prov_urls = {"p1": "http://my-provenance.org/url"} + + with tempfile.TemporaryDirectory() as temp_dir: + _write_observation_shard((chunk, 0, temp_dir, ns_map, prov_urls)) + + shard_file = os.path.join(temp_dir, "observation-00000.jsonld") + self.assertTrue(os.path.exists(shard_file)) + + with open(shard_file, "r") as f: + data = json.load(f) + + self.assertIn("@graph", data) + graph = data["@graph"] + self.assertEqual(len(graph), 3) + + # 1. Verify first observation (Int types, Custom Props) + obs1 = [o for o in graph if o["dcid:observationAbout"]["@id"] == "dcid:country/ALB"][0] + self.assertEqual(obs1["dcid:value"], 99) + self.assertEqual(obs1["dcid:observationDate"], 2026) + self.assertEqual(obs1["dcid:scalingFactor"], 100) + self.assertEqual(obs1["dcid:provenanceUrl"], "http://my-provenance.org/url") + self.assertEqual(obs1["dcid:observationPeriod"], "P1Y") + + # Verify custom properties from JSON string + self.assertEqual(obs1["dcid:customIntProp"], 42) + self.assertEqual(obs1["dcid:customStrProp"], "customVal") + self.assertEqual(obs1["http://schema.org/url"], "https://test-prop.org") + + # 2. Verify second observation (Float types) + obs2 = [o for o in graph if o["dcid:observationAbout"]["@id"] == "dcid:country/USA"][0] + self.assertEqual(obs2["dcid:value"], 123.45) + self.assertEqual(obs2["dcid:observationDate"], 2026.5) + self.assertEqual(obs2["dcid:scalingFactor"], 10.5) + + # 3. Verify third observation (Non-numeric value & Date string) + obs3 = [o for o in graph if o["dcid:observationAbout"]["@id"] == "dcid:country/IND"][0] + self.assertEqual(obs3["dcid:value"], "Unavailable") + self.assertEqual(obs3["dcid:observationDate"], "2026-06") + + +if __name__ == "__main__": + unittest.main() From 7a45722adacb994627f03607f53074b24af0e42b Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 17:39:32 -0400 Subject: [PATCH 18/24] Cleanup the JSONLD parsing code. --- simple/stats/jsonld_stream_db.py | 63 +++++++++++++------------------- 1 file changed, 25 insertions(+), 38 deletions(-) diff --git a/simple/stats/jsonld_stream_db.py b/simple/stats/jsonld_stream_db.py index 0870c7f91..9fc617202 100644 --- a/simple/stats/jsonld_stream_db.py +++ b/simple/stats/jsonld_stream_db.py @@ -61,6 +61,17 @@ def _uri_ref(val): return {"@id": f"dcid:{val.lstrip('/')}"} +def _parse_numeric(val): + if val is None or val == "": + return None + try: + if "." in str(val): + return float(val) + return int(val) + except ValueError: + return str(val) + + def _write_observation_shard(args): chunk, shard_index, jsonld_dir_path, ns_map, prov_urls = args graph_list = [] @@ -75,30 +86,11 @@ def _write_observation_shard(args): "@id": f"dcid:obs_{obs_hash}", "@type": "dcid:StatVarObservation", "dcid:observationAbout": _uri_ref(entity), - "dcid:variableMeasured": _uri_ref(variable) + "dcid:variableMeasured": _uri_ref(variable), + "dcid:observationDate": _parse_numeric(date), + "dcid:value": _parse_numeric(value), } - # Format date - try: - if str(date).isdigit(): - obs_obj["dcid:observationDate"] = int(date) - else: - try: - obs_obj["dcid:observationDate"] = float(date) - except ValueError: - obs_obj["dcid:observationDate"] = str(date) - except Exception: - obs_obj["dcid:observationDate"] = str(date) - - # Format value - try: - if '.' in str(value): - obs_obj["dcid:value"] = float(value) - else: - obs_obj["dcid:value"] = int(value) - except ValueError: - obs_obj["dcid:value"] = value - if provenance: obs_obj["dcid:provenance"] = _uri_ref(provenance) if provenance in prov_urls and prov_urls[provenance]: @@ -106,13 +98,7 @@ def _write_observation_shard(args): if unit: obs_obj["dcid:unit"] = _uri_ref(unit) if scaling_factor: - try: - if '.' in str(scaling_factor): - obs_obj["dcid:scalingFactor"] = float(scaling_factor) - else: - obs_obj["dcid:scalingFactor"] = int(scaling_factor) - except ValueError: - obs_obj["dcid:scalingFactor"] = scaling_factor + obs_obj["dcid:scalingFactor"] = _parse_numeric(scaling_factor) if mmethod: obs_obj["dcid:measurementMethod"] = _uri_ref(mmethod) if period: @@ -172,14 +158,7 @@ def _write_node_shard_fast(args): if row.object_id: val = _uri_ref(row.object_id) else: - obj_val = row.object_value - try: - if '.' in str(obj_val): - val = float(obj_val) - else: - val = int(obj_val) - except ValueError: - val = str(obj_val) + val = _parse_numeric(row.object_value) if pred_key == "@type": val_str = val["@id"] if isinstance(val, @@ -208,6 +187,14 @@ def _write_node_shard_fast(args): def _write_node_shard_rdflib(args): + """ + Writes a chunk of triples to a JSON-LD shard using rdflib. + Args: + args: Tuple containing (chunk, shard_index, jsonld_dir_path, ns_map) + """ + + # TODO(gmechali): Completely deprecate this path after we have 100% certainty in the direct export. + # note that this path is exponentially slower. chunk, shard_index, jsonld_dir_path, ns_map = args DCID = Namespace(DCID_URL) g = Graph() @@ -232,7 +219,7 @@ def _write_node_shard_rdflib(args): class JsonLdStreamDb(Db): - """A DB implementation that streams triples and observations directly to JSON-LD shards on GCS/Disk, bypassing SQLite.""" + """A DB implementation that streams triples and observations directly to JSON-LD shards on GCS/Disk.""" def __init__(self, output_dir, import_names, nodes) -> None: self.output_dir = output_dir From 37aab53b05d07b877a3149d0901a2bc100b666e2 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 17:46:18 -0400 Subject: [PATCH 19/24] Minor cleanup --- simple/stats/config.py | 2 +- simple/stats/db.py | 7 ------- 2 files changed, 1 insertion(+), 8 deletions(-) diff --git a/simple/stats/config.py b/simple/stats/config.py index a4b3b4423..36baa5d0b 100644 --- a/simple/stats/config.py +++ b/simple/stats/config.py @@ -242,7 +242,7 @@ def custom_svg_prefix(self) -> str: - If explicitly set via 'customSvgPrefix', return it. - Else if 'customIdNamespace' is explicitly provided in config, derive as f"{namespace}/g/" where namespace is the validated value of - customIdNamespace. + customIdNamespace. - Else fall back to the built-in default (e.g., 'c/g/'). """ from stats import schema_constants as sc diff --git a/simple/stats/db.py b/simple/stats/db.py index 0ee00f793..e1aa30393 100644 --- a/simple/stats/db.py +++ b/simple/stats/db.py @@ -12,20 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. -import concurrent.futures from dataclasses import dataclass from datetime import datetime -from datetime import timezone from enum import auto from enum import Enum -import gc import json import logging -import multiprocessing import os import sqlite3 -import tempfile -import threading from typing import Any from google.cloud.sql.connector.connector import Connector @@ -45,7 +39,6 @@ from stats.data import McfNode from stats.data import STAT_VAR_GROUP from stats.data import STATISTICAL_VARIABLE -from stats.data import strip_namespace from stats.data import Triple from util.filesystem import create_store from util.filesystem import Dir From cd8dc1fbc03140a94befcd04d4a6e21242879846 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 17:46:41 -0400 Subject: [PATCH 20/24] Lint --- simple/stats/jsonld_stream_db.py | 12 +-- simple/stats/main.py | 2 +- simple/stats/runner.py | 4 +- simple/tests/stats/jsonld_stream_db_test.py | 92 ++++++++++++--------- 4 files changed, 60 insertions(+), 50 deletions(-) diff --git a/simple/stats/jsonld_stream_db.py b/simple/stats/jsonld_stream_db.py index 9fc617202..dc5995666 100644 --- a/simple/stats/jsonld_stream_db.py +++ b/simple/stats/jsonld_stream_db.py @@ -22,21 +22,19 @@ import logging import multiprocessing import os -import requests import tempfile import threading from google.cloud import storage - import pandas as pd from rdflib import Graph from rdflib import Literal from rdflib import Namespace from rdflib import RDF - +import requests from stats import constants -from stats.data import Triple from stats.data import strip_namespace +from stats.data import Triple from stats.db import Db from stats.jsonld_exporter import DCID_URL from stats.jsonld_exporter import expand_id @@ -341,12 +339,10 @@ def _upload_shards_gcs(self, temp_local_dir: str, files: list[str], blob_prefix = parts[1].rstrip("/") if len(parts) > 1 else "" client = storage.Client() - + # Configure connection pool size for concurrent GCS uploads adapter = requests.adapters.HTTPAdapter( - pool_connections=_UPLOAD_CONCURRENCY, - pool_maxsize=_UPLOAD_CONCURRENCY - ) + pool_connections=_UPLOAD_CONCURRENCY, pool_maxsize=_UPLOAD_CONCURRENCY) client._http.mount("https://", adapter) client._http.mount("http://", adapter) diff --git a/simple/stats/main.py b/simple/stats/main.py index 59273f190..1c271763e 100644 --- a/simple/stats/main.py +++ b/simple/stats/main.py @@ -14,11 +14,11 @@ import logging import os -import requests.adapters from absl import app from absl import flags from freezegun import freeze_time +import requests.adapters from stats import constants from stats.logger import initialize_logger from stats.runner import RunMode diff --git a/simple/stats/runner.py b/simple/stats/runner.py index ba18811f5..9625d675e 100644 --- a/simple/stats/runner.py +++ b/simple/stats/runner.py @@ -18,8 +18,8 @@ from enum import StrEnum import json import logging -import threading import os +import threading from typing import Optional import fs.path as fspath @@ -43,13 +43,13 @@ from stats.db import get_sqlite_path_from_env from stats.db import ImportStatus from stats.db import TYPE_CLOUD_SQL -from stats.jsonld_stream_db import JsonLdStreamDb from stats.db_cache import get_db_cache_from_env from stats.db_transfer import transfer_sqlite_to_cloud_sql from stats.entities_importer import EntitiesImporter from stats.events_importer import EventsImporter from stats.importer import Importer from stats.jsonld_exporter import export_to_jsonld +from stats.jsonld_stream_db import JsonLdStreamDb from stats.mcf_importer import McfImporter import stats.nl as nl from stats.nodes import Nodes diff --git a/simple/tests/stats/jsonld_stream_db_test.py b/simple/tests/stats/jsonld_stream_db_test.py index e0f8942a3..b83432fc7 100644 --- a/simple/tests/stats/jsonld_stream_db_test.py +++ b/simple/tests/stats/jsonld_stream_db_test.py @@ -19,7 +19,6 @@ from unittest import mock import pandas as pd - from stats.data import Triple from stats.jsonld_stream_db import JsonLdStreamDb from util.filesystem import create_store @@ -40,11 +39,9 @@ def test_directory_creation(self): with tempfile.TemporaryDirectory() as temp_dir: temp_store = create_store(temp_dir) - db = JsonLdStreamDb( - output_dir=temp_store.as_dir(), - import_names=["test_import"], - nodes=self.mock_nodes - ) + db = JsonLdStreamDb(output_dir=temp_store.as_dir(), + import_names=["test_import"], + nodes=self.mock_nodes) # The jsonld folder should be created under the output dir self.assertTrue(os.path.isdir(os.path.join(temp_dir, "jsonld"))) @@ -57,11 +54,9 @@ def test_directory_creation(self): def test_insert_observations_and_triples(self): with tempfile.TemporaryDirectory() as temp_dir: temp_store = create_store(temp_dir) - db = JsonLdStreamDb( - output_dir=temp_store.as_dir(), - import_names=["test_import"], - nodes=self.mock_nodes - ) + db = JsonLdStreamDb(output_dir=temp_store.as_dir(), + import_names=["test_import"], + nodes=self.mock_nodes) # Insert observations df = pd.DataFrame([("e1", "v1", "2026", "100", "p1", "", "", "", "", "")], @@ -83,11 +78,9 @@ def test_insert_observations_and_triples(self): def test_commit_and_close_local(self): with tempfile.TemporaryDirectory() as temp_dir: temp_store = create_store(temp_dir) - db = JsonLdStreamDb( - output_dir=temp_store.as_dir(), - import_names=["test_import"], - nodes=self.mock_nodes - ) + db = JsonLdStreamDb(output_dir=temp_store.as_dir(), + import_names=["test_import"], + nodes=self.mock_nodes) # Insert observations df = pd.DataFrame([("e1", "v1", "2026", "100", "p1", "", "", "", "", "")], @@ -146,11 +139,9 @@ def test_commit_and_close_gcs(self, mock_storage_client): mock_output_dir.open_dir.return_value.open_dir.return_value.full_path.return_value = "gs://my-bucket/ingestion/test" mock_output_dir.open_dir.return_value.open_dir.return_value.isdir.return_value = False - db = JsonLdStreamDb( - output_dir=mock_output_dir, - import_names=["test_import"], - nodes=self.mock_nodes - ) + db = JsonLdStreamDb(output_dir=mock_output_dir, + import_names=["test_import"], + nodes=self.mock_nodes) # Insert observation df = pd.DataFrame([("e1", "v1", "2026", "100", "p1", "", "", "", "", "")], @@ -169,33 +160,43 @@ def test_commit_and_close_gcs(self, mock_storage_client): mock_client_instance.bucket.assert_called_with("my-bucket") # Verify upload blob calls - mock_bucket.blob.assert_called_with("ingestion/test/observation-00000.jsonld") + mock_bucket.blob.assert_called_with( + "ingestion/test/observation-00000.jsonld") mock_blob.upload_from_filename.assert_called_once() def test_node_fast_vs_rdflib_parity(self): """Rigorous parity test: Compares fast path output with rdflib path output.""" - from stats.jsonld_stream_db import _write_node_shard_fast, _write_node_shard_rdflib - + from stats.jsonld_stream_db import _write_node_shard_fast + from stats.jsonld_stream_db import _write_node_shard_rdflib + complex_triples = [ - Triple(subject_id="sub1", predicate="typeOf", object_id="StatisticalVariable"), + Triple(subject_id="sub1", + predicate="typeOf", + object_id="StatisticalVariable"), Triple(subject_id="sub1", predicate="name", object_value="Test Node"), # Multi-valued properties - Triple(subject_id="sub1", predicate="alternateName", object_value="Alias A"), - Triple(subject_id="sub1", predicate="alternateName", object_value="Alias B"), + Triple(subject_id="sub1", + predicate="alternateName", + object_value="Alias A"), + Triple(subject_id="sub1", + predicate="alternateName", + object_value="Alias B"), # References vs Values Triple(subject_id="sub1", predicate="memberOf", object_id="groupA"), # Number types Triple(subject_id="sub1", predicate="countValue", object_value=15.8), Triple(subject_id="sub1", predicate="intValue", object_value=99), # External URL predicate/object - Triple(subject_id="sub1", predicate="http://schema.org/url", object_id="https://example.org"), + Triple(subject_id="sub1", + predicate="http://schema.org/url", + object_id="https://example.org"), ] from stats.jsonld_exporter import DCID_URL ns_map = {"dcid": DCID_URL} with tempfile.TemporaryDirectory() as temp_dir_fast, \ tempfile.TemporaryDirectory() as temp_dir_rdflib: - + _write_node_shard_fast((complex_triples, 0, temp_dir_fast, ns_map)) _write_node_shard_rdflib((complex_triples, 0, temp_dir_rdflib, ns_map)) @@ -223,8 +224,8 @@ def normalize_graph(graph): if isinstance(v, list): sorted_v = sorted( v, - key=lambda x: x["@id"] if isinstance(x, dict) and "@id" in x else str(x) - ) + key=lambda x: x["@id"] + if isinstance(x, dict) and "@id" in x else str(x)) normalized_item[k] = sorted_v else: normalized_item[k] = v @@ -247,9 +248,12 @@ def test_observation_parsing_edge_cases(self): # Rows with edge-case numbers and strings chunk = [ # entity, variable, date, value, provenance, unit, scaling_factor, mmethod, period, props - ("country/ALB", "v1", "2026", "99", "p1", "unit1", "100", "m1", "P1Y", custom_props), - ("country/USA", "v1", "2026.5", "123.45", "p1", None, "10.5", None, None, None), - ("country/IND", "v1", "2026-06", "Unavailable", "p1", None, None, None, None, None), + ("country/ALB", "v1", "2026", "99", "p1", "unit1", "100", "m1", "P1Y", + custom_props), + ("country/USA", "v1", "2026.5", "123.45", "p1", None, "10.5", None, + None, None), + ("country/IND", "v1", "2026-06", "Unavailable", "p1", None, None, None, + None, None), ] ns_map = {"dcid": "https://datacommons.org/ontology/"} @@ -269,26 +273,36 @@ def test_observation_parsing_edge_cases(self): self.assertEqual(len(graph), 3) # 1. Verify first observation (Int types, Custom Props) - obs1 = [o for o in graph if o["dcid:observationAbout"]["@id"] == "dcid:country/ALB"][0] + obs1 = [ + o for o in graph + if o["dcid:observationAbout"]["@id"] == "dcid:country/ALB" + ][0] self.assertEqual(obs1["dcid:value"], 99) self.assertEqual(obs1["dcid:observationDate"], 2026) self.assertEqual(obs1["dcid:scalingFactor"], 100) - self.assertEqual(obs1["dcid:provenanceUrl"], "http://my-provenance.org/url") + self.assertEqual(obs1["dcid:provenanceUrl"], + "http://my-provenance.org/url") self.assertEqual(obs1["dcid:observationPeriod"], "P1Y") - + # Verify custom properties from JSON string self.assertEqual(obs1["dcid:customIntProp"], 42) self.assertEqual(obs1["dcid:customStrProp"], "customVal") self.assertEqual(obs1["http://schema.org/url"], "https://test-prop.org") # 2. Verify second observation (Float types) - obs2 = [o for o in graph if o["dcid:observationAbout"]["@id"] == "dcid:country/USA"][0] + obs2 = [ + o for o in graph + if o["dcid:observationAbout"]["@id"] == "dcid:country/USA" + ][0] self.assertEqual(obs2["dcid:value"], 123.45) self.assertEqual(obs2["dcid:observationDate"], 2026.5) self.assertEqual(obs2["dcid:scalingFactor"], 10.5) # 3. Verify third observation (Non-numeric value & Date string) - obs3 = [o for o in graph if o["dcid:observationAbout"]["@id"] == "dcid:country/IND"][0] + obs3 = [ + o for o in graph + if o["dcid:observationAbout"]["@id"] == "dcid:country/IND" + ][0] self.assertEqual(obs3["dcid:value"], "Unavailable") self.assertEqual(obs3["dcid:observationDate"], "2026-06") From bae4f61b6e2f28aeb3eeb47ef35aaae7f59cb033 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 17:55:50 -0400 Subject: [PATCH 21/24] More cleanups and comments --- simple/stats/jsonld_stream_db.py | 5 ++--- simple/stats/logger.py | 7 ------- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/simple/stats/jsonld_stream_db.py b/simple/stats/jsonld_stream_db.py index dc5995666..cede39870 100644 --- a/simple/stats/jsonld_stream_db.py +++ b/simple/stats/jsonld_stream_db.py @@ -52,9 +52,7 @@ def _uri_ref(val): if not val: return None - if val.startswith("http://") or val.startswith("https://"): - return {"@id": val} - if val.startswith("dcid:"): + if val.startswith(("http://", "https://", "dcid:")): return {"@id": val} return {"@id": f"dcid:{val.lstrip('/')}"} @@ -125,6 +123,7 @@ def _write_observation_shard(args): def _write_node_shard(args): + # TODO(gmechali): Get rid of this and keep only the "fast" mode. fast_export = os.getenv("FAST_NODE_EXPORT", "true").lower() in ("true", "1", "yes") if fast_export: diff --git a/simple/stats/logger.py b/simple/stats/logger.py index f85a254aa..c46d4cdd0 100644 --- a/simple/stats/logger.py +++ b/simple/stats/logger.py @@ -36,13 +36,6 @@ def initialize_logger(): for handler in logging.root.handlers: logging.root.removeHandler(handler) - # Initialize logging - try: - sys.stdout.reconfigure(line_buffering=True) - sys.stderr.reconfigure(line_buffering=True) - except AttributeError: - # In some test environments sys.stdout may not support reconfigure - pass logger = logging.getLogger() logger.setLevel(log_level) handler = logging.StreamHandler(sys.stdout) From 569eac5ff73ff2dfeac4ccfcb239449ba67f3272 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 17:59:09 -0400 Subject: [PATCH 22/24] Gemini comments round 1 --- simple/stats/jsonld_stream_db.py | 27 ++++++++++++++------- simple/tests/stats/jsonld_stream_db_test.py | 3 +++ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/simple/stats/jsonld_stream_db.py b/simple/stats/jsonld_stream_db.py index cede39870..e92a9bd50 100644 --- a/simple/stats/jsonld_stream_db.py +++ b/simple/stats/jsonld_stream_db.py @@ -158,9 +158,16 @@ def _write_node_shard_fast(args): val = _parse_numeric(row.object_value) if pred_key == "@type": - val_str = val["@id"] if isinstance(val, - dict) and "@id" in val else str(val) - subjects[sub_id]["@type"] = val_str + val_str = val["@id"] if isinstance(val, dict) and "@id" in val else str(val) + if "@type" in subjects[sub_id]: + existing = subjects[sub_id]["@type"] + if isinstance(existing, list): + if val_str not in existing: + existing.append(val_str) + elif existing != val_str: + subjects[sub_id]["@type"] = [existing, val_str] + else: + subjects[sub_id]["@type"] = val_str else: if pred_key in subjects[sub_id]: existing = subjects[sub_id][pred_key] @@ -296,20 +303,22 @@ def _generate_observation_chunks(self, temp_local_dir: str): prov_urls[prov_id] = prov.url prov_urls[prov.id] = prov.url - while self._obs_records: - chunk = self._obs_records[:_CHUNK_SIZE] - del self._obs_records[:_CHUNK_SIZE] + num_records = len(self._obs_records) + for idx in range(0, num_records, _CHUNK_SIZE): + chunk = self._obs_records[idx:idx + _CHUNK_SIZE] yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map, prov_urls) self.obs_shard_index += 1 + self._obs_records.clear() def _generate_node_chunks(self, temp_local_dir: str): """Generates node chunks of size _CHUNK_SIZE.""" - while self._triples: - chunk = self._triples[:_CHUNK_SIZE] - del self._triples[:_CHUNK_SIZE] + num_triples = len(self._triples) + for idx in range(0, num_triples, _CHUNK_SIZE): + chunk = self._triples[idx:idx + _CHUNK_SIZE] yield (chunk, self.node_shard_index, temp_local_dir, self.ns_map) self.node_shard_index += 1 + self._triples.clear() def _upload_shards(self, temp_local_dir: str): """Uploads files in temp_local_dir to jsonld_dir, optimizing for GCS via native SDK.""" diff --git a/simple/tests/stats/jsonld_stream_db_test.py b/simple/tests/stats/jsonld_stream_db_test.py index b83432fc7..48f9fdd5b 100644 --- a/simple/tests/stats/jsonld_stream_db_test.py +++ b/simple/tests/stats/jsonld_stream_db_test.py @@ -173,6 +173,9 @@ def test_node_fast_vs_rdflib_parity(self): Triple(subject_id="sub1", predicate="typeOf", object_id="StatisticalVariable"), + Triple(subject_id="sub1", + predicate="typeOf", + object_id="Thing"), Triple(subject_id="sub1", predicate="name", object_value="Test Node"), # Multi-valued properties Triple(subject_id="sub1", From bf38fee7faf837fd243a75eeedc2c7e3f87d92a0 Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 18:04:30 -0400 Subject: [PATCH 23/24] Dedup the type --- simple/stats/jsonld_stream_db.py | 11 +++++++---- simple/tests/stats/jsonld_stream_db_test.py | 5 +++++ 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/simple/stats/jsonld_stream_db.py b/simple/stats/jsonld_stream_db.py index e92a9bd50..e2aa015e9 100644 --- a/simple/stats/jsonld_stream_db.py +++ b/simple/stats/jsonld_stream_db.py @@ -107,8 +107,10 @@ def _write_observation_shard(args): prop_key = f"dcid:{k}" if not k.startswith( "dcid:") and not k.startswith("http") else k obs_obj[prop_key] = v - except Exception: - pass + except json.JSONDecodeError as e: + logging.warning( + "Failed to decode properties JSON for observation %s/%s: %s", + entity, variable, e) graph_list.append(obs_obj) @@ -172,8 +174,9 @@ def _write_node_shard_fast(args): if pred_key in subjects[sub_id]: existing = subjects[sub_id][pred_key] if isinstance(existing, list): - existing.append(val) - else: + if val not in existing: + existing.append(val) + elif existing != val: subjects[sub_id][pred_key] = [existing, val] else: subjects[sub_id][pred_key] = val diff --git a/simple/tests/stats/jsonld_stream_db_test.py b/simple/tests/stats/jsonld_stream_db_test.py index 48f9fdd5b..2657ae7e4 100644 --- a/simple/tests/stats/jsonld_stream_db_test.py +++ b/simple/tests/stats/jsonld_stream_db_test.py @@ -184,6 +184,11 @@ def test_node_fast_vs_rdflib_parity(self): Triple(subject_id="sub1", predicate="alternateName", object_value="Alias B"), + # Duplicate values for testing deduplication + Triple(subject_id="sub1", + predicate="alternateName", + object_value="Alias A"), + Triple(subject_id="sub1", predicate="intValue", object_value=99), # References vs Values Triple(subject_id="sub1", predicate="memberOf", object_id="groupA"), # Number types From 314507d4021bfd52dc4ce0ebb97e1b4edcb5fb0f Mon Sep 17 00:00:00 2001 From: Gabriel Mechali Date: Thu, 4 Jun 2026 18:37:04 -0400 Subject: [PATCH 24/24] Lint --- simple/stats/jsonld_stream_db.py | 3 ++- simple/tests/stats/jsonld_stream_db_test.py | 4 +--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/simple/stats/jsonld_stream_db.py b/simple/stats/jsonld_stream_db.py index e2aa015e9..3c9e22368 100644 --- a/simple/stats/jsonld_stream_db.py +++ b/simple/stats/jsonld_stream_db.py @@ -160,7 +160,8 @@ def _write_node_shard_fast(args): val = _parse_numeric(row.object_value) if pred_key == "@type": - val_str = val["@id"] if isinstance(val, dict) and "@id" in val else str(val) + val_str = val["@id"] if isinstance(val, + dict) and "@id" in val else str(val) if "@type" in subjects[sub_id]: existing = subjects[sub_id]["@type"] if isinstance(existing, list): diff --git a/simple/tests/stats/jsonld_stream_db_test.py b/simple/tests/stats/jsonld_stream_db_test.py index 2657ae7e4..03f52a866 100644 --- a/simple/tests/stats/jsonld_stream_db_test.py +++ b/simple/tests/stats/jsonld_stream_db_test.py @@ -173,9 +173,7 @@ def test_node_fast_vs_rdflib_parity(self): Triple(subject_id="sub1", predicate="typeOf", object_id="StatisticalVariable"), - Triple(subject_id="sub1", - predicate="typeOf", - object_id="Thing"), + Triple(subject_id="sub1", predicate="typeOf", object_id="Thing"), Triple(subject_id="sub1", predicate="name", object_value="Test Node"), # Multi-valued properties Triple(subject_id="sub1",