Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c3c4900
Add spider query trigger bash scripts
Nov 24, 2025
b7d136f
Add utils.py
Nov 24, 2025
dd1dc42
Add convert_to_json.py
Nov 24, 2025
647ff54
Add AffiliationManager.py
Nov 24, 2025
1124c8a
Write logic for publishing in NATS
Nov 24, 2025
fb280d5
Add main py files for history and queues
Nov 24, 2025
a014391
Add Dockerfile and requirements.txt
Nov 24, 2025
75b42c3
Add spider_cms.py and adapt Dockerfile
Nov 24, 2025
87116ee
Remove amq references from query jobs
Carlosbogo Nov 27, 2025
23b14de
Fix dockerfile pathing
Carlosbogo Nov 27, 2025
00cfa54
Add library to dockerfile
Carlosbogo Nov 27, 2025
dfb18c5
Only activate venv on local testing
Carlosbogo Nov 27, 2025
566eb14
Fix queues query logic and integrate with nats
Carlosbogo Nov 27, 2025
d6d0106
Reformat python files
Carlosbogo Nov 27, 2025
b9795f1
Local checkpoint
Carlosbogo Jan 30, 2026
a61759a
Fix span naming for better trace analysis
Carlosbogo Feb 26, 2026
39a4545
Add type hints and fix trace logic
Carlosbogo Feb 26, 2026
f510d92
Update logging
Carlosbogo Feb 26, 2026
3f764d2
Add checkpoint logic with nats kv store
Carlosbogo Feb 26, 2026
3cb80c1
Add checkpoint logic to nats_client.py
Carlosbogo Feb 26, 2026
601f849
Simplify nats logic
Carlosbogo Feb 26, 2026
d89c3e4
Add timeout env var
Carlosbogo Feb 26, 2026
6494a5c
Update history logic to follow new setup
Carlosbogo Feb 26, 2026
ece69fb
Remove unnecessary imports
Carlosbogo Feb 26, 2026
8af4e69
Add metrics for otel collector
Carlosbogo Feb 26, 2026
6930af4
Add MachineAttrGLIDEIN_OVERLOAD_ENABLED0 attribute
Carlosbogo Feb 26, 2026
2837327
Add timeout_mins env var
Carlosbogo Feb 26, 2026
19624a1
Remove collectors.json from Dockerfile
Carlosbogo Feb 26, 2026
186cd0e
Remove unneeded env vars
Carlosbogo Feb 26, 2026
4b7a7e1
Add trace logic to history and simplify
Carlosbogo Feb 26, 2026
270c8e4
Remove unused signal logic from running_jogs.py
Carlosbogo Feb 26, 2026
dc3b2e2
Create affiliation-cache docker image
Carlosbogo Feb 26, 2026
92c4d1c
Create spider-worker docker image
Carlosbogo Feb 26, 2026
2b9de48
Add utils.py
Carlosbogo Feb 26, 2026
d305c35
Add otel_setup.py to worker image
Carlosbogo Feb 26, 2026
ffd5ea1
Add nats consumer logic to worker image
Carlosbogo Feb 26, 2026
394eb9a
Add AMQ logic to worker image
Carlosbogo Feb 26, 2026
7735592
Add affiliation logic to docker image
Carlosbogo Feb 26, 2026
8b96baa
Add job processing logic to worker image
Carlosbogo Feb 26, 2026
feca018
Add opensearch logic to worker image
Carlosbogo Feb 26, 2026
ecdf825
Add env var logic to worker image
Carlosbogo Feb 26, 2026
e6870dd
Tie processing logic with nats consumer in worker image
Carlosbogo Feb 26, 2026
f83af49
Create main.py
Carlosbogo Feb 26, 2026
081cf0b
Update index creation logic to have proper date typing
Carlosbogo Feb 27, 2026
6f843b8
Add trace propagation logic over nas on query cronjobs
Carlosbogo Mar 23, 2026
dedd98d
Improve bool conversion for classad fields
Carlosbogo Mar 23, 2026
280df20
Add propagation of traces through nats on workers
Carlosbogo Mar 23, 2026
020d9bc
Improve the opensearch client handling
Carlosbogo Mar 23, 2026
5e47fc6
Apply the new boolean field logic in convert_to_json.py
Carlosbogo Mar 23, 2026
f7c9ad4
Fix regex for boolean extraction
Carlosbogo Mar 23, 2026
f6b07df
Allow for nats config values to be set through env vars
Carlosbogo Mar 23, 2026
5cfa4d7
Improve telemetry and instrumentation of worker processes
Carlosbogo Mar 23, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docker/affiliation-cache/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# Slim Python base keeps the affiliation-cache image small.
FROM python:3.11-slim

# PYTHONUNBUFFERED=1: flush stdout/stderr immediately so container logs stream.
# PYTHONPATH: make /opt/spider importable as the application root.
ENV PYTHONUNBUFFERED=1 \
PYTHONPATH=/opt/spider

WORKDIR /opt/spider

# ca-certificates is needed for TLS verification against the CRIC/NATS
# endpoints; create the unprivileged runtime user and upgrade pip in the
# same layer to keep the layer count down.
RUN apt-get update && \
apt-get install -y --no-install-recommends ca-certificates && \
rm -rf /var/lib/apt/lists/* && \
useradd -m -s /bin/bash cmsjobmon && \
pip install --no-cache-dir --upgrade pip

# Install dependencies before copying the source so the dependency layer is
# cached across application-code changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt && \
chown -R cmsjobmon:cmsjobmon /opt/spider

COPY affiliation_cache.py ./

# Drop root privileges for the running container.
USER cmsjobmon

CMD ["python", "affiliation_cache.py"]
154 changes: 154 additions & 0 deletions docker/affiliation-cache/affiliation_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Christian Ariza <christian.ariza AT gmail [DOT] com>
# This script fetches affiliation data from CRIC API and publishes it to JetStream KV store
# (an indexed structure of username/login and affiliation institution and country from cric data)
# How to run:
# python affiliation_cache.py
# This will fetch the latest data from CRIC API and update the cache in JetStream KV store.
# This script can be setup as a daily cronjob to keep the cache up to date.
import traceback
import asyncio
import json
import logging
import os

import requests
from nats.aio.client import Client as NATS

# CRIC accounts endpoint returning the full user/affiliation dump as JSON.
AFFILIATION_API_URL = os.getenv("AFFILIATION_API_URL", "https://cms-cric.cern.ch/api/accounts/user/query/?json")
# CA bundle used to verify the CRIC server certificate.
CA_CERT = os.getenv("CA_CERT", "/etc/pki/tls/certs/CERN-bundle.pem")
# Robot (service-account) client certificate/key pair for CRIC authentication.
ROBOT_CERT = os.getenv("ROBOT_CERT", "/etc/secrets/robot/cert/robotcert.pem")
ROBOT_KEY = os.getenv("ROBOT_KEY", "/etc/secrets/robot/key/robotkey.pem")
# Comma-separated list of NATS server URLs.
NATS_SERVER = os.getenv("NATS_SERVER", "nats://nats.cluster.local:4222")
# JetStream KV bucket and key under which the affiliation index is stored.
KV_BUCKET = os.getenv("KV_BUCKET", "spider_affiliations")
KV_KEY = os.getenv("KV_KEY", "affiliations")


def _build_affiliation_index(cric_users):
    """
    Build the login -> affiliation inverted index from the raw CRIC payload.

    Args:
        cric_users: dict as returned by the CRIC accounts API, mapping an
            opaque id to a person record containing "profiles" and (usually)
            "institute"/"institute_country"/"dn" fields.

    Returns:
        dict mapping login to {"institute", "country", "dn"}.
    """
    affiliations_dict = {}
    for person in cric_users.values():
        # A person may have several profiles; take the first one with a login.
        login = next(
            (profile["login"] for profile in person["profiles"] if "login" in profile),
            None,
        )
        if login and "institute" in person:
            affiliations_dict[login] = {
                "institute": person["institute"],
                # Use .get() so one record missing country/dn does not abort
                # the whole refresh; the value is simply stored as None.
                "country": person.get("institute_country"),
                "dn": person.get("dn"),
            }
    return affiliations_dict


def fetch_affiliations(
    service_url=AFFILIATION_API_URL,
    robot_cert=ROBOT_CERT,
    robot_key=ROBOT_KEY,
    ca_cert=CA_CERT,
):
    """
    Fetch affiliation data from CRIC API.
    Returns an inverted index of institutions by person login. e.g.:

    {
        'valya': {'country': 'US',
                  'institute': 'Cornell University'},
        'belforte': {'country': 'IT',
                     'institute': 'Universita e INFN Trieste'}
        ...
    }

    Raises:
        requests.HTTPError: if the CRIC API returns a non-2xx status.
    """
    logging.info("Fetching affiliation data from CRIC API")
    cert = (robot_cert, robot_key)
    response = requests.get(service_url, cert=cert, verify=ca_cert)
    response.raise_for_status()

    # response.json() decodes the payload directly (honouring the response
    # encoding) instead of round-tripping through response.text + json.loads().
    affiliations_dict = _build_affiliation_index(response.json())

    logging.debug("Fetched %d affiliations from CRIC API", len(affiliations_dict))
    return affiliations_dict


def publish_to_kv(
    affiliations_dict,
    nats_server=NATS_SERVER,
    kv_bucket_name=KV_BUCKET,
):
    """
    Publish affiliations to JetStream KV store.

    Args:
        affiliations_dict: login -> affiliation mapping to store.
        nats_server: comma-separated list of NATS server URLs.
        kv_bucket_name: name of the (pre-existing) JetStream KV bucket.

    Raises:
        Exception: re-raised after logging when the publish fails.
    """

    async def _publish():
        nats_connection = NATS()
        nats_servers = [s.strip() for s in nats_server.split(",")]
        await nats_connection.connect(servers=nats_servers)
        try:
            jetstream = nats_connection.jetstream()
            logging.debug("Connected to NATS JetStream at %s", nats_servers)

            # Bind to the KV bucket (key_value() assumes the bucket exists).
            kv = await jetstream.key_value(kv_bucket_name)

            # Store the whole index as a single JSON document under KV_KEY.
            value = json.dumps(affiliations_dict).encode("utf-8")
            await kv.put(KV_KEY, value)
            logging.info(
                "Successfully published %d affiliations to KV store %s",
                len(affiliations_dict),
                kv_bucket_name,
            )
        finally:
            # Always close the connection, even when the publish failed.
            if not nats_connection.is_closed:
                try:
                    await nats_connection.close()
                except Exception as e:
                    logging.debug("Error closing NATS connection: %s", str(e))

    try:
        # asyncio.run() creates and tears down a fresh event loop per call,
        # replacing the manual get_event_loop()/new_event_loop() juggling
        # (which could reference an unset or stale loop during cleanup).
        asyncio.run(_publish())
    except Exception as e:
        logging.error("Failed to publish affiliations to KV store: %s", str(e))
        raise


def fetch_and_publish(
    service_url=AFFILIATION_API_URL,
    robot_cert=ROBOT_CERT,
    robot_key=ROBOT_KEY,
    ca_cert=CA_CERT,
    nats_server=NATS_SERVER,
    kv_bucket_name=KV_BUCKET,
):
    """
    Fetch affiliation data from the CRIC API and push it to the JetStream KV store.

    Publishing is skipped (with a warning) when the fetch yields no entries.
    """
    affiliations = fetch_affiliations(
        service_url=service_url,
        robot_cert=robot_cert,
        robot_key=robot_key,
        ca_cert=ca_cert,
    )

    if not affiliations:
        logging.warning("No affiliations to publish")
        return

    publish_to_kv(affiliations, nats_server=nats_server, kv_bucket_name=kv_bucket_name)


def main():
    """
    Fetch affiliation data from CRIC API and update the cache in JetStream KV store.

    Any failure is printed with its full traceback, then re-raised so the
    process exits non-zero (useful when run as a cronjob).
    """
    try:
        fetch_and_publish()
    except Exception:
        # The bound name was unused before; catch without binding.
        traceback.print_exc()
        raise


# Allow direct execution, e.g. as the container/cronjob entry point.
if __name__ == "__main__":
    main()
2 changes: 2 additions & 0 deletions docker/affiliation-cache/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
requests~=2.31
nats-py==2.6.0
41 changes: 41 additions & 0 deletions docker/spider-query-cronjob/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
FROM python:3.11-slim

# DEBIAN_FRONTEND: suppress interactive prompts during apt installs.
# PYTHONUNBUFFERED: flush stdout/stderr immediately so container logs stream.
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1

# ca-certificates for TLS verification; curl for in-container debugging;
# libexpat1 presumably required by the htcondor bindings — TODO confirm.
RUN apt-get update && \
apt-get install -y --no-install-recommends \
ca-certificates \
curl \
libexpat1 \
&& rm -rf /var/lib/apt/lists/*

# Unprivileged user that owns and runs the application.
RUN useradd -ms /bin/bash cmsjobmon && \
mkdir -p /opt/spider/ && \
chown -R cmsjobmon:cmsjobmon /opt/spider

WORKDIR /opt/spider

# Install dependencies first so this layer is cached across source changes.
COPY ./requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r /tmp/requirements.txt

COPY ./src ./src
COPY ./running_jobs.py ./running_jobs.py
COPY ./job_history.py ./job_history.py
RUN chmod +x ./*.py

RUN chown -R cmsjobmon:cmsjobmon /home/cmsjobmon /opt/spider

# Build arguments for docker image (used for monitoring)
ARG DOCKER_TAG=unknown
ARG IMAGE_NAME=unknown

# NOTE(review): COLLECTORS_FILE points at /opt/spider/etc/collectors.json but
# nothing in this Dockerfile copies it into the image; presumably it is mounted
# at deploy time (e.g. via a ConfigMap) — verify against the k8s manifests.
ENV PYTHONPATH=/opt/spider/src:$PYTHONPATH \
SPIDER_WORKDIR=/opt/spider \
AFFILIATION_DIR_LOCATION=/opt/spider/.affiliation_dir.json \
DOCKER_TAG=${DOCKER_TAG} \
IMAGE_NAME=${IMAGE_NAME} \
COLLECTORS_FILE=/opt/spider/etc/collectors.json

# Drop root privileges for the running container.
USER cmsjobmon
7 changes: 7 additions & 0 deletions docker/spider-query-cronjob/collectors.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"Global":["cmsgwms-collector-global.cern.ch:9620", "cmsgwms-collector-global.fnal.gov:9620","cmssrv276.fnal.gov"],
"Tier0":["cmsgwms-collector-tier0.cern.ch:9620","cmsgwms-collector-tier0.fnal.gov:9620"],
"Volunteer":["vocms0840.cern.ch"],
"ITB":["cmsgwms-collector-itb.cern.ch"],
"UCSD":["htcondor-cm-ucsd.osg.chtc.io"]
}
36 changes: 36 additions & 0 deletions docker/spider-query-cronjob/job_history.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/usr/bin/env python
"""
Script for processing the contents of the CMS pool.
"""

import time
import os

# Ensure this script has a distinct OpenTelemetry service name (for tracing).
# Must be set BEFORE any src.* imports below, since they may load constants.py
# (which reads OTEL_SERVICE_NAME at import time).
os.environ.setdefault("OTEL_SERVICE_NAME", "spider-job-history")

from src.utils import get_schedds_from_file, global_logger
from src.history import query_job_history
from src.otel_setup import trace_span
import src.constants as const
from opentelemetry import trace


@trace_span("job_history_main")
def main():
    """
    Query job history from all schedds and record result counts on the span.

    Returns:
        int: 0 on completion (exit-code style).
    """
    start = time.time()
    global_logger.info("Starting spider_cms history process.")

    # Get all the schedd ads (these are ClassAds; they can be sent directly
    # to worker processes, and `htcondor.Schedd` expects this type).
    schedd_ads = get_schedds_from_file(collectors_file=const.COLLECTORS_FILE)

    counts = query_job_history(schedd_ads, start)
    span = trace.get_current_span()
    span.set_attribute("job.count", counts["count"])
    span.set_attribute("job.published_count", counts["published_count"])
    return 0


# Run directly as the cronjob entry point.
if __name__ == "__main__":
    main()
20 changes: 20 additions & 0 deletions docker/spider-query-cronjob/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[project]
name = "spider-query-cronjob"
version = "0.1.0"
description = "Cronjob scripts that query CMS HTCondor schedds and publish job data for spider"
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"cmsmonitoring>=0.6.13",
"htcondor==23.0.28",
"ipykernel>=6.31.0",
"opensearch-py>=3.0.0",
"opentelemetry-exporter-otlp>=1.39.0",
"requests>=2.32.5",
"ruff>=0.14.6",
]

[tool.uv.workspace]
members = [
"python3.9",
]
1 change: 1 addition & 0 deletions docker/spider-query-cronjob/query_tests/output.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Failed to query collector cmsgwms-collector-global.fnal.gov:9620 for schedd crab3@vocms0197.cern.ch: Failed communication with collector.
93 changes: 93 additions & 0 deletions docker/spider-query-cronjob/query_tests/test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Test script to translate condor_history command to Python using htcondor bindings.

Original command:
condor_history -name crab3@vocms0197.cern.ch -constraint '(JobUniverse == 5) && (CMS_Type != "DONOTMONIT") && (EnteredCurrentStatus >= '$(($(date +%s) - 3600))' || CRAB_PostJobLastUpdate >= '$(($(date +%s) - 3600))')' -pool cmsgwms-collector-global.fnal.gov:9620 > output.txt 2>&1 &
"""

import os
import time
import classad
import htcondor

# Configuration from the original command
SCHEDD_NAME = "crab3@vocms0197.cern.ch"
POOL = "cmsgwms-collector-global.fnal.gov:9620"

# Set collector host (equivalent to the -pool option of condor_history).
# Must be set BEFORE creating any Collector objects.
os.environ["COLLECTOR_HOST"] = POOL

# Time threshold: one hour ago (matches $(date +%s) - 3600 in the bash command).
current_time = int(time.time())
time_threshold = current_time - 3600  # 3600 seconds = 1 hour

print(f"Querying collector {POOL} for schedd: {SCHEDD_NAME}")

# Locate the schedd ad in the collector.
schedd_ad = None
try:
    # Try with an explicit collector first.
    collector = htcondor.Collector(POOL)
    schedd_query = classad.ExprTree(f'Name == "{SCHEDD_NAME}"')
    schedds = collector.query(htcondor.AdTypes.Schedd, schedd_query)
    if schedds:
        schedd_ad = schedds[0]
        print("Found schedd via explicit collector connection")
except Exception as e:
    print(f"Explicit collector connection failed: {type(e).__name__}: {e}")
    print("Note: This is a known issue - Python bindings may have network/security restrictions")
    print("that don't affect the command-line tools.")
    print("\nSince bash commands work, you may need to:")
    print(" 1. Check firewall rules for Python processes")
    print(" 2. Check if Python bindings need additional security configuration")
    print(" 3. Use subprocess to call condor_status as a workaround")
    raise SystemExit(1)

if not schedd_ad:
    print(f"Error: Schedd {SCHEDD_NAME} not found in pool {POOL}")
    raise SystemExit(1)

# Create the Schedd object from the ad returned by the collector.
schedd = htcondor.Schedd(schedd_ad)

# Build the constraint query, equivalent to:
# (JobUniverse == 5) && (CMS_Type != "DONOTMONIT") &&
# (EnteredCurrentStatus >= time_threshold || CRAB_PostJobLastUpdate >= time_threshold)
constraint = f"""
(JobUniverse == 5) && (CMS_Type != "DONOTMONIT")
&&
(
    EnteredCurrentStatus >= {time_threshold}
    || CRAB_PostJobLastUpdate >= {time_threshold}
)
"""
history_query = classad.ExprTree(constraint)

print(f"Querying schedd: {SCHEDD_NAME}")
print(f"Pool: {POOL}")
print(f"Time threshold: {time_threshold} ({time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time_threshold))})")
print(f"Constraint: {history_query}")
print("-" * 80)

try:
    # schedd.history() returns a lazy iterator; materialize it into a list so
    # we can both count the jobs and time the full transfer.  (The previous
    # version called len() on the iterator itself, which raises TypeError.)
    query_start_time = time.time()
    jobs = list(schedd.history(history_query, [], match=-1))
    query_duration = time.time() - query_start_time

    print(f"History query completed in {query_duration:.2f} seconds")

    # Output timing information
    print(f"\nTotal jobs found: {len(jobs)}")
    print("\nTiming information:")
    print(f" History query: {query_duration:.2f} seconds")
    if jobs:
        print(f" Average time per job: {query_duration / len(jobs):.4f} seconds")

except RuntimeError as e:
    print(f"Error querying history: {e}")
    raise SystemExit(1)
Loading
Loading