Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added dimsim/compute/__init__.py
Empty file.
182 changes: 182 additions & 0 deletions dimsim/compute/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
import openmm
from openff.toolkit import Topology
from parsl.app.app import python_app

from dimsim.configs.compute.density import DensityConfig


@python_app
def prepare_packed_topology(
compute_config: DensityConfig,
job_dir: str,
) -> dict[str, DensityConfig | Topology]:
import logging
import pathlib
import time

from openff.packmol import pack_box
from openff.toolkit import Molecule, Quantity

# making the job dir should maybe happen inside of SimulationWorkflow.submit() instead of here?
pathlib.Path(job_dir).mkdir(exist_ok=True)

logging.basicConfig(
filename=f"{job_dir}/simulation.log",
level=logging.INFO,
)
logging.info("Starting packing app")

data_entry = compute_config["target"]
n_molecules = compute_config["n_molecules"]

time.sleep(n_molecules * 0.001)
filename = f"{job_dir}/packed_topology.pdb"

molecules = [Molecule.from_smiles(smiles) for smiles in data_entry["smiles"]]

if pathlib.Path(filename).exists():
logging.info(f"File {filename} already exists, skipping packing.")
return {
"compute_config": compute_config,
"packed_topology": Topology.from_pdb(filename, unique_molecules=molecules),
}

n_copies = [int(n_molecules * x) for x in data_entry["x"]]

result = pack_box(
molecules,
n_copies,
target_density=Quantity(data_entry["value"] * 0.7, "g/mL"),
working_directory=job_dir,
)

result.to_file(f"{job_dir}/packed_topology.pdb", file_format="pdb")

logging.info(f"packed {result.n_molecules} molecules")

return {
"compute_config": compute_config,
"packed_topology": result,
}


@python_app
def prepare_openmm_system(
packing_future: dict[str, DensityConfig | Topology],
job_dir: str,
) -> dict[str, DensityConfig | openmm.System]:
import logging
import pathlib

import openmm
from openff.toolkit import ForceField

logging.basicConfig(
filename=f"{job_dir}/simulation.log",
level=logging.INFO,
)
logging.info("Starting OpenMM system creation app")

filename = f"{job_dir}/openmm_system.xml"

if pathlib.Path(filename).exists():
logging.warning(f"File {filename} already exists, skipping packing.")
return {
"compute_config": packing_future["compute_config"],
"openmm_system": openmm.XmlSerializer.deserialize(open(filename).read()),
}

compute_config = packing_future["compute_config"]
packed_topology: Topology = packing_future["packed_topology"]

force_field = ForceField(compute_config["force_field"])

openmm_system = force_field.create_openmm_system(packed_topology)

with open(filename, "w") as f:
f.write(openmm.XmlSerializer.serialize(openmm_system))

logging.info("made openmm system!")
return {
"compute_config": compute_config,
"openmm_system": openmm_system,
}


@python_app
def minimize_energy(
system_future: dict[str, DensityConfig | openmm.System], job_dir: str
) -> dict[str, DensityConfig | float]:
import logging

import openmm
import openmm.app
import openmm.unit
from openff.toolkit import Molecule, Topology

logging.basicConfig(
filename=f"{job_dir}/simulation.log",
level=logging.INFO,
)
logging.info("Starting energy minimization app")
system = system_future["openmm_system"]

data_entry = system_future["compute_config"]["target"]

# this should be in Kelvin
temperature = system_future["compute_config"]["target"]["temperature"]

packed_topology_file = f"{job_dir}/packed_topology.pdb"
minimized_topology_file = f"{job_dir}/minimized_topology.pdb"

molecules = [Molecule.from_smiles(smiles) for smiles in data_entry["smiles"]]

topology = Topology.from_pdb(
packed_topology_file,
unique_molecules=molecules,
)

simulation = openmm.app.Simulation(
topology=topology.to_openmm(),
system=system,
integrator=openmm.LangevinMiddleIntegrator(
temperature * openmm.unit.kelvin,
1.0 / openmm.unit.picosecond,
1.0 * openmm.unit.femtoseconds,
),
)

simulation.context.setPositions(topology.get_positions().to_openmm())

original_state = simulation.context.getState(energy=True)

simulation.minimizeEnergy()

# simulation.reporters.append(openmm.app.DCDReporter(outputs[0].filepath, 100))

# print("Minimized energy, now running 10,000 steps of dynamics...")
# simulation.step(10_000)

final_state = simulation.context.getState(energy=True, positions=True)

with open(minimized_topology_file, "w") as f:
openmm.app.PDBFile.writeFile(simulation.topology, final_state.getPositions(), f)

original = original_state.getPotentialEnergy().value_in_unit(openmm.unit.kilojoule_per_mole)
final = final_state.getPotentialEnergy().value_in_unit(openmm.unit.kilojoule_per_mole)

logging.info(f"Minimized energy from {original:.2f} to {final:.2f}")

return original, final


@python_app
def run_simulation(config, job_dir, steps=10000):
...
return {"trajectory": f"{job_dir}/trajectory.dcd", "energy": -12345.6}


@python_app
def analyze_trajectory(simulation_result, job_dir):
...
return {"mean_energy": ..., "rmsd": ...}
56 changes: 56 additions & 0 deletions dimsim/compute/configs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import os

from parsl.config import Config
from parsl.executors import HighThroughputExecutor, ThreadPoolExecutor
from parsl.providers import LocalProvider, SlurmProvider
from parsl.utils import get_all_checkpoints


def local_config():
"""For development and testing."""
if False:
# If openff-packmol/temporary_cd was threadsafe, better to use
# ThreadPoolExecutor and local config
return Config(
executors=[ThreadPoolExecutor(label="local")],
checkpoint_mode="task_exit",
checkpoint_files=get_all_checkpoints(),
)

return Config(
executors=[
HighThroughputExecutor(
label="local_process_pool",
# LocalProvider runs on the local machine
provider=LocalProvider(
init_blocks=1,
max_blocks=1,
),
# Optional: Specify exact max workers (processes)
max_workers_per_node=int(os.cpu_count() / 2),
)
],
strategy=None, # Disable dynamic scaling for simpler local execution
)


def slurm_config(partition, max_blocks=100):
"""For production HPC runs."""
return Config(
executors=[
HighThroughputExecutor(
label="slurm_gpu",
provider=SlurmProvider(
partition=partition,
walltime="02:00:00",
nodes_per_block=1,
min_blocks=0,
max_blocks=max_blocks,
scheduler_options="#SBATCH --gres=gpu:1",
worker_init="source activate myenv",
),
)
],
checkpoint_mode="task_exit",
checkpoint_files=get_all_checkpoints(),
)
35 changes: 35 additions & 0 deletions dimsim/compute/jobs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import hashlib
import json
import os

from dimsim.configs.compute.density import DensityConfig


def make_job_id(compute_config: DensityConfig) -> str:
"""
Generate a deterministic job ID from compute parameters.

Maybe this could be simplified if it only has one argument?
"""
params = {
"tag": compute_config["tag"],
"target": compute_config["target"],
"force_field": compute_config["force_field"],
"n_molecules": str(compute_config["n_molecules"]),
}

return hashlib.sha256(json.dumps(params, sort_keys=True).encode()).hexdigest()


def get_job_paths(base_dir, job_id):
return {
"root": f"{base_dir}/{job_id}",
"checkpoint": f"{base_dir}/{job_id}/checkpoint.chk",
"trajectory": f"{base_dir}/{job_id}/trajectory.dcd",
"log": f"{base_dir}/{job_id}/simulation.log",
}


def is_complete(base_dir, job_id):
"""Check if a job already has complete outputs."""
return os.path.exists(get_job_paths(base_dir, job_id)["trajectory"])
77 changes: 77 additions & 0 deletions dimsim/compute/workflow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import pathlib

import parsl

from dimsim.compute.apps import (
minimize_energy,
prepare_openmm_system,
prepare_packed_topology,
)
from dimsim.compute.jobs import get_job_paths, is_complete, make_job_id
from dimsim.configs.compute.density import DensityConfig


class SimulationWorkflow:
def __init__(self, base_dir, parsl_config):
pathlib.Path(base_dir).mkdir(exist_ok=True)

self.base_dir = base_dir

parsl.load(parsl_config)

def submit(self, compute_config: DensityConfig):
"""Submit a single end-to-end simulation pipeline."""
job_id = make_job_id(compute_config)
job_dir = get_job_paths(self.base_dir, job_id)["root"]

if is_complete(self.base_dir, job_id):
return None # already done, skip

# packed_pdb = File(f"{job_dir}/packed.pdb")
# system_xml = File(f"{job_dir}/system.xml")
# trajectory_dcd = File(f"{job_dir}/trajectory.dcd")

# 1. pack from compute config
pack_future = prepare_packed_topology(compute_config, job_dir)

# 2. set up openmm system
setup_future = prepare_openmm_system(pack_future, job_dir)

# 3 (for now ...) get minimized energy
minimize_future = minimize_energy(setup_future, job_dir)

# 3. run equilibration step
# 4. run "production" step
# sim_future = run_simulation(config_future, job_dir)
# 5. analyze trajectory
# 6. check for convergence, if not converged, run more production and repeat
# analysis_future = analyze_trajectory(sim_future, job_dir)

return {"job_id": job_id, "future": minimize_future}

def submit_batch(self, compute_configs: list[DensityConfig]):
"""Submit many jobs, skipping already-complete ones."""
return [result for spec in compute_configs if (result := self.submit(spec)) is not None]

def run(self, compute_configs: list[DensityConfig]):
"""Submit a batch and block until all complete."""
pending = self.submit_batch(compute_configs)

results = []
for item in pending:
try:
result = item["future"].result()
results.append({"job_id": item["job_id"], "result": result})
except Exception as e:
results.append({"job_id": item["job_id"], "error": str(e)})

return results

def shutdown(self):
parsl.clear()

def __enter__(self):
return self

def __exit__(self, *args):
self.shutdown()
Empty file.
17 changes: 17 additions & 0 deletions dimsim/configs/compute/density.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import typing

from dimsim.configs.targets.thermo import DataEntry


class DensityConfig(typing.TypedDict):
tag: typing.Literal["density"] = "density"

target: DataEntry

force_field: str

n_molecules: int

"""
Maybe add an RNG seed?
"""
Loading