Skip to content
Merged
Show file tree
Hide file tree
Changes from 46 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
3c73983
First try at sqlite-less
gmechali Jun 3, 2026
d9168a5
Multi threaded
gmechali Jun 3, 2026
2a7ffa6
More speedu p
gmechali Jun 4, 2026
132cf2c
Logging buffering
gmechali Jun 4, 2026
87cbcb6
Threadit
gmechali Jun 4, 2026
4c357cd
more parallelization
gmechali Jun 4, 2026
2a7f6ee
Export faster
gmechali Jun 4, 2026
e5de7d1
Adds a global counter
gmechali Jun 4, 2026
aadefdc
Adds gardbage collection to help performance, and prevent ooms
gmechali Jun 4, 2026
fdad734
Bypass rdflib for speed.
gmechali Jun 4, 2026
dbd4c03
Remove gccollect
gmechali Jun 4, 2026
c4d4659
Use native GCS
gmechali Jun 4, 2026
0d99f46
More performance fix
gmechali Jun 4, 2026
766d000
Minor cleanup
gmechali Jun 4, 2026
68ef5d7
Lint
gmechali Jun 4, 2026
83bb3c4
Major cleanup on the code
gmechali Jun 4, 2026
3f99685
Minor cleanup
gmechali Jun 4, 2026
5c7c433
Lint
gmechali Jun 4, 2026
2283706
More cleanups and comments
gmechali Jun 4, 2026
0bce6cf
Per-import split
gmechali Jun 4, 2026
3e6a494
Cloud build fix (#520)
vish-cs Jun 5, 2026
3589ebf
Per import support, adds validation and more
gmechali Jun 5, 2026
3d827c7
Fix the import paths
gmechali Jun 5, 2026
7775828
Path fix
gmechali Jun 5, 2026
9f22040
fixing importname
gmechali Jun 5, 2026
e20a993
clean up import name, clean up lots of other things.
gmechali Jun 5, 2026
abfaeb8
Clean up the default nodes and sources
gmechali Jun 5, 2026
85b0931
not sure anymore
gmechali Jun 5, 2026
1996d33
Fix the provenance URL
gmechali Jun 5, 2026
32fd6e9
Remnove previosly deleted file.
gmechali Jun 8, 2026
aae86d2
Fix OOM on export.
gmechali Jun 8, 2026
e72d031
Fix import list param in trigger + the regex parsing of patterns in c…
gmechali Jun 8, 2026
41d88ef
Raise exceptions for invalid file + cleanup
gmechali Jun 8, 2026
48daabb
Moves from file-driven processing to config.json driven processing
gmechali Jun 8, 2026
09cb358
Stop the parallel processing for resource saving
gmechali Jun 8, 2026
b1dba8f
Productionization improvemnts for RAM usage.
gmechali Jun 8, 2026
94a72e4
Validation code improvements
gmechali Jun 8, 2026
f2eb02f
Allow custom namespacing
gmechali Jun 8, 2026
fd80f89
Custom namesapce fix
gmechali Jun 8, 2026
a9d5fda
More cleanup to reuse the namesapce helper
gmechali Jun 8, 2026
a1b40c0
Fix the expected tests to stop using the generated provenance and use…
gmechali Jun 8, 2026
4f2d756
Supporting a custom namespace for DCID format
gmechali Jun 8, 2026
a045ff8
Regex fix
gmechali Jun 8, 2026
7c24a99
Modify provenance and source schema.
gmechali Jun 8, 2026
6012652
Some minor refactoring
gmechali Jun 8, 2026
b7e58bb
Lint
gmechali Jun 8, 2026
5a55d9c
lint
gmechali Jun 9, 2026
e43f572
tweaks
gmechali Jun 9, 2026
6c19a70
One more lint
gmechali Jun 9, 2026
a302123
Merge branch 'master' into perImp
gmechali Jun 9, 2026
9ed348d
Review gemini comments
gmechali Jun 9, 2026
56547af
Test fix for nl artifacts now gone
gmechali Jun 9, 2026
c9dbd89
Lint
gmechali Jun 9, 2026
1943d3a
Remove old tests that we rendered backwards incompatible. Future depr…
gmechali Jun 9, 2026
b720072
lint
gmechali Jun 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions simple/stats/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import re

import fs.path as fspath
from stats import constants
from stats.data import AggregationConfig
from stats.data import EntityType
Expand All @@ -23,6 +24,7 @@
from stats.data import Provenance
from stats.data import Source
from stats.data import StatVar
from stats.data import strip_namespace
from util.file_match import match
from util.filesystem import File

Expand Down Expand Up @@ -71,8 +73,14 @@ class Config:

def __init__(self, data: dict) -> None:
self.data = data
self._input_files_config: dict[str, dict] = self.data.get(
_INPUT_FILES_FIELD, {})
self._input_files_config: dict[str, dict] = {}
input_files = self.data.get(_INPUT_FILES_FIELD, [])

if isinstance(input_files, list):
self._parse_input_files_config(input_files)
elif isinstance(input_files, dict):
self._parse_legacy_input_files_config(input_files)

# If input file paths are specified with wildcards - e.g. "gs://bucket/foo*.csv",
# this dict maintains a mapping from actual file path to the wildcard key
# for fast lookup.
Expand All @@ -85,6 +93,34 @@ def __init__(self, data: dict) -> None:
self.provenance_sources: dict[str, Source] = {}
self._parse_provenances_and_sources()

def _parse_input_files_config(self, input_files: list) -> None:
"""Parses the modern list-of-objects format for inputFiles."""
for entry in input_files:
if not isinstance(entry, dict):
raise ValueError(
f"Invalid entry in '{_INPUT_FILES_FIELD}': must be a JSON object. Got: {entry}"
)
key = entry.get("pattern") or entry.get("filename")
if not key:
raise ValueError(
f"Invalid entry in '{_INPUT_FILES_FIELD}': must specify 'pattern' or 'filename'. Got: {entry}"
)
self._input_files_config[key] = entry

def _parse_legacy_input_files_config(self, input_files: dict) -> None:
"""Parses the legacy dictionary format for inputFiles.

TODO: Deprecate and completely remove this legacy dictionary format
once all test config.json files and legacy custom installations have been
migrated to the modern list-of-objects format.
"""
for key, entry in input_files.items():
if not isinstance(entry, dict):
raise ValueError(
f"Invalid entry in '{_INPUT_FILES_FIELD}': must be a JSON object. Got: {entry}"
)
self._input_files_config[key] = entry

def data_download_urls(self) -> list[str]:
cfg = self.data.get(_DATA_DOWNLOAD_URL_FIELD)
if not cfg:
Expand Down Expand Up @@ -272,6 +308,15 @@ def default_custom_root_svg_name(self) -> str:
return self.data.get(_DEFAULT_CUSTOM_ROOT_SVG_NAME_FIELD,
sc.DEFAULT_CUSTOM_ROOT_SVG_NAME)

def import_name(self, input_file: File) -> str:
"""Returns the normalized import name associated with a given input file."""
prov_id = self._per_file_config(input_file).get("provenance")
if prov_id:
return strip_namespace(prov_id).lower()
raise ValueError(
f"Could not determine import name: missing 'provenance' configuration for file '{input_file.path}'."
)

def _per_file_config(self, input_file: File) -> dict:
""" Looks up the config for a given file.

Expand Down
8 changes: 4 additions & 4 deletions simple/stats/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ class Db:
def maybe_clear_before_import(self):
pass

def insert_triples(self, triples: list[Triple]):
def insert_triples(self, triples: list[Triple], input_file: File = None):
pass

def insert_observations(self, observations_df: pd.DataFrame,
Expand Down Expand Up @@ -280,7 +280,7 @@ def __init__(self, db_params: dict) -> None:
# dcid to node dict
self.nodes: dict[str, McfNode] = {}

def insert_triples(self, triples: list[Triple]):
def insert_triples(self, triples: list[Triple], input_file: File = None):
for triple in triples:
self._add_triple(triple)

Expand Down Expand Up @@ -348,7 +348,7 @@ def __init__(self, config: dict) -> None:
def maybe_clear_before_import(self):
self.engine.clear_tables_and_indexes()

def insert_triples(self, triples: list[Triple]):
def insert_triples(self, triples: list[Triple], input_file: File = None):
logging.info("Writing %s triples to [%s]", len(triples), self.engine)
if triples:
self.engine.executemany(_INSERT_TRIPLES_STATEMENT,
Expand Down Expand Up @@ -420,7 +420,7 @@ def maybe_clear_before_import(self):
# Not applicable for Data Commons Platform.
pass

def insert_triples(self, triples: list[Triple]):
def insert_triples(self, triples: list[Triple], input_file: File = None):
"""
Convert triples to a jsonld graph and writes the graph to the Data Commons Platform instance.
"""
Expand Down
2 changes: 1 addition & 1 deletion simple/stats/entities_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,4 @@ def _write_row_entity_triples(self) -> None:
prop_object_ids=prop_object_ids)
triples.extend(row_entity.triples())

self.db.insert_triples(triples)
self.db.insert_triples(triples, self.input_file)
2 changes: 1 addition & 1 deletion simple/stats/events_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def _write_event_triples(self) -> None:
properties=properties)
triples.extend(event.triples())

self.db.insert_triples(triples)
self.db.insert_triples(triples, self.input_file)

def _resolve_entities(self) -> None:
df = self.df
Expand Down
148 changes: 99 additions & 49 deletions simple/stats/jsonld_stream_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
"""A DB implementation that streams JSON-LD shards directly to GCS/Disk."""

from collections import defaultdict
import concurrent.futures
from datetime import datetime
from datetime import timezone
Expand All @@ -39,6 +40,7 @@
from stats.jsonld_exporter import DCID_URL
from stats.jsonld_exporter import expand_id
from stats.jsonld_exporter import write_shard
from stats.util import is_uri_or_namespace
from util.filesystem import create_store
from util.filesystem import Dir
from util.filesystem import File
Expand All @@ -52,7 +54,7 @@
def _uri_ref(val):
if not val:
return None
if val.startswith(("http://", "https://", "dcid:")):
if is_uri_or_namespace(val):
return {"@id": val}
return {"@id": f"dcid:{val.lstrip('/')}"}

Expand Down Expand Up @@ -143,13 +145,12 @@ def _write_node_shard_fast(args):
if sub_id not in subjects:
subjects[sub_id] = {
"@id":
f"dcid:{sub_id.lstrip('/')}" if not sub_id.startswith("http") and
not sub_id.startswith("dcid:") else sub_id
sub_id
if is_uri_or_namespace(sub_id) else f"dcid:{sub_id.lstrip('/')}"
}

pred = row.predicate
pred_key = f"dcid:{pred}" if not pred.startswith(
"dcid:") and not pred.startswith("http") else pred
pred_key = pred if is_uri_or_namespace(pred) else f"dcid:{pred}"

if pred == "typeOf":
pred_key = "@type"
Expand Down Expand Up @@ -233,6 +234,7 @@ def __init__(self, output_dir, import_names, nodes) -> None:
self.output_dir = output_dir
self.import_names = import_names
self.nodes = nodes
self.config = nodes.config

# Generate unique folder name based on import name and timestamp
import_name = None
Expand All @@ -255,25 +257,38 @@ def __init__(self, output_dir, import_names, nodes) -> None:
self.node_shard_index = 0
self.ns_map = {"dcid": DCID_URL}
self.lock = threading.Lock()
self._obs_records = []
self._triples = []
self._obs_records = defaultdict(list)
self._triples = defaultdict(list)
self._processed_imports = set()

def insert_observations(self, observations_df: pd.DataFrame,
input_file: File):
if not observations_df.empty:
import_name = self.config.import_name(input_file)
records = observations_df.to_records(index=False).tolist()
with self.lock:
self._obs_records.extend(records)
self._processed_imports.add(import_name)
self._obs_records[import_name].extend(records)

def insert_triples(self, triples: list[Triple]):
def insert_triples(self, triples: list[Triple], input_file: File = None):
if triples:
with self.lock:
self._triples.extend(triples)
if input_file:
import_name = self.config.import_name(input_file)
self._processed_imports.add(import_name)
self._triples[import_name].extend(triples)
else:
self._triples["_global"].extend(triples)

def commit(self):
pass

def commit_and_close(self):
# Add global triples to every processed import's triples
global_triples = self._triples.pop("_global", [])
for import_name in self._processed_imports:
self._triples[import_name].extend(global_triples)
Comment thread
gmechali marked this conversation as resolved.

num_processes = min(multiprocessing.cpu_count(), _EXPORT_PROCESSES_MAX)

with tempfile.TemporaryDirectory() as temp_local_dir:
Expand All @@ -282,51 +297,81 @@ def commit_and_close(self):

if self._obs_records or self._triples:
logging.info(
"Starting JSON-LD local export with %d processes in streaming mode",
"Starting JSON-LD local export with %d processes in sequential-parallel mode",
num_processes)
with multiprocessing.Pool(processes=num_processes) as pool:
if self._obs_records:
logging.info("Streaming observations export...")
obs_gen = self._generate_observation_chunks(temp_local_dir)
for _ in pool.imap(_write_observation_shard, obs_gen):
pass

if self._triples:
logging.info("Streaming triples export...")
node_gen = self._generate_node_chunks(temp_local_dir)
for _ in pool.imap(_write_node_shard, node_gen):
pass

# Process each import sequentially to minimize peak memory usage,
# but write shards in parallel within each import to maximize speed.
for import_name in self._processed_imports:
import_temp_dir = os.path.join(temp_local_dir, import_name)
os.makedirs(import_temp_dir, exist_ok=True)

if import_name in self._obs_records or import_name in self._triples:
with multiprocessing.Pool(processes=num_processes) as pool:
Comment thread
gmechali marked this conversation as resolved.
if import_name in self._obs_records:
logging.info("Streaming observations export for %s...",
import_name)
obs_gen = self._generate_observation_chunks(
import_name, import_temp_dir)
for _ in pool.imap(_write_observation_shard, obs_gen):
pass
logging.info("Completed observations export for %s.",
import_name)

if import_name in self._triples:
logging.info("Streaming triples export for %s...", import_name)
node_gen = self._generate_node_chunks(import_name,
import_temp_dir)
for _ in pool.imap(_write_node_shard, node_gen):
pass
logging.info("Completed triples export for %s.", import_name)

self._upload_shards(temp_local_dir)

def _generate_observation_chunks(self, temp_local_dir: str):
"""Generates observation chunks of size _CHUNK_SIZE, cleaning memory dynamically."""
def _generate_observation_chunks(self, import_name: str,
import_temp_dir: str):
"""Generates observation chunks of size _CHUNK_SIZE, cleaning memory progressively."""
prov_urls = {}
for prov in self.nodes.provenances.values():
prov_id = strip_namespace(prov.id)
prov_urls[prov_id] = prov.url
prov_urls[prov.id] = prov.url

num_records = len(self._obs_records)
for idx in range(0, num_records, _CHUNK_SIZE):
chunk = self._obs_records[idx:idx + _CHUNK_SIZE]
yield (chunk, self.obs_shard_index, temp_local_dir, self.ns_map,
prov_urls)
self.obs_shard_index += 1
self._obs_records.clear()

def _generate_node_chunks(self, temp_local_dir: str):
"""Generates node chunks of size _CHUNK_SIZE."""
num_triples = len(self._triples)
for idx in range(0, num_triples, _CHUNK_SIZE):
chunk = self._triples[idx:idx + _CHUNK_SIZE]
yield (chunk, self.node_shard_index, temp_local_dir, self.ns_map)
self.node_shard_index += 1
self._triples.clear()
records = self._obs_records.get(import_name, [])
shard_index = 0
while records:
chunk = []
# Pop from the end to avoid O(N) list shifting overhead
for _ in range(min(_CHUNK_SIZE, len(records))):
chunk.append(records.pop())
yield (chunk, shard_index, import_temp_dir, self.ns_map, prov_urls)
shard_index += 1
Comment thread
gmechali marked this conversation as resolved.
if import_name in self._obs_records:
self._obs_records[import_name].clear()

def _generate_node_chunks(self, import_name: str, import_temp_dir: str):
"""Generates node chunks of size _CHUNK_SIZE, cleaning memory progressively."""
triples = self._triples.get(import_name, [])
shard_index = 0
while triples:
chunk = []
# Pop from the end to avoid O(N) list shifting overhead
for _ in range(min(_CHUNK_SIZE, len(triples))):
chunk.append(triples.pop())
yield (chunk, shard_index, import_temp_dir, self.ns_map)
shard_index += 1
Comment thread
gmechali marked this conversation as resolved.
if import_name in self._triples:
self._triples[import_name].clear()

def _upload_shards(self, temp_local_dir: str):
"""Uploads files in temp_local_dir to jsonld_dir, optimizing for GCS via native SDK."""
files_to_upload = sorted(os.listdir(temp_local_dir))
files_to_upload = []
for root, _, filenames in os.walk(temp_local_dir):
for filename in filenames:
abs_path = os.path.join(root, filename)
rel_path = os.path.relpath(abs_path, temp_local_dir)
files_to_upload.append(rel_path)

if not files_to_upload:
return

Expand Down Expand Up @@ -360,9 +405,9 @@ def _upload_shards_gcs(self, temp_local_dir: str, files: list[str],

bucket = client.bucket(bucket_name)

def _upload_single(filename: str):
local_file_path = os.path.join(temp_local_dir, filename)
blob_key = f"{blob_prefix}/{filename}" if blob_prefix else filename
def _upload_single(rel_path: str):
local_file_path = os.path.join(temp_local_dir, rel_path)
blob_key = f"{blob_prefix}/{rel_path}" if blob_prefix else rel_path
blob = bucket.blob(blob_key)
blob.upload_from_filename(local_file_path)

Expand All @@ -372,12 +417,17 @@ def _upload_single(filename: str):

def _upload_shards_local(self, temp_local_dir: str, files: list[str]):
"""Performs concurrent local file copy (for test environments)."""
local_store = create_store(temp_local_dir).as_dir()
target_store = self.jsonld_dir

def _copy_single(filename: str):
content = local_store.open_file(filename).read()
target_store.open_file(filename).write(content)
parent_dirs = set(os.path.dirname(f) for f in files if os.path.dirname(f))
for d in sorted(parent_dirs):
target_store.open_dir(d)

def _copy_single(rel_path: str):
local_file_path = os.path.join(temp_local_dir, rel_path)
with open(local_file_path, "r") as f:
content = f.read()
target_store.open_file(rel_path).write(content)

with concurrent.futures.ThreadPoolExecutor(
max_workers=_UPLOAD_CONCURRENCY) as executor:
Expand Down
Loading