diff --git a/.github/workflows/deployment.yml b/.github/workflows/deployment.yml index b972c68..3f6398d 100644 --- a/.github/workflows/deployment.yml +++ b/.github/workflows/deployment.yml @@ -18,6 +18,7 @@ jobs: deploy-pypi: name: PyPI deployment runs-on: ubuntu-latest + if: github.event_name != 'push' || github.repository == 'GreenDIGIT-project/GreenDIRAC' steps: - name: Checkout code uses: actions/checkout@v3 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d42ccc8 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +src/GreenDIRAC/WorkloadManagementSystem/Client/cim.conf +src/GreenDIRAC/WorkloadManagementSystem/Agent/CIM2CSAgent.py + +.idea/ +__pycache__/ +*.egg-info/ +token diff --git a/README.md b/README.md index 1997907..07d2950 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,34 @@ # GreenDIRAC -DIRAC developments for the GreenDIGIT Project + +GreenDIRAC is a DIRAC extension built for the GreenDIGIT Project. It enriches +DIRAC workload management with sustainability data—power usage effectiveness +(PUE) and carbon intensity (CI)—so scheduling and reporting tools can account +for the environmental cost of running jobs. + +## What GreenDIRAC adds +- Registers itself as a DIRAC extension so the agents and configuration entries + are available alongside standard DIRAC deployments. +- Uses the collected metrics to prefer greener queues when dispatching jobs and + to record energy/emissions for completed workloads. + +## Core agents +- **Green-aware Site Director** – augments queue information with PUE, CI and + then computes a green score that ranks queues from greenest to least green + before scheduling. +- **GreenReportingAgent** – collects finished job records, queries CIM/CI + services for sustainability data, computes energy/emissions figures, and + persists the metrics to DIRAC databases and ElasticSearch when enabled. + +## Local CIM configuration +`CIMClient` reads credentials and API endpoints from: +`src/GreenDIRAC/WorkloadManagementSystem/Client/cim.conf` + +Required sections and keys: +- `[CIM]`: `EMAIL`, `PASSWORD`, `API_BASE`, `METRICS_URL` +- `[KPI]`: `API_BASE` +- `[Defaults]`: `PUE`, `CI`, `ENERGY_WH` +- `[Runtime]`: `TOKEN_MAX_AGE_H`, `CACHE_TTL`, `TOKEN_TIMEOUT_S`, + `PUE_TIMEOUT_S`, `CI_TIMEOUT_S`, `SUBMIT_TIMEOUT_S` + +If `cim.conf` is missing, `CIMClient` fails at startup with +`RuntimeError("CIMClient config file not found: ...")`. diff --git a/setup.cfg b/setup.cfg index 4d04ed0..f1aa702 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = greendirac -version = RELEASE_VERSION +version = 0.1.0 author = GreenDIRAC author_email = atsareg@in2p3.fr description = GreenDIRAC is a DIRAC extension for the GreenDIGIT project @@ -15,7 +15,7 @@ license = GPLv3 package_dir = = src packages = find: -python_requires = >=3.11 +python_requires = >=3.10 include_package_data = True install_requires = DIRAC ~= 8.0.75 diff --git a/setup.py b/setup.py index 6068493..1631265 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,3 @@ from setuptools import setup -setup() +setup(install_requires=['dirac']) diff --git a/src/GreenDIRAC/WorkloadManagementSystem/Agent/GreenReportingAgent.py b/src/GreenDIRAC/WorkloadManagementSystem/Agent/GreenReportingAgent.py index eb60e61..56f8ccf 100644 --- a/src/GreenDIRAC/WorkloadManagementSystem/Agent/GreenReportingAgent.py +++ b/src/GreenDIRAC/WorkloadManagementSystem/Agent/GreenReportingAgent.py @@ -1,5 +1,13 @@ -import pprint -import requests +#!/usr/bin/env python3 +""" +GreenReportingAgent — Queries CIMClient for site green metrics, +submits per-job green metrics to CIM, +and stores them in DIRAC JobDB / ElasticSearch. +""" + +from datetime import timezone +import time + from DIRAC import S_OK, S_ERROR, gConfig from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.WorkloadManagementSystem.Client import JobStatus @@ -9,218 +17,393 @@ from DIRAC.ConfigurationSystem.Client.Helpers import Registry from DIRAC.ConfigurationSystem.Client import PathFinder +from GreenDIRAC.WorkloadManagementSystem.Client.CIMClient import CIMClient -JOB_PARAMETER_KEYS = ["ModelName", - "CPUNormalizationFactor", - "HostName", - "JobID", - "JobType", - "LoadAverage", - "MemoryUsed(kb)", - "NormCPUTime(s)", - "ScaledCPUTime(s)", - "Status", - "TotalCPUTime(s)", - "WallClockTime(s)", - "DiskSpace(MB)", - "CEQueue", - "GridCE", - ] +# -------------------------------------------------------- +# DIRAC job parameters and attributes +# -------------------------------------------------------- +JOB_PARAMETER_KEYS = [ + "ModelName", "CPUNormalizationFactor", "HostName", "JobID", "JobType", + "LoadAverage", "MemoryUsed(kb)", "NormCPUTime(s)", "ScaledCPUTime(s)", + "Status", "TotalCPUTime(s)", "WallClockTime(s)", "DiskSpace(MB)", + "CEQueue", "GridCE", +] JOB_ATTRIBUTE_KEYS = [ - "JobGroup", - "JobName", - "Owner", - "OwnerDN", - "OwnerGroup", - "RescheduleCounter", - "Site", - "SubmissionTime", - "StartExecTime", - "EndExecTime", - "SystemPriority", - "UserPriority", - ] - -TIME_STAMPS = [ - "SubmissionTime", - "StartExecTime", - "EndExecTime", + "JobGroup", "JobName", "Owner", "OwnerDN", "OwnerGroup", + "RescheduleCounter", "Site", + "SubmissionTime", "StartExecTime", "EndExecTime", + "SystemPriority", "UserPriority", ] -# Some tentative values -DEFAULT_CI = 24 -DEFAULT_PUE = 1.5 -DEFAULT_TDP = 150 +SITES_EUROPE = [ + "Cloud.IHPC.fr", + "EGI.ARNES.si", + "EGI.AUVERGRID.fr", + "EGI.BARI.it", + "EGI.CATANIA.it", + "EGI.CERN.ch", + "EGI.CESNET.cz", + "EGI.CIEMAT.es", + "EGI.CIRMMP.it", + "EGI.CNAF.it", + "EGI.CNR.it", + "EGI.CPPM.fr", + "EGI.CREATIS.fr", + "EGI.CYFRONET.pl", + "EGI.DESY.de", + "EGI.DESYZN.de", + "EGI.FRASCATI.it", + "EGI.GOEGRID.de", + "EGI.GRIDKA.de", + "EGI.GRIF.fr", + "EGI.HEPACC.uk", + "EGI.IFAE.es", + "EGI.IFCA.es", + "EGI.IN2P3-CC.fr", + "EGI.INFN-COSENZA.it", + "EGI.INFN-GENOVA.it", + "EGI.INFN-LECCE.it", + "EGI.INFN-NAPOLI.it", + "EGI.INFN-PISA.it", + "EGI.INGRID.pt", + "EGI.IRB.hr", + "EGI.IRES.fr", + "EGI.JINR.ru", + "EGI.KFKI.hu", + "EGI.LAPP.fr", + "EGI.LNL.it", + "EGI.LPC.fr", + "EGI.LSGRUG.nl", + "EGI.METU.tr", + "EGI.NCBJ.pl", + "EGI.NIKHEF.nl", + "EGI.ROMA3.it", + "EGI.RWTH-Aachen.de", + "EGI.SARA.nl", + "EGI.SRCE.hr", + "EGI.SiGNET.si", + "EGI.TASK.pl", + "EGI.TORINO.it", + "EGI.TRIESTE.it", + "EGI.UCL.be", + "EGI.UKI.uk", + "EGI.UKIAC.uk", + "EGI.UKIB.uk", + "EGI.UKID.uk", + "EGI.UKIG.uk", + "EGI.UKIL.uk", + "EGI.UKILH.uk", + "EGI.UKIM.uk", + "EGI.UKIMBH.uk", + "EGI.UKIR.uk", + "EGI.UKIRALPP.uk", + "EGI.UKISHEF.uk", + "EGI.ULAKBIM.tr", + "EGI.ULB.be", + "EGI.UNI-SIEGEN-HEP.de", +] + +TIME_STAMPS = ["SubmissionTime", "StartExecTime", "EndExecTime"] -# Getting tokens at +DEFAULT_TDP = 150 -METRICS_DB_URL = "https://mc-a4.lab.uvalight.net/gd-cim-api/submit" -METRICS_DB_TOKEN = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJhdHNhcmVnQGluMnAzLmZyIiwiaXNzIjoiZ3JlZW5kaWdpdC1sb2dpbi11dmEiLCJpYXQiOjE3NTkzMDA5ODYsIm5iZiI6MTc1OTMwMDk4NiwiZXhwIjoxNzU5Mzg3Mzg2fQ.A4nygJEdhvOQjkLe-ckRDidqVbi6-s4kZXLRUZkwek8" +IDLE_CONSUMPTION_FACTOR = 0.4 +# ========================================================== +# GreenReportingAgent +# ========================================================== class GreenReportingAgent(AgentModule): - """ - Agent for removing jobs in status "Deleted", and not only - """ def __init__(self, *args, **kwargs): - """c'tor""" super().__init__(*args, **kwargs) - # clients self.jobDB = None + self.elasticJobParametersDB = None + self.maxJobsAtOnce = 1000 - self.maxJobsAtOnce = 50 self.section = PathFinder.getAgentSection(self.agentName) - ############################################################################# + # CIM abstraction + self.cimClient = None + + # ----------------------------------------------------- def initialize(self): - """Sets defaults""" self.jobDB = JobDB() + + # ElasticSearch support self.elasticJobParametersDB = None - useESForJobParametersFlag = Operations().getValue("/Services/JobMonitoring/useESForJobParametersFlag", False) - if useESForJobParametersFlag: - try: - result = ObjectLoader().loadObject( - "WorkloadManagementSystem.DB.ElasticJobParametersDB", "ElasticJobParametersDB" - ) - if not result["OK"]: - return result - self.elasticJobParametersDB = result["Value"](parentLogger=self.log) - except RuntimeError as excp: - return S_ERROR(f"Can't connect to ES DB: {excp}") + useES = Operations().getValue( + "/Services/JobMonitoring/useESForJobParametersFlag", False + ) + + if useES: + res = ObjectLoader().loadObject( + "WorkloadManagementSystem.DB.ElasticJobParametersDB", + "ElasticJobParametersDB", + ) + if res["OK"]: + self.elasticJobParametersDB = res["Value"](parentLogger=self.log) + self.log.info("Using ElasticJobParametersDB") + else: + self.log.warn("Falling back to JobDB for job parameters") + + self.maxJobsAtOnce = self.am_getOption( + "MaxJobsAtOnce", self.maxJobsAtOnce + ) + + # Instantiate CIM client + self.cimClient = CIMClient(logger=self.log) + + # Load CPU models + self.cpuDict = {} + res = gConfig.getSections(f"{self.section}/CPUData") + if res["OK"]: + for model in res["Value"]: + self.cpuDict[model] = { + "TDP": gConfig.getValue( + f"{self.section}/CPUData/{model}/TDP", DEFAULT_TDP + ), + "Cores": gConfig.getValue( + f"{self.section}/CPUData/{model}/Cores", 12 + ), + } + + self.log.info(f"Loaded {len(self.cpuDict)} CPU models") + return S_OK() - self.maxJobsAtOnce = self.am_getOption("MaxJobsAtOnce", self.maxJobsAtOnce) + # ===================================================================== + # EXECUTE + # ===================================================================== + def execute(self): + condDict = { + "Status": [JobStatus.DONE, JobStatus.FAILED], + "Site": SITES_EUROPE, + "ApplicationNumStatus": 0, + } + + res = self.jobDB.selectJobs( + condDict, + limit=self.maxJobsAtOnce, + orderAttribute="LastUpdateTime:DESC", + ) + if not res["OK"]: + return res + + jobIDs = [int(j) for j in res["Value"]] + if not jobIDs: + return S_OK() - self.cpuDict = {} - result = gConfig.getSections(f"{self.section}/CPUData") - if result["OK"]: - models = result["Value"] - for model in models: - self.cpuDict[model] = {} - self.cpuDict[model]["TDP"] = gConfig.getValue(f"{self.section}/CPUData/{model}/TDP", DEFAULT_TDP) - self.cpuDict[model]["Cores"] = gConfig.getValue(f"{self.section}/CPUData/{model}/Cores", 12) + # Load job parameters + if self.elasticJobParametersDB: + params = self.elasticJobParametersDB.getJobParameters(jobIDs) + else: + params = self.jobDB.getJobParameters(jobIDs) - print("AT >>> CPU data") - pprint.pprint(self.cpuDict) + attrs = self.jobDB.getJobsAttributes(jobIDs) - return S_OK() + if not params["OK"] or not attrs["OK"]: + return S_ERROR("Failed to load job data") - def execute(self): - """Report job green parameters""" + jobParamsDict = params["Value"] + jobAttrDict = attrs["Value"] - condDict = {"Status": [JobStatus.DONE, JobStatus.FAILED], "ApplicationNumStatus": 0} - result = self.jobDB.selectJobs(condDict, limit=self.maxJobsAtOnce, orderAttribute="LastUpdateTime:DESC" ) - if not result["OK"]: - return result + records = [] - jobList = result["Value"] - if not jobList: - self.log.info("No jobs to report") - return S_OK() + for jobID in jobParamsDict: + rec = {} - print("Job List", jobList) + for k, v in jobParamsDict[jobID].items(): + if k in JOB_PARAMETER_KEYS: + rec[k] = v - result = self.elasticJobParametersDB.getJobParameters(jobList) - if not result["OK"]: - self.log.info("No parameters found") - return S_ERROR("No parameters found") - jobParamsDict = result["Value"] + for k, v in jobAttrDict.get(jobID, {}).items(): + if k in JOB_ATTRIBUTE_KEYS: + rec[k] = str(v) if k in TIME_STAMPS else v - result = self.jobDB.getJobsAttributes(jobList) - if not result["OK"]: - self.log.info("No attributes found") - return S_ERROR("No attributes found") - jobAttrDict = result["Value"] + rec["JobID"] = int(jobID) + records.append(rec) + successJobs = [] - #pprint.pprint(jo) - #pprint.pprint(jobAttrDict) - print("hello world") - # Form records - records = [] - for job in jobParamsDict: - jobDict = {} - for key in jobParamsDict[job]: - if key in JOB_PARAMETER_KEYS: - jobDict[key] = jobParamsDict[job][key] - for key in jobAttrDict[job]: - if key in JOB_ATTRIBUTE_KEYS: - if key in TIME_STAMPS: - jobDict[key] = str(jobAttrDict[job][key]) - else: - jobDict[key] = jobAttrDict[job][key] - - records.append(jobDict) - - # Mark jobs as reported - result = self.jobDB.setJobAttributes(jobList, ["ApplicationNumStatus"], [9999]) - if not result["OK"]: - self.log.error(f"Failed to set ApplicationNumStatus for job {job}", result["Message"]) - - #for record in records: - # pprint.pprint(record) - - # Get the processor TDP - for record in records: - result = self.__getProcessorParameters(record.get("ModelName", "Unknown")) - if not result["OK"]: - self.log.error("Failed to get processor parameters") - continue - tdp, n_cores = result["Value"] - site = record.get("Site", "Unknown") - result = self.__getSiteParameters(record["Site"]) - if not result["OK"]: - self.log.error("Failed to get site parameters") - continue - pue, ci, gocdb_name = result["Value"] - cpu = record.get("TotalCPUTime(s)", 0) - cpu = float(cpu) - record["Energy(kwh)"] = cpu*tdp/n_cores/1000./3600. - record["TDP"] = tdp - record["NCores"] = n_cores - record["VO"] = Registry.getVOForGroup(record["OwnerGroup"]) - record["SiteName"] = gocdb_name - - for record in records: - pprint.pprint(record) - result = self.__sendRecordToMB(record) - - # Send records to the Metrics DB + # ------------------------------------------------------------- + # Process jobs + # ------------------------------------------------------------- - return S_OK() + startJobs = time.time() - def __sendRecordToMB(self, record): + for rec in records: - headers = { "Authorization": f"Bearer {METRICS_DB_TOKEN}", - "Content-Type": "application/json" - } + startRecord = time.time() - response = requests.post(METRICS_DB_URL, - headers = headers, - json = record - ) - print(response.status_code) - print(response.text) + tdp, cores = self.__getProcessorParameters( + rec.get("ModelName", "Unknown") + ) - return S_OK() + site = rec.get("Site", "Unknown") - def __getSiteParameters(self, site): - """ To be implemented """ + # ---- READ from CIM ---- + pue, ci, gocdb = self.cimClient.getSiteGreenMetrics( + site, + startExecTime=rec.get("StartExecTime"), + endExecTime=rec.get("EndExecTime"), + ) - grid = site.split(".")[0] - gocdb_name = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/Name", site) - pue = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/GreenParams/PUE", DEFAULT_PUE) - ci = gConfig.getValue(f"/Resources/Sites/{grid}/{site}/GreenParams/CI", DEFAULT_CI) - return S_OK((pue, ci, gocdb_name)) + self.log.debug(f"Time after getSiteGreenMetrics: {time.time()-startRecord}") - def __getProcessorParameters(self, model): - """ Get TDP and number of cores """ + rec["SiteDIRAC"] = site + rec["SiteGOCDB"] = gocdb + rec["Site"] = gocdb - if model in self.cpuDict: - return S_OK((self.cpuDict[model]["TDP"], self.cpuDict[model]["Cores"])) + cpu_s = float(rec.get("TotalCPUTime(s)", 0)) + wallclock_s = float(rec.get("WallClockTime(s)", 0)) + + # Default assumption: one core per process + cores_used = 1 + + energy_kwh = self.__compute_energy_kwh( + cpu_seconds=cpu_s, + wallclock_seconds=wallclock_s, + tdp=tdp, + total_cores=cores, + cores_used=cores_used, + ) + energy_wh = energy_kwh * 1000.0 + emissions = energy_kwh * pue * ci + + cpunorm = float(rec.get("CPUNormalizationFactor", 0)) + cee = (cpunorm * cores) / float(tdp) if tdp else 0.0 + + # ------------------------------------------------- + # HTC metrics (added – minimal checks only) + # ------------------------------------------------- + wallclock_s = float(rec.get("WallClockTime(s)", 0)) + norm_cpu_s = float(rec.get("NormCPUTime(s)", 0)) + + if wallclock_s > 0: + rec["Efficiency"] = norm_cpu_s / wallclock_s + else: + rec["Efficiency"] = 0.0 + + if energy_wh > 0: + rec["Work"] = norm_cpu_s / energy_wh + else: + rec["Work"] = 0.0 + + rec.update({ + "ExecUnitID": rec["JobID"], + "PUE": pue, + "CI_g": ci, + "Energy_wh": energy_wh, + "CFP_g": emissions, + "Owner": Registry.getVOForGroup(rec.get("OwnerGroup")), + "ExecUnitFinished": 1, + "NCores": cores, + "TDP_w": tdp, + "CEE": cee, + }) + + # ------------------------------------------------- + # SUBMIT to CIM + # ------------------------------------------------- - print(f"AT >>> oooooooooooooo CPU Model {model} is not in the configuration ooooooooooooooo") - return S_OK((200, 12)) \ No newline at end of file + startCIM = time.time() + + try: + self.log.info(f"Submitting full record to CIM: {rec}") + ok = self.cimClient.submitRecord(rec) + if ok: + self.log.info( + f"CIM submission OK for JobID={rec['ExecUnitID']} " + f"Site={gocdb}; time spent {time.time() - startCIM}" + ) + successJobs.append(rec["JobID"]) + else: + self.log.error( + f"CIM submission FAILED for JobID={rec['ExecUnitID']}" + ) + except Exception as e: + self.log.exception( + f"CIM submission EXCEPTION for JobID={rec['ExecUnitID']}: {e}" + ) + + # ------------------------------------------------- + # STORE in ElasticSearch + # ------------------------------------------------- + if self.__storeJobGreenMetrics(rec): + self.log.info( + f"ElasticSearch storage OK for JobID={rec['ExecUnitID']}" + ) + + # Mark processed + self.log.info(f"Sending ApplicationNumStatus updates for {len(successJobs)} jobs") + result = self.jobDB.setJobAttributes( + successJobs, ["ApplicationNumStatus"], [9999] + ) + if not result["OK"]: + self.log.error("Failed to update ApplicationNumStatus attributes") + + self.log.debug(f"Processing {len(records)} jobs in {time.time()-startJobs}") + + return result + + # ===================================================================== + # HELPERS + # ===================================================================== + def __getProcessorParameters(self, model): + if model in self.cpuDict: + cpu = self.cpuDict[model] + return cpu["TDP"], cpu["Cores"] + self.log.warn(f"Unknown CPU model: {model}") + return DEFAULT_TDP, 12 + + def __compute_energy_kwh(self, cpu_seconds, wallclock_seconds, tdp, total_cores, cores_used=1): + """ + Energy model (professor's formula): + + E = ((1-f)*CPUtime + f*WallClockTime) + * (CoresUsed / TotalCores) + * TDP + + Returned value is in kWh. + """ + try: + if wallclock_seconds <= 0 or total_cores <= 0: + return 0.0 + + f = IDLE_CONSUMPTION_FACTOR + f = max(0.0, min(1.0, f)) + + effective_time_s = (1.0 - f) * float(cpu_seconds) + f * float(wallclock_seconds) + core_fraction = float(cores_used) / float(total_cores) + + energy_joule = effective_time_s * core_fraction * float(tdp) + energy_kwh = energy_joule / 3_600_000.0 + + return energy_kwh + + except Exception: + return 0.0 + + def __storeJobGreenMetrics(self, record): + jobID = record.get("ExecUnitID") + if not jobID or not self.elasticJobParametersDB: + return False + + es_params = { + k: (str(v) if k in TIME_STAMPS else v) + for k, v in record.items() + if v is not None + } + + res = self.elasticJobParametersDB.setJobParameters(jobID, es_params) + if not res["OK"]: + self.log.error( + f"ElasticSearch write failed for JobID={jobID}: " + f"{res.get('Message')}" + ) + return False + + return True diff --git a/src/GreenDIRAC/WorkloadManagementSystem/Agent/SiteDirector.py b/src/GreenDIRAC/WorkloadManagementSystem/Agent/SiteDirector.py index 18839ef..b2c688a 100644 --- a/src/GreenDIRAC/WorkloadManagementSystem/Agent/SiteDirector.py +++ b/src/GreenDIRAC/WorkloadManagementSystem/Agent/SiteDirector.py @@ -1,25 +1,355 @@ -from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import SiteDirector as _SiteDirector +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- -class SiteDirector(_SiteDirector): - """ - Custom Site Director that extends the standard DIRAC SiteDirector. - """ +""" +GreenSiteDirector — READ-ONLY green-aware SiteDirector - def initialize(self): +- Does NOT override beginExecution() +- Overrides _getSortedQueues() +- Adds explicit logs showing queue ordering +""" + +from __future__ import annotations + +from DIRAC import S_OK +from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader +from DIRAC.WorkloadManagementSystem.Agent.SiteDirector import ( + SiteDirector as BaseSiteDirector, +) + +from GreenDIRAC.WorkloadManagementSystem.Client.CIMClient import CIMClient + +import time + +CEE_CACHE_TTL = 180 # seconds (10 minutes) + + +# ===================================================================== +# CONSTANTS +# ===================================================================== +DEFAULT_CEE = 1.0 + +ES_TIME_WINDOW = "now-30d" + + +# ===================================================================== +# GREEN SITE DIRECTOR +# ===================================================================== +class SiteDirector(BaseSiteDirector): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.log.always("GreenSiteDirector loaded (GreenScore-based queue ordering)") + self.cimClient = CIMClient(logger=self.log) + + # ---- CEE CACHE ---- + self._ceeCache = {} + self._ceeCacheTimestamp = 0 + self.elasticJobParametersDB = None + + def _getElasticJobParametersDB(self): + if self.elasticJobParametersDB: + return self.elasticJobParametersDB + + res = ObjectLoader().loadObject( + "WorkloadManagementSystem.DB.ElasticJobParametersDB", + "ElasticJobParametersDB", + ) + if not res["OK"]: + self.log.error(f"Cannot load ElasticJobParametersDB: {res['Message']}") + return None + + try: + self.elasticJobParametersDB = res["Value"](parentLogger=self.log) + except Exception as exc: + self.log.error(f"Cannot initialize ElasticJobParametersDB: {exc}") + return None + + return self.elasticJobParametersDB + + # ================================================================= + # QUEUE SORTING OVERRIDE (WITH LOGS) + # ================================================================= + def _getSortedQueues(self): """ - Called once at agent startup. - We call the base initialize(), then add our own custom message. + Sort queues by GreenScore (descending) and log top 5. """ - # ✅ Call the parent (base) initialize() first - result = super().initialize() + # queueDict layout (input from BaseSiteDirector), example with 2 queues: + # { + # "PARIS_CE01/queueA": { + # "Site": "EGI.IN2P3.fr", + # "CEName": "ce01.in2p3.fr", + # "ParametersDict": { + # "MaxCPUTime": 86400, + # "QueueStatus": "Production" + # } + # }, + # "SARA_CE02/queueB": { + # "Site": "EGI.SARA.nl", + # "CEName": "ce02.surfsara.nl", + # "ParametersDict": { + # "MaxCPUTime": 43200, + # "QueueStatus": "Production" + # } + # } + # } + + self.log.always( + f"GreenSiteDirector: sorting {len(self.queueDict)} queues by GreenScore" + ) + + try: + greenMetrics = self._computeGreenMetrics() + except Exception as e: + self.log.error( + f"GreenSiteDirector: failed to compute green metrics, " + f"falling back to random order: {e}" + ) + return super()._getSortedQueues() + + # sortedQueues = sorted( + # self.queueDict.items(), + # key=lambda item: item[1] + # .get("ParametersDict", {}) + # .get("GreenScore", 0.0), + # reverse=True, + # ) + + sortedQueues = sorted( + self.queueDict.items(), + key=lambda item: greenMetrics + .get(item[0], {}) + .get("GreenScore", 0.0), + reverse=True, + ) + + # ---- LOG TOP 5 SORTED QUEUES ---- + self.log.always("GreenSiteDirector: TOP 5 QUEUES AFTER GREEN SORT") + for i, (qName, qDict) in enumerate(sortedQueues[:5], 1): + gm = greenMetrics.get(qName, {}) + self.log.always( + f" {i}. {qName:<45} " + f"GREEN={gm.get('GreenScore', 0.0):.4f} " + f"PUE={gm.get('PUE')} " + f"CI={gm.get('CI')} " + f"CEE={gm.get('CEE')}" + ) + return sortedQueues + + # ================================================================= + # GREEN METRICS COMPUTATION + # ================================================================= + def _computeGreenMetrics(self): + if not self.queueDict: + return {} + + # avgCEE layout (from _getAverageCEEFromES cache): + # { + # "": , + # "...": + # } + avgCEE = self._getAverageCEEFromES() + greenMetrics = {} + + # Resolve PUE/CI once per unique site to avoid repeated calls for queues sharing a site. + siteMetrics = {} + uniqueSites = {qDict.get("Site", "") for qDict in self.queueDict.values()} + for site in uniqueSites: + try: + pue, ci, _gocdb = self.cimClient.getSiteGreenMetrics(site) + except Exception: + pue, ci = 1.7, 500.0 + siteMetrics[site] = (pue, ci) + + for qName, qDict in self.queueDict.items(): + site = qDict.get("Site", "") + ce = qDict.get("CEName", "") + pue, ci = siteMetrics.get(site, (1.7, 500.0)) + + # CEE + cee = avgCEE.get(ce, DEFAULT_CEE) + + # GreenScore + try: + greenScore = float(cee) / (float(ci) * float(pue)) + except Exception: + greenScore = 0.0 + + greenMetrics[qName] = { + "PUE": float(pue), + "CI": float(ci), + "CEE": float(cee), + "GreenScore": greenScore, + } + + # greenMetrics layout (output): + # { + # "": { + # "PUE": , + # "CI": , + # "CEE": , + # "GreenScore": + # }, + # "...": { ... } + # } + return greenMetrics + + # ================================================================= + # ELASTICSEARCH: AVERAGE CEE + # ================================================================= + def _getAverageCEEFromES(self): + now = time.time() + + # ---- CACHE HIT ---- + if ( + self._ceeCache + and (now - self._ceeCacheTimestamp) < CEE_CACHE_TTL + ): + self.log.debug( + "GreenSiteDirector: using cached CEE values " + f"(age={int(now - self._ceeCacheTimestamp)}s)" + ) + return self._ceeCache + + self.log.info("GreenSiteDirector: refreshing CEE cache from Elasticsearch") + + db = self._getElasticJobParametersDB() + if not db: + return self._ceeCache + + indexPattern = f"{db.indexName_base}_*" + self.log.info( + "GreenSiteDirector: querying ES via ElasticJobParametersDB " + f"(host={getattr(db, '_dbHost', 'n/a')}, port={getattr(db, '_dbPort', 'n/a')}, " + f"indexPattern={indexPattern}, window={ES_TIME_WINDOW})" + ) + + query = { + "size": 0, + "query": { + "bool": { + "filter": [ + {"range": {"timestamp": {"gte": ES_TIME_WINDOW}}}, + {"exists": {"field": "CEE"}}, + {"exists": {"field": "GridCE"}}, + ] + } + }, + "aggs": { + "by_gridce": { + "terms": {"field": "GridCE", "size": 10000}, + "aggs": {"avg_cee": {"avg": {"field": "CEE"}}}, + } + }, + } + # Elasticsearch aggregation query layout: + # { + # "size": 0, + # "query": { + # "bool": { + # "filter": [ + # {"range": {"timestamp": {"gte": "now-30d"}}}, + # {"exists": {"field": "CEE"}}, + # {"exists": {"field": "GridCE"}} + # ] + # } + # }, + # "aggs": { + # "by_gridce": { + # "terms": {"field": "GridCE", "size": 10000}, + # "aggs": {"avg_cee": {"avg": {"field": "CEE"}}} + # } + # } + # } + + res = db.query(index=indexPattern, query=query) + if not res["OK"]: + self.log.error(f"GreenSiteDirector: ES aggregation failed: {res['Message']}") + return self._ceeCache + + # Expected Elasticsearch response layout (subset used by SiteDirector): + # { + # "took": , + # "timed_out": , + # "_shards": { + # "total": , + # "successful": , + # "skipped": , + # "failed": + # }, + # "hits": { + # "total": { + # "value": , + # "relation": "eq" | "gte" + # }, + # "max_score": null, + # "hits": [] # empty because query uses "size": 0 + # }, + # "aggregations": { + # "by_gridce": { + # "doc_count_error_upper_bound": , + # "sum_other_doc_count": , + # "buckets": [ + # { + # "key": "", + # "doc_count": , + # "avg_cee": { + # "value": + # } + # } + # ] + # } + # } + # } + totalHits = ( + res["Value"] + .get("hits", {}) + .get("total", {}) + .get("value") + ) + buckets = ( + res["Value"].get("aggregations", {}) + .get("by_gridce", {}) + .get("buckets", []) + ) + self.log.info( + "GreenSiteDirector: ES aggregation response " + f"(hits={totalHits}, buckets={len(buckets)})" + ) + # ES response subset layout used here: + # { + # "aggregations": { + # "by_gridce": { + # "buckets": [ + # { + # "key": "", + # "avg_cee": {"value": } + # } + # ] + # } + # } + # } + + avgCEE = {} + for b in buckets: + if b.get("key") and b.get("avg_cee", {}).get("value") is not None: + try: + avgCEE[b["key"]] = float(b["avg_cee"]["value"]) + except Exception: + pass - # Check if base initialization succeeded - if not result["OK"]: - return result + # ---- UPDATE CACHE ---- + self._ceeCache = avgCEE + self._ceeCacheTimestamp = now + # _ceeCache layout: + # { + # "": , + # "...": + # } - # 🟢 Add your own behavior here - self.log.always("✅ GreenSiteDirector initialized successfully!") - print("✅ GreenSiteDirector says hi — custom init complete!") + self.log.info( + f"GreenSiteDirector: cached CEE for {len(avgCEE)} GridCEs " + f"(indexPattern={indexPattern})" + ) - # Return OK to signal successful init - return result \ No newline at end of file + return avgCEE diff --git a/src/GreenDIRAC/WorkloadManagementSystem/Client/CIMClient.py b/src/GreenDIRAC/WorkloadManagementSystem/Client/CIMClient.py new file mode 100644 index 0000000..15ebf59 --- /dev/null +++ b/src/GreenDIRAC/WorkloadManagementSystem/Client/CIMClient.py @@ -0,0 +1,415 @@ +""" +Minimal CIM/KPI client for GreenDIRAC. +""" + +import configparser +import os +import time +from datetime import datetime, timedelta, timezone + +import requests + +from DIRAC.ConfigurationSystem.Client.Utilities import getDIRACGOCDictionary + + +DEFAULT_PUE = 1.7 +DEFAULT_CI = 500 +DEFAULT_ENERGY_WH = 8500 + +DEFAULT_TOKEN_MAX_AGE_H = 24 +DEFAULT_CACHE_TTL = 300 +DEFAULT_TOKEN_TIMEOUT_S = 20 +DEFAULT_PUE_TIMEOUT_S = 20 +DEFAULT_CI_TIMEOUT_S = 30 +DEFAULT_SUBMIT_TIMEOUT_S = 30 +DEFAULT_CACHE_MAX_ENTRIES = 5000 +DEFAULT_STALE_MAX_AGE_S = 86400 +DEFAULT_HTTP_RETRIES = 2 +DEFAULT_HTTP_BACKOFF_S = 0.5 + + +class CIMClient: + def __init__(self, confFile=None, logger=None): + self.log = logger + self._token = None + self._token_ts = None + self._site_cache = {} # (site, hour_bucket) -> (ts, pue, ci, gocdb) + self._loadConfig(confFile) + + def _loadConfig(self, confFile): + if not confFile: + confFile = os.path.join(os.path.dirname(__file__), "cim.conf") + if not os.path.exists(confFile): + raise RuntimeError(f"CIMClient config file not found: {confFile}") + + cfg = configparser.ConfigParser() + cfg.read(confFile) + + self.cim_email = cfg.get("CIM", "EMAIL") + self.cim_password = cfg.get("CIM", "PASSWORD") + self.cim_api_base = cfg.get("CIM", "API_BASE").rstrip("/") + self.metrics_url = cfg.get("CIM", "METRICS_URL").rstrip("/") + self.kpi_api_base = cfg.get("KPI", "API_BASE").rstrip("/") + + self.default_pue = cfg.getfloat("Defaults", "PUE", fallback=DEFAULT_PUE) + self.default_ci = cfg.getfloat("Defaults", "CI", fallback=DEFAULT_CI) + self.default_energy_wh = cfg.getint( + "Defaults", "ENERGY_WH", fallback=DEFAULT_ENERGY_WH + ) + + self.token_max_age_h = cfg.getfloat( + "Runtime", "TOKEN_MAX_AGE_H", fallback=DEFAULT_TOKEN_MAX_AGE_H + ) + self.cache_ttl = cfg.getint("Runtime", "CACHE_TTL", fallback=DEFAULT_CACHE_TTL) + self.token_timeout_s = cfg.getfloat( + "Runtime", "TOKEN_TIMEOUT_S", fallback=DEFAULT_TOKEN_TIMEOUT_S + ) + self.pue_timeout_s = cfg.getfloat( + "Runtime", "PUE_TIMEOUT_S", fallback=DEFAULT_PUE_TIMEOUT_S + ) + self.ci_timeout_s = cfg.getfloat( + "Runtime", "CI_TIMEOUT_S", fallback=DEFAULT_CI_TIMEOUT_S + ) + self.submit_timeout_s = cfg.getfloat( + "Runtime", "SUBMIT_TIMEOUT_S", fallback=DEFAULT_SUBMIT_TIMEOUT_S + ) + self.cache_max_entries = cfg.getint( + "Runtime", "CACHE_MAX_ENTRIES", fallback=DEFAULT_CACHE_MAX_ENTRIES + ) + self.stale_max_age_s = cfg.getint( + "Runtime", "STALE_MAX_AGE_S", fallback=DEFAULT_STALE_MAX_AGE_S + ) + self.http_retries = cfg.getint( + "Runtime", "HTTP_RETRIES", fallback=DEFAULT_HTTP_RETRIES + ) + self.http_backoff_s = cfg.getfloat( + "Runtime", "HTTP_BACKOFF_S", fallback=DEFAULT_HTTP_BACKOFF_S + ) + + def _log(self, level, message): + if self.log: + getattr(self.log, level)(message) + + def _request(self, method, url, timeout, **kwargs): + retries = max(0, int(self.http_retries)) + backoff = max(0.0, float(self.http_backoff_s)) + last_exc = None + + for attempt in range(retries + 1): + try: + resp = requests.request(method, url, timeout=timeout, **kwargs) + if resp.status_code not in (429,) and resp.status_code < 500: + return resp + if attempt == retries: + return resp + except requests.RequestException as exc: + last_exc = exc + if attempt == retries: + raise + + if backoff > 0: + time.sleep(backoff * (2 ** attempt)) + + if last_exc: + raise last_exc + raise RuntimeError(f"HTTP request failed after retries: {method} {url}") + + @staticmethod + def _to_float(value, default): + try: + return float(value), True + except (TypeError, ValueError): + return float(default), False + + def _as_iso8601_utc(self, value): + if not value: + return None + + if isinstance(value, datetime): + dt = value + else: + text = str(value).strip() + if not text: + return None + try: + dt = datetime.fromisoformat(text.replace("Z", "+00:00")) + except ValueError: + dt = None + for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"): + try: + dt = datetime.strptime(text, fmt) + break + except ValueError: + continue + if dt is None: + return None + + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + else: + dt = dt.astimezone(timezone.utc) + return dt.isoformat(timespec="seconds").replace("+00:00", "Z") + + def _hour_bucket(self, execTime): + exec_iso = self._as_iso8601_utc(execTime) + if not exec_iso: + return "__default__" + dt = datetime.fromisoformat(exec_iso.replace("Z", "+00:00")) + dt = dt.replace(minute=0, second=0, microsecond=0) + return dt.isoformat(timespec="seconds").replace("+00:00", "Z") + + def _resolveGOCDB(self, site): + try: + res = getDIRACGOCDictionary() + if res["OK"]: + return res["Value"].get(site, site) + except Exception: + pass + return site + + def _prune_cache(self, now): + if not self._site_cache: + return + + if self.stale_max_age_s > 0: + old_keys = [ + key + for key, (ts, _pue, _ci, _gocdb) in self._site_cache.items() + if now - ts > self.stale_max_age_s + ] + for key in old_keys: + del self._site_cache[key] + + if self.cache_max_entries > 0 and len(self._site_cache) > self.cache_max_entries: + overflow = len(self._site_cache) - self.cache_max_entries + oldest = sorted(self._site_cache.items(), key=lambda item: item[1][0])[ + :overflow + ] + for key, _value in oldest: + del self._site_cache[key] + + # def _prune_cache(self, now): + # """ + # Site-aware prune policy: + # - Keep at least ONE cached entry per site (fresh or stale). + # - Remove stale/overflow entries only when a site has >1 entries. + # - If every remaining entry is the last one for its site, allow temporary overflow + # instead of evicting the final fallback. + # """ + # if not self._site_cache: + # return + # + # def _site_counts(): + # counts = {} + # for (site, _bucket) in self._site_cache: + # counts[site] = counts.get(site, 0) + 1 + # return counts + # + # # ----------------------------- + # # 1) Age-based prune (safe) + # # ----------------------------- + # if self.stale_max_age_s > 0: + # site_counts = _site_counts() + # old_entries = sorted( + # self._site_cache.items(), key=lambda item: item[1][0] + # ) # oldest first + # + # for key, (ts, _pue, _ci, _gocdb) in old_entries: + # age = now - ts + # if age <= self.stale_max_age_s: + # continue + # + # site = key[0] + # # delete only if site keeps at least one entry after deletion + # if site_counts.get(site, 0) > 1: + # del self._site_cache[key] + # site_counts[site] -= 1 + # + # # ----------------------------- + # # 2) Size-based prune (safe) + # # ----------------------------- + # if self.cache_max_entries > 0 and len(self._site_cache) > self.cache_max_entries: + # site_counts = _site_counts() + # candidates = sorted( + # self._site_cache.items(), key=lambda item: item[1][0] + # ) # oldest first + # + # # Evict oldest entries, but never the last one for a site + # for key, (_ts, _pue, _ci, _gocdb) in candidates: + # if len(self._site_cache) <= self.cache_max_entries: + # break + # + # site = key[0] + # if site_counts.get(site, 0) > 1: + # del self._site_cache[key] + # site_counts[site] -= 1 + # + # # Optional: if still over limit, keep overflow rather than dropping + # # the last fallback for any site. + # if len(self._site_cache) > self.cache_max_entries: + # self._log( + # "warn", + # f"Cache above max_entries={self.cache_max_entries}, " + # "kept overflow to preserve one fallback per site", + # ) + + def _get_stale_fallback(self, site, cache_key, now): + exact = self._site_cache.get(cache_key) + if exact: + ts, pue, ci, gocdb = exact + age = now - ts + if age < self.cache_ttl: + return "fresh", (pue, ci, gocdb) + return "stale", (pue, ci, gocdb, age, cache_key) + + newest = None + for (cached_site, cached_bucket), (ts, pue, ci, gocdb) in self._site_cache.items(): + if cached_site != site: + continue + if newest is None or ts > newest[0]: + newest = (ts, pue, ci, gocdb, cached_bucket) + + if not newest: + return None, None + + ts, pue, ci, gocdb, cached_bucket = newest + return "stale", (pue, ci, gocdb, now - ts, (site, cached_bucket)) + + def _getToken(self): + if self._token and self._token_ts: + age_h = (time.time() - self._token_ts) / 3600.0 + if age_h < self.token_max_age_h: + return self._token + + url = f"{self.cim_api_base}/token" + resp = self._request( + "GET", + url, + self.token_timeout_s, + params={"email": self.cim_email, "password": self.cim_password}, + ) + if resp.status_code >= 400: + raise RuntimeError(f"CIM authentication failed: status={resp.status_code}") + + token = None + try: + data = resp.json() + if isinstance(data, dict): + token = data.get("access_token") + elif data is not None: + token = str(data).strip().strip('"') + except ValueError: + token = resp.text.strip().strip('"') + + if not token: + raise RuntimeError("CIM authentication failed: empty token response") + + self._token = token + self._token_ts = time.time() + return token + + def _queryPUEandCI(self, gocdb, startExecTime=None, endExecTime=None): + token = self._getToken() + headers = { + "Authorization": f"Bearer {token}", + "Content-Type": "application/json", + } + + pue_resp = self._request( + "POST", + f"{self.kpi_api_base}/pue", + self.pue_timeout_s, + json={"site_name": gocdb}, + headers=headers, + ) + pue_resp.raise_for_status() + pue_data = pue_resp.json() + + pue, pue_ok = self._to_float(pue_data.get("pue", self.default_pue), self.default_pue) + cacheable = pue_ok + + location = pue_data.get("location") or {} + lat = location.get("latitude") + lon = location.get("longitude") + if lat is None or lon is None: + return pue, float(self.default_ci), False + + now = datetime.now(timezone.utc) + end_iso = self._as_iso8601_utc(endExecTime) or now.isoformat(timespec="seconds").replace( + "+00:00", "Z" + ) + start_iso = self._as_iso8601_utc(startExecTime) or ( + now - timedelta(hours=2) + ).isoformat(timespec="seconds").replace("+00:00", "Z") + + ci_resp = self._request( + "POST", + f"{self.kpi_api_base}/ci", + self.ci_timeout_s, + json={ + "lat": lat, + "lon": lon, + "pue": pue, + "energy_wh": self.default_energy_wh, + "start": start_iso, + "end": end_iso, + "metric_id": gocdb, + }, + headers=headers, + ) + + if ci_resp.status_code != 200: + return pue, float(self.default_ci), False + + try: + ci_data = ci_resp.json() + except Exception: + return pue, float(self.default_ci), False + + ci, ci_ok = self._to_float(ci_data.get("ci_gco2_per_kwh"), self.default_ci) + return pue, ci, cacheable and ci_ok + + def getSiteGreenMetrics(self, site, startExecTime=None, endExecTime=None): + now = time.time() + self._prune_cache(now) + cache_key = (site, self._hour_bucket(endExecTime)) + + state, cached = self._get_stale_fallback(site, cache_key, now) + if state == "fresh": + return cached + stale_fallback = cached if state == "stale" else None + + gocdb = self._resolveGOCDB(site) + try: + pue, ci, cacheable = self._queryPUEandCI( + gocdb, startExecTime=startExecTime, endExecTime=endExecTime + ) + except Exception as exc: + self._log("error", f"CIMClient failure for site={site} gocdb={gocdb}: {exc}") + pue, ci, cacheable = self.default_pue, self.default_ci, False + + if cacheable: + self._site_cache[cache_key] = (now, pue, ci, gocdb) + return pue, ci, gocdb + + if stale_fallback is not None: + stale_pue, stale_ci, stale_gocdb, _stale_age_s, _stale_key = stale_fallback + return stale_pue, stale_ci, stale_gocdb + + return pue, ci, gocdb + + def submitRecord(self, record): + resp = self._request( + "POST", + self.metrics_url, + self.submit_timeout_s, + json=record, + headers={ + "Authorization": f"Bearer {self._getToken()}", + "Content-Type": "application/json", + }, + ) + if resp.status_code not in (200, 201): + self._log("error", f"CIM submission failed [{resp.status_code}]: {resp.text}") + return False + return True diff --git a/src/GreenDIRAC/WorkloadManagementSystem/Client/__init__.py b/src/GreenDIRAC/WorkloadManagementSystem/Client/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/GreenDIRAC/WorkloadManagementSystem/Client/cim.conf b/src/GreenDIRAC/WorkloadManagementSystem/Client/cim.conf new file mode 100644 index 0000000..afaae4b --- /dev/null +++ b/src/GreenDIRAC/WorkloadManagementSystem/Client/cim.conf @@ -0,0 +1,71 @@ +[CIM] +# ========================================================= +# CIM METRICS INGESTION API (AUTHENTICATED) +# OpenAPI: GreenDIGIT WP6 CIM Metrics API +# Base: /gd-cim-api/v1 +# ========================================================= + +# Authentication + +EMAIL = put your email +PASSWORD = put ur password + +# CIM API base (token, verify-token, submit) +API_BASE = https://greendigit-cim.sztaki.hu/gd-cim-api/v1 + +# Metrics submission endpoint +METRICS_URL = https://greendigit-cim.sztaki.hu/gd-cim-api/v1/submit + + +[KPI] +# ========================================================= +# KPI HELPER API (UNAUTHENTICATED) +# OpenAPI: GreenDIGIT KPI Service +# Base: /gd-kpi-api/v1 +# ========================================================= + +# Used for: +# POST /v1/pue +# POST /v1/ci +# POST /v1/transform-and-forward +API_BASE = https://greendigit-cim.sztaki.hu/gd-kpi-api/v1 + + +[Defaults] +# ========================================================= +# Fallback values if KPI calls fail +# ========================================================= +# ========================================================= + + +# ========================================================= + +#=========== PUE 2, CI 250 =============================================== +PUE = 1.7 +CI = 500 +ENERGY_WH = 8500 + + +[Runtime] +# ========================================================= +# Runtime behaviour +# ========================================================= + +# Token lifetime (hours) – matches CIM API (1 day) +TOKEN_MAX_AGE_H = 24 + +# Cache TTL for site KPI metrics (seconds) +CACHE_TTL = 300 + +# HTTP timeout for /token calls (seconds) +TOKEN_TIMEOUT_S = 20 +# +# # HTTP timeout for /pue calls (seconds) +PUE_TIMEOUT_S = 20 +# +# # HTTP timeout for /ci calls (seconds) +CI_TIMEOUT_S = 30 +# +# # HTTP timeout for submit calls (seconds) +SUBMIT_TIMEOUT_S = 30 +#