Commits (52)
c3c4900
Add spider query trigger bash scripts
Nov 24, 2025
b7d136f
Add utils.py
Nov 24, 2025
dd1dc42
Add convert_to_json.py
Nov 24, 2025
647ff54
Add AffiliationManager.py
Nov 24, 2025
1124c8a
Write logic for publishing in NATS
Nov 24, 2025
fb280d5
Add main py files for history and queues
Nov 24, 2025
a014391
Add Dockerfile and requirements.txt
Nov 24, 2025
75b42c3
Add spider_cms.py and adapt Dockerfile
Nov 24, 2025
87116ee
Remove amq references from query jobs
Carlosbogo Nov 27, 2025
23b14de
Fix dockerfile pathing
Carlosbogo Nov 27, 2025
00cfa54
Add library to dockerfile
Carlosbogo Nov 27, 2025
dfb18c5
Only activate venv on local testing
Carlosbogo Nov 27, 2025
566eb14
Fix queues query logic and integrate with nats
Carlosbogo Nov 27, 2025
d6d0106
Reformat python files
Carlosbogo Nov 27, 2025
b9795f1
Local checkpoint
Carlosbogo Jan 30, 2026
a61759a
Fix span naming for better trace analysis
Carlosbogo Feb 26, 2026
39a4545
Add type hints and fix trace logic
Carlosbogo Feb 26, 2026
f510d92
Update logging
Carlosbogo Feb 26, 2026
3f764d2
Add checkpoint logic with nats kv store
Carlosbogo Feb 26, 2026
3cb80c1
Add checkpoint logic to nats_client.py
Carlosbogo Feb 26, 2026
601f849
Simplify nats logic
Carlosbogo Feb 26, 2026
d89c3e4
Add timeout env var
Carlosbogo Feb 26, 2026
6494a5c
Update history logic to follow new setup
Carlosbogo Feb 26, 2026
ece69fb
Remove unnecessary imports
Carlosbogo Feb 26, 2026
8af4e69
Add metrics for otel collector
Carlosbogo Feb 26, 2026
6930af4
Add MachineAttrGLIDEIN_OVERLOAD_ENABLED0 attribute
Carlosbogo Feb 26, 2026
2837327
Add timeout_mins env var
Carlosbogo Feb 26, 2026
19624a1
Remove collectors.json from Dockerfile
Carlosbogo Feb 26, 2026
186cd0e
Remove unneeded env vars
Carlosbogo Feb 26, 2026
4b7a7e1
Add trace logic to history and simplify
Carlosbogo Feb 26, 2026
270c8e4
Remove unused signal logic from running_jobs.py
Carlosbogo Feb 26, 2026
dc3b2e2
Create affiliation-cache docker image
Carlosbogo Feb 26, 2026
92c4d1c
Create spider-worker docker image
Carlosbogo Feb 26, 2026
2b9de48
Add utils.py
Carlosbogo Feb 26, 2026
d305c35
Add otel_setup.py to worker image
Carlosbogo Feb 26, 2026
ffd5ea1
Add nats consumer logic to worker image
Carlosbogo Feb 26, 2026
394eb9a
Add AMQ logic to worker image
Carlosbogo Feb 26, 2026
7735592
Add affiliation logic to docker image
Carlosbogo Feb 26, 2026
8b96baa
Add job processing logic to worker image
Carlosbogo Feb 26, 2026
feca018
Add opensearch logic to worker image
Carlosbogo Feb 26, 2026
ecdf825
Add env var logic to worker image
Carlosbogo Feb 26, 2026
e6870dd
Tie processing logic with nats consumer in worker image
Carlosbogo Feb 26, 2026
f83af49
Create main.py
Carlosbogo Feb 26, 2026
081cf0b
Update index creation logic to have proper date typing
Carlosbogo Feb 27, 2026
6f843b8
Add trace propagation logic over nats on query cronjobs
Carlosbogo Mar 23, 2026
dedd98d
Improve bool conversion for classad fields
Carlosbogo Mar 23, 2026
280df20
Add propagation of traces through nats on workers
Carlosbogo Mar 23, 2026
020d9bc
Improve the opensearch client handling
Carlosbogo Mar 23, 2026
5e47fc6
Apply the new boolean field logic in convert_to_json.py
Carlosbogo Mar 23, 2026
f7c9ad4
Fix regex for boolean extraction
Carlosbogo Mar 23, 2026
f6b07df
Allow for nats config values to be set through env vars
Carlosbogo Mar 23, 2026
5cfa4d7
Improve telemetry and instrumentation of worker processes
Carlosbogo Mar 23, 2026
37 changes: 37 additions & 0 deletions docker/spider-query-cronjob/Dockerfile
@@ -0,0 +1,37 @@
FROM python:3.11-slim

ENV DEBIAN_FRONTEND=noninteractive \
PYTHONUNBUFFERED=1

RUN apt-get update && \
apt-get install -y --no-install-recommends \
ca-certificates \
curl \
&& rm -rf /var/lib/apt/lists/*

# Application user and directories
RUN useradd -ms /bin/bash cmsjobmon && \
mkdir -p /opt/spider && \
chown -R cmsjobmon:cmsjobmon /opt/spider

WORKDIR /opt/spider

COPY ./requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir --upgrade pip && \
pip install --no-cache-dir -r /tmp/requirements.txt

COPY ./src ./src
COPY ./*.sh ./bin/

RUN chown -R cmsjobmon:cmsjobmon /home/cmsjobmon /opt/spider

ENV PYTHONPATH=/opt/spider/src \
SPIDER_WORKDIR=/opt/spider \
AFFILIATION_DIR_LOCATION=/opt/spider/.affiliation_dir.json

USER cmsjobmon

# Default entrypoint simply invokes python. Override CMD/args per cronjob template.
ENTRYPOINT ["python"]
CMD ["--version"]

15 changes: 15 additions & 0 deletions docker/spider-query-cronjob/requirements.txt
@@ -0,0 +1,15 @@
# be consistent with: https://gitlab.cern.ch/ai/it-puppet-hostgroup-vocms/-/blob/master/data/fqdns/vocms0240.cern.ch.yaml#L7
# check for breaking changes before any update and ask the HTCondor-users list <htcondor-users> if you see any problems
htcondor==23.0.28

# the exact version is needed; previous versions include breaking changes.
# also installs stomp.py==7.0.0
CMSMonitoring==0.6.12

# last version that supports Python 3.9
requests~=2.31

# this pin may need to change after any OpenSearch upgrade
opensearch-py~=2.5

click
40 changes: 40 additions & 0 deletions docker/spider-query-cronjob/spider_cms_history.sh
@@ -0,0 +1,40 @@
#!/bin/bash
# Copied from vocms0240
#
export SPIDER_WORKDIR="/opt/spider"
export AFFILIATION_DIR_LOCATION="$SPIDER_WORKDIR/.affiliation_dir.json"
export PYTHONPATH="$SPIDER_WORKDIR/src/:$PYTHONPATH"
export CMS_HTCONDOR_TOPIC="/topic/cms.jobmon.condor"

# PROD
export CMS_HTCONDOR_PRODUCER="condor"
export CMS_HTCONDOR_BROKER="cms-mb.cern.ch"
_ES_INDEX_TEMPLATE="cms"

_LOGDIR=$SPIDER_WORKDIR/log_history/
_LOG_LEVEL="WARNING"
_ALERT_EMAILS="cms-comp-monit-alerts@cern.ch"
_ES_BUNCH_SIZE=100
_QUERY_POOL_SIZE=16
_UPLOAD_POOL_SIZE=8

cd "$SPIDER_WORKDIR" || exit
source "$SPIDER_WORKDIR/venv/bin/activate"

# ./scripts/cronAffiliation.sh # First run

python scripts/spider_cms.py \
--feed_amq \
--feed_es \
--log_dir $_LOGDIR \
--log_level $_LOG_LEVEL \
--es_bunch_size $_ES_BUNCH_SIZE \
--query_pool_size $_QUERY_POOL_SIZE \
--upload_pool_size $_UPLOAD_POOL_SIZE \
--email_alerts "$_ALERT_EMAILS" \
--collectors_file $SPIDER_WORKDIR/etc/collectors.json \
--es_index_template $_ES_INDEX_TEMPLATE

# crontab entry (to run every 12 min, starting from 5 min past the hour):
# i.e. at 5,17,29,41,53 past the hour
# 5-59/12 * * * * /opt/spider/scripts/spider_cms.sh
38 changes: 38 additions & 0 deletions docker/spider-query-cronjob/spider_cms_queues.sh
@@ -0,0 +1,38 @@
#!/bin/bash
# Copied from vocms0240
#
export SPIDER_WORKDIR="/opt/spider"
export AFFILIATION_DIR_LOCATION="$SPIDER_WORKDIR/.affiliation_dir.json"
export PYTHONPATH="$SPIDER_WORKDIR/src/:$PYTHONPATH"
export CMS_HTCONDOR_TOPIC="/topic/cms.jobmon.condor"

# PROD
export CMS_HTCONDOR_PRODUCER="condor"
export CMS_HTCONDOR_BROKER="cms-mb.cern.ch"
_LOGDIR=$SPIDER_WORKDIR/log/
_LOG_LEVEL="WARNING"

_QUERY_QUEUE_BATCH_SIZE=100
_QUERY_POOL_SIZE=16
_UPLOAD_POOL_SIZE=8

cd "$SPIDER_WORKDIR" || exit
source "$SPIDER_WORKDIR/venv/bin/activate"

# ./scripts/cronAffiliation.sh # First run

python scripts/spider_cms.py \
--feed_amq \
--log_dir $_LOGDIR \
--log_level $_LOG_LEVEL \
--skip_history \
--process_queue \
--query_queue_batch_size $_QUERY_QUEUE_BATCH_SIZE \
--query_pool_size $_QUERY_POOL_SIZE \
--upload_pool_size $_UPLOAD_POOL_SIZE \
--collectors_file $SPIDER_WORKDIR/etc/collectors.json

#python spider_cms.py --log_dir $LOGDIR --log_level WARNING --feed_amq --email_alerts 'cms-comp-monit-alerts@cern.ch' --skip_history --process_queue --query_queue_batch_size 100 --query_pool_size 16 --upload_pool_size 8 --collectors_file $SPIDER_WORKDIR/etc/collectors.json

# crontab entry (to run every 12 min):
# */12 * * * * /opt/spider/scripts/spider_cms.sh
161 changes: 161 additions & 0 deletions docker/spider-query-cronjob/src/AffiliationManager.py
@@ -0,0 +1,161 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Christian Ariza <christian.ariza AT gmail [DOT] com>
# pylint: disable=line-too-long
import errno
import json
import logging
import os
from datetime import datetime, timedelta
from logging.handlers import RotatingFileHandler
from pathlib import Path

import requests

_DEFAULT_WORKDIR = os.getenv("SPIDER_WORKDIR", "/opt/spider")
AFFILIATION_LOG_DIR = os.path.join(_DEFAULT_WORKDIR, "log_aff")


def setup_logging():
"""
    Set up the affiliation cache logger.
"""
_logger = logging.getLogger("affiliation_logger")
_logger.setLevel(logging.INFO)
    try:
        os.makedirs(AFFILIATION_LOG_DIR, exist_ok=True)
    except OSError as e:
        _logger.warning("Could not create AFFILIATION_LOG_DIR %s: %s", AFFILIATION_LOG_DIR, e)
log_file = os.path.join(AFFILIATION_LOG_DIR, "affiliation.log")
log_handler = RotatingFileHandler(log_file, maxBytes=100000, backupCount=5)
log_handler.setFormatter(logging.Formatter("%(asctime)s : %(name)s:%(levelname)s - %(message)s"))
_logger.addHandler(log_handler)


setup_logging()
aff_logger = logging.getLogger("affiliation_logger")


class AffiliationManager:
__DEFAULT_DIR_PATH = Path(os.path.join(_DEFAULT_WORKDIR, ".affiliation_dir.json"))
__DEFAULT_URL = "https://cms-cric.cern.ch/api/accounts/user/query/?json"
__DEFAULT_CA_CERT = "/etc/pki/tls/certs/CERN-bundle.pem"
__DEFAULT_ROBOT_CERT = "/home/cmsjobmon/.globus/usercert.pem"
__DEFAULT_ROBOT_KEY = "/home/cmsjobmon/.globus/userkey.pem"

def __init__(
self,
dir_file=__DEFAULT_DIR_PATH,
recreate=False,
recreate_older_days=None,
service_url=__DEFAULT_URL,
robot_cert=__DEFAULT_ROBOT_CERT,
robot_key=__DEFAULT_ROBOT_KEY,
ca_cert=__DEFAULT_CA_CERT,
):
"""
params:
recreate: boolean
recreate_older_days: int, recreate the dir if is older
than that number of days.
"""
        self.path = Path(dir_file)
        self.url = service_url
self.robot_cert = robot_cert
self.robot_key = robot_key
self.ca_cert = ca_cert
if not recreate and recreate_older_days:
if self.path.is_file():
_min_date = datetime.now() - timedelta(days=recreate_older_days)
_dir_time = datetime.fromtimestamp(self.path.stat().st_mtime)
recreate = _dir_time < _min_date
else:
recreate = True

try:
self.__dir = self.loadOrCreateDirectory(recreate)
self.__dn_dir = {
person["dn"]: person for person in list(self.__dir.values())
}
except (
IOError,
requests.RequestException,
requests.HTTPError,
json.JSONDecodeError,
) as cause:
aff_logger.error("Affiliation instance initialization error: " + str(cause))
raise AffiliationManagerException from cause

def loadOrCreateDirectory(self, recreate=False):
"""
Create or load from a json file an inverted
index of instutions by person login. e.g.:

{
'valya':{u'country': u'US',
u'institute': u'Cornell University'},
'belforte': {u'country': u'IT',
u'institute': u'Universita e INFN Trieste'}
...
}
raises IOError if the file doesn't exist (of it cannot be read)
RequestException if something happen with the request
HTTPError if the response was something different
to a success
"""
aff_logger.debug("Affiliation load or create args. recreate:" + str(recreate))
_tmp_dir = None
if recreate:
# response = requests.get(self.url) #no auth
cert = (self.robot_cert, self.robot_key)
response = requests.get(self.url, cert=cert, verify=self.ca_cert)
response.raise_for_status()

_json = json.loads(response.text)
_tmp_dir = {}
for person in list(_json.values()):
login = None
for profile in person["profiles"]:
if "login" in profile:
login = profile["login"]
break
if login and "institute" in person:
_tmp_dir[login] = {
"institute": person["institute"],
"country": person["institute_country"],
"dn": person["dn"],
}
aff_logger.debug("Temp affiliations before written: " + str(_tmp_dir))
# Only override the file if the dict is not empty.
if _tmp_dir:
with open(self.path, "w") as _dir_file:
json.dump(_tmp_dir, _dir_file)
aff_logger.info("Successfully recreated: " + str(self.path))
elif self.path.is_file():
with open(self.path, "r") as dir_file:
_tmp_dir = json.load(dir_file)
else:
raise IOError(errno.ENOENT, os.strerror(errno.ENOENT), self.path)
return _tmp_dir

def getAffiliation(self, login=None, dn=None):
"""
Returns a python dictionary with the institute and country
for the given login or dn.
Returns None if not found.
"""
if login:
return self.__dir.get(login)
if dn:
return self.__dn_dir.get(dn)
return None


class AffiliationManagerException(Exception):
"""
    Exception wrapper for problems that prevent us from obtaining the affiliation info.
"""
pass
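
A minimal usage sketch of the class above, assuming the default cache path and robot certificates are in place (the login "belforte" is only an illustrative value taken from the docstring):

# Hypothetical usage: refresh the cache if it is more than 7 days old,
# then look up the affiliation for a single login.
from AffiliationManager import AffiliationManager, AffiliationManagerException

try:
    manager = AffiliationManager(recreate_older_days=7)
except AffiliationManagerException:
    # CRIC was unreachable and no usable cache file exists.
    manager = None

if manager is not None:
    affiliation = manager.getAffiliation(login="belforte")
    if affiliation:
        print(affiliation["institute"], affiliation["country"])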