-
Notifications
You must be signed in to change notification settings - Fork 110
Set output datasets to VALID in DBS3 before announcing a standard workflow #10394
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 5 commits
95d7769
9b68635
a898dad
c6968d8
b5db84f
08b49cd
962c532
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| from WMCore.MicroService.MSOutput.MSOutputTemplate import MSOutputTemplate | ||
| from WMCore.WMException import WMException | ||
| from WMCore.Services.AlertManager.AlertManagerAPI import AlertManagerAPI | ||
| from WMCore.Services.DBS.DBS3Writer import DBS3Writer | ||
|
|
||
|
|
||
| class MSOutputException(WMException): | ||
|
|
@@ -248,6 +249,53 @@ def _executeConsumer(self, summary): | |
| self.logger.exception(msg) | ||
| self.updateReportDict(summary, "error", msg) | ||
|
|
||
| def setDBSStatus(self, workflow): | ||
| """ | ||
| The function to set the DBS status of outputs as VALID | ||
| :param workflow: a MSOutputTemplate object workflow | ||
| :return: the MSOutputTemplate object itself (with the necessary updates in place) | ||
| """ | ||
| if not isinstance(workflow, MSOutputTemplate): | ||
| msg = "Unsupported type object '{}' for workflows! ".format(type(workflow)) | ||
| msg += "It needs to be of type: MSOutputTemplate" | ||
| raise UnsupportedError(msg) | ||
|
|
||
| dbs3Writer = DBS3Writer(url=self.msConfig["dbsReadUrl"], | ||
| writeUrl=self.msConfig["dbsWriteUrl"]) | ||
|
|
||
| # if anything fail along the way, set it back to "pending" | ||
| dbsUpdateStatus = "done" | ||
| for dMap in workflow['OutputMap']: | ||
|
|
||
| if self.msConfig['enableDbsStatusChange']: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess I would remove this option (and the dry-run mode). When we have it implemented, I believe there is no reason to disable this feature in MSOutput, right?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I put it to make the switch from Unified to WMCore easy. So, once the PR is good to merge, we can merge it and put it to production with
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we could enable this DBS status change in MSOutput and keep both this service and Unified updating the DBS status for a day or two (I'm almost sure it's harmless and nothing fails). Then we have time to stop Unified. At this stage, dry-run mode is just an over-complication of the code IMO.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That makes sense to me. This option is removed in the latest commit. |
||
|
|
||
| res = dbs3Writer.setDBSStatus(dataset=dMap["Dataset"], | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Note that my comments on the DBS3Writer module will impact this logic. Another comment, I failed to see where this and simply refer to it here.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also made it configurable: https://github.com/dmwm/deployment/pull/1044/files#diff-3a773e3fbfb5ff69f5fd806e7a4ab54a7dfaaf1f080e7e96ac1df9e73589c2ffR115 It's also possible to set it within the module. Which one do you prefer?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I missed the deployment changes. Thank for pointing it out.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since deployment has the value, I did not set a default value in this module to avoid duplicates. If you think, it's more suitable to keep the configuration in MSOutput module instead of deployment repository, that's also fine, I can change it if you'd like. |
||
| status=self.msConfig['dbsStatus']["valid"]) | ||
|
|
||
| if res: | ||
| dMap["DBSStatus"] = self.msConfig['dbsStatus']["valid"] | ||
| else: | ||
| # There is at least one dataset whose dbs status update is unsuccessful | ||
| dbsUpdateStatus = "pending" | ||
|
|
||
| else: | ||
| msg = "DRY-RUN DBS: DBS status change for DID: {}, ".format(dMap['Dataset']) | ||
| self.logger.info(msg) | ||
|
|
||
| # Finally, update the MSOutput template document with either partial or | ||
| # complete dbs statuses | ||
| self.docKeyUpdate(workflow, OutputMap=workflow['OutputMap']) | ||
| workflow.updateTime() | ||
| if dbsUpdateStatus == "done": | ||
| self.logger.info("All the DBS status updates succeeded for: %s. Marking it as 'done'", | ||
| workflow['RequestName']) | ||
| self.docKeyUpdate(workflow, DBSUpdateStatus='done') | ||
| else: | ||
| self.logger.info("DBS status updates partially successful for: %s. Keeping it 'pending'", | ||
| workflow['RequestName']) | ||
|
|
||
| return workflow | ||
|
|
||
| def makeSubscriptions(self, workflow): | ||
| """ | ||
| The common function to make the final subscriptions | ||
|
|
@@ -448,28 +496,32 @@ def msOutputConsumer(self): | |
| # Done: To build it through a pipe | ||
| # Done: To write back the updated document to MonogoDB | ||
| msPipelineRelVal = Pipeline(name="MSOutputConsumer PipelineRelVal", | ||
| funcLine=[Functor(self.makeSubscriptions), | ||
| funcLine=[Functor(self.setDBSStatus), | ||
| Functor(self.makeSubscriptions), | ||
| Functor(self.makeTapeSubscriptions), | ||
| Functor(self.docUploader, | ||
| update=True, | ||
| keys=['LastUpdate', | ||
| 'TransferStatus', | ||
| 'DBSUpdateStatus', | ||
| 'OutputMap']), | ||
| Functor(self.docDump, pipeLine='PipelineRelVal'), | ||
| Functor(self.docCleaner)]) | ||
| msPipelineNonRelVal = Pipeline(name="MSOutputConsumer PipelineNonRelVal", | ||
| funcLine=[Functor(self.makeSubscriptions), | ||
| funcLine=[Functor(self.setDBSStatus), | ||
| Functor(self.makeSubscriptions), | ||
| Functor(self.makeTapeSubscriptions), | ||
| Functor(self.docUploader, | ||
| update=True, | ||
| keys=['LastUpdate', | ||
| 'TransferStatus', | ||
| 'DBSUpdateStatus', | ||
| 'OutputMap']), | ||
| Functor(self.docDump, pipeLine='PipelineNonRelVal'), | ||
| Functor(self.docCleaner)]) | ||
|
|
||
| wfCounterTotal = 0 | ||
| mQueryDict = {'TransferStatus': 'pending'} | ||
| mQueryDict = { "$or": [ { "TransferStatus": "pending" }, { "DBSUpdateStatus": "pending" } ] } | ||
| pipeCollections = [(msPipelineRelVal, self.msOutRelValColl), | ||
| (msPipelineNonRelVal, self.msOutNonRelValColl)] | ||
| for pipeColl in pipeCollections: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,6 +102,7 @@ def docSchema(self): | |
| 'Copies': 1, | ||
| ...}], | ||
| "TransferStatus": "pending"|"done, | ||
| "DBSUpdateStatus": "pending"|"done, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess I would make it a boolean, thus something like: but then it would require more changes from your side. So I leave it up to you.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good suggestion, this is applied in the last commit. |
||
| "RequestType": "" | ||
| } | ||
| :return: a list of tuples | ||
|
|
@@ -116,7 +117,8 @@ def docSchema(self): | |
| ('IsRelVal', False, bool), | ||
| ('OutputDatasets', [], list), | ||
| ('OutputMap', [], list), | ||
| ('TransferStatus', "pending", (bytes, str))] | ||
| ('TransferStatus', "pending", (bytes, str)), | ||
| ('DBSUpdateStatus', "pending", (bytes, str))] | ||
| return docTemplate | ||
|
|
||
| def outputMapSchema(self): | ||
|
|
@@ -135,7 +137,8 @@ def outputMapSchema(self): | |
| 'DiskDestination': "", | ||
| 'TapeDestination': "", | ||
| 'DiskRuleID': "", | ||
| 'TapeRuleID': ""} | ||
| 'TapeRuleID': "", | ||
| 'DBSStatus': ""} | ||
| :return: a list of tuples | ||
| """ | ||
| outMapTemplate = [ | ||
|
|
@@ -146,7 +149,8 @@ def outputMapSchema(self): | |
| ('DiskDestination', "", (bytes, str)), | ||
| ('TapeDestination', "", (bytes, str)), | ||
| ('DiskRuleID', "", (bytes, str)), | ||
| ('TapeRuleID', "", (bytes, str))] | ||
| ('TapeRuleID', "", (bytes, str)), | ||
| ('DBSStatus', "", (bytes, str))] | ||
| return outMapTemplate | ||
|
|
||
| def _checkAttr(self, myDoc, update=False, throw=False, **kwargs): | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -971,3 +971,32 @@ def getParentDatasetTrio(self, childDataset): | |||||
| frozenKey = frozenset(runLumiPair) | ||||||
| parentFrozenData[frozenKey] = fileId | ||||||
| return parentFrozenData | ||||||
|
|
||||||
| def getDBSStatus(self, dataset): | ||||||
| """ | ||||||
| The function to get the DBS status of outputs | ||||||
| :param dataset: dataset name | ||||||
| :return: DBS status of the given dataset | ||||||
| """ | ||||||
|
|
||||||
| allowedDbsStatuses = ["VALID", "INVALID", "PRODUCTION"] | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. a dataset can have other statuses. If I'm not wrong, DELETED and DEPRECATED as well. However, I don't think we need to have a list of valid statuses, we just use whatever is provided by DBS.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was also unsure whether it's necessary to put this extra check and honestly I did not have a strong reason to do so. So, it's okay to remove it to reduce the level of complexity.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As agreed, this extra check is also removed. |
||||||
|
|
||||||
| response = None | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
| try: | ||||||
| response = self.dbs.listDatasets(dataset=dataset, dataset_access_type='*', detail=True) | ||||||
| except Exception as ex: | ||||||
| msg = "Exception while getting the status of following dataset on DBS: {} ".format(dataset) | ||||||
| msg += "Error: {}".format(str(ex)) | ||||||
| self.logger.exception(msg) | ||||||
|
|
||||||
| if response: | ||||||
| dbsStatus = response[0]['dataset_access_type'] | ||||||
| isAllowedStatus = dbsStatus in allowedDbsStatuses | ||||||
|
|
||||||
| if isAllowedStatus: | ||||||
| self.logger.info("%s is %s", dataset, dbsStatus) | ||||||
| return dbsStatus | ||||||
| else: | ||||||
| raise Exception("This is not an allowed DBS status: {}".format(str(dbsStatus))) | ||||||
| else: | ||||||
| return None | ||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| #!/usr/bin/env python | ||
| """ | ||
| _DBSReader_ | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrong copy/pasting? :-D You might just remove it as well. |
||
|
|
||
| Read/Write DBS Interface | ||
|
|
||
| """ | ||
| from __future__ import print_function, division | ||
| from builtins import str | ||
| import logging | ||
|
|
||
| from dbs.apis.dbsClient import DbsApi | ||
| from dbs.exceptions.dbsClientException import dbsClientException | ||
|
|
||
| from WMCore.Services.DBS.DBSErrors import DBSWriterError, formatEx3 | ||
| from WMCore.Services.DBS.DBS3Reader import DBS3Reader | ||
|
|
||
|
|
||
| class DBS3Writer(DBS3Reader): | ||
| """ | ||
| _DBSReader_ | ||
|
|
||
| General API for writing data to DBS | ||
| """ | ||
|
|
||
| def __init__(self, url, writeUrl, logger=None, **contact): | ||
|
|
||
| # instantiate dbs api object | ||
| try: | ||
| super(DBS3Writer, self).__init__(url=url) | ||
| self.dbs = DbsApi(writeUrl, **contact) | ||
| self.logger = logger or logging.getLogger(self.__class__.__name__) | ||
| except dbsClientException as ex: | ||
| msg = "Error in DBSWriter with DbsApi\n" | ||
| msg += "%s\n" % formatEx3(ex) | ||
| raise DBSWriterError(msg) | ||
|
|
||
| def setDBSStatus(self, dataset, status): | ||
| """ | ||
| The function to set the DBS status of an output dataset | ||
| :param dataset: Dataset name | ||
| :return: True if operation is successful, False otherwise | ||
| """ | ||
|
|
||
| try: | ||
| self.dbs.updateDatasetType(dataset=dataset, | ||
| dataset_access_type=status) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indentation needs fixing. I would suggest to remove the blank line at 44 as well.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. BTW, we should fetch the output of this call and log the error in case it happens; or return it to the caller module.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I also thought that but the function returns
Which one do you prefer? Btw, we already log the error in the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, too bad it returns None in case the call is successful..
This is my preference. Given that - if there is an exception - the error would likely be on the server side, I would suggest to change the log level from exception to error, such that we do not pollute the service logs with unnecessary information.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I applied your suggestions Alan, but |
||
| except Exception as ex: | ||
| msg = "Exception while setting the status of following dataset on DBS: {} ".format(dataset) | ||
| msg += "Error: {}".format(str(ex)) | ||
| self.logger.exception(msg) | ||
|
|
||
| dbsStatus = super(DBS3Writer, self).getDBSStatus(dataset) | ||
|
|
||
| if dbsStatus == status: | ||
| return True | ||
| else: | ||
| return False | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to initialize it in the
__init__method instead.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be fine now