Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions src/python/Utils/PortForward.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
#!/usr/bin/env python
"""
_PortForward_

A decorator for swapping ports in an url
"""
from __future__ import print_function, division
from future import standard_library
standard_library.install_aliases()

from builtins import str

import logging
from urllib.parse import urlparse, ParseResult


def portForward(port):
"""
Decorator wrapper function for port forwarding of the REST calls of any
function to a given port.

Currently there are three constraints for applying this decorator.
1. The function to be decorated must be defined within a class and not being a static method.
The reason for that is because we need to be sure the function's signature will
always include the class instance as its first argument.
2. The url argument must be present as the second one in the positional argument list
of the decorated function (right after the class instance argument).
3. The url must follow the syntax specifications in RFC 1808:
https://tools.ietf.org/html/rfc1808.html

If all of the above constraints are fulfilled and the url is part of the
urlMangleList, then the url is parsed and the port is substituted with the
one provided as an argument to the decorator's wrapper function.

param port: The port to which the REST call should be forwarded.
"""
def portForwardDecorator(callFunc):
"""
The actual decorator
"""
urlMangleList = ['https://tivanov',
'https://alancc',
'https://cmsweb']

def portMangle(callObj, url, *args, **kwargs):
"""
Function used to check if the url coming with the current argument list
is to be forwarded and if so change the port to the one provided as an
argument to the decorator wrapper.

:param classObj: This is the class object (slef from within the class)
which is always to be present in the signature of a
public method. We will never use this argument, but
we need it there for not breaking the positional
argument order
:param url: This is the actual url to be (eventually) forwarded
:param *args: The positional argument list coming from the original function
:param *kwargs: The keywords argument list coming from the original function
"""
# As a first step try to get a logger from the calling object:
if callable(getattr(callObj, 'logger', None)):
logger = callObj.logger
else:
logger = logging.getLogger()

forwarded = False
try:
oldUrl = urlparse(url)
found = False
if isinstance(url, str):
for mUrl in urlMangleList:
if url.startswith(mUrl):
netlocStr = u'%s:%d' % (oldUrl.hostname, port)
found = True
break
elif isinstance(url, bytes):
for mUrl in urlMangleList:
if url.startswith(mUrl.encode('utf-8')):
netlocStr = b'%s:%d' % (oldUrl.hostname, port)
found = True
break
if found:
newUrl = ParseResult(scheme=oldUrl.scheme,
netloc=netlocStr,
path=oldUrl.path,
params=oldUrl.params,
query=oldUrl.query,
fragment=oldUrl.fragment)
newUrl = newUrl.geturl()
forwarded = True
except Exception as ex:
msg = "Failed to forward url: %s to port: %s due to ERROR: %s"
logger.exception(msg, url, port, str(ex))
if forwarded:
return callFunc(callObj, newUrl, *args, **kwargs)
else:
return callFunc(callObj, url, *args, **kwargs)
return portMangle
return portForwardDecorator


class PortForward():
"""
A class with a call method implementing a simple way to use the functionality
provided by the protForward decorator as a pure functional call:
EXAMPLE:
from Utils.PortForward import PortForward

portForwarder = PortForward(8443)
url = 'https://cmsweb-testbed.cern.ch/couchdb'
url = portForwarder(url)
"""
def __init__(self, port):
"""
The init method for the PortForward call class. This one is supposed
to simply provide an initial class instance with a logger.
"""
self.logger = logging.getLogger()
self.port = port

def __call__(self, url):
"""
The call method for the PortForward class
"""
def dummyCall(self, url):
return url
return portForward(self.port)(dummyCall)(self, url)
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pprint import pformat
from Utils.Timers import timeFunction
from Utils.Utilities import numberCouchProcess
from Utils.PortForward import PortForward
from WMComponent.AgentStatusWatcher.DrainStatusPoller import DrainStatusPoller
from WMComponent.AnalyticsDataCollector.DataCollectAPI import WMAgentDBData, initAgentInfo
from WMCore.Credential.Proxy import Proxy
Expand All @@ -25,6 +26,7 @@
# CMSMonitoring modules
from CMSMonitoring.StompAMQ import StompAMQ


class AgentStatusPoller(BaseWorkerThread):
"""
Gether the summary data for request (workflow) from local queue,
Expand All @@ -50,6 +52,9 @@ def __init__(self, config):
self.credThresholds = {'proxy': {'error': 3, 'warning': 5},
'certificate': {'error': 10, 'warning': 20}}

# create a portForwarder to be used for rerouting the replication process
self.portForwarder = PortForward(8443)

# Monitoring setup
self.userAMQ = getattr(config.AgentStatusWatcher, "userAMQ", None)
self.passAMQ = getattr(config.AgentStatusWatcher, "passAMQ", None)
Expand All @@ -73,6 +78,7 @@ def setUpCouchDBReplication(self):
# set up common replication code
wmstatsSource = self.config.JobStateMachine.jobSummaryDBName
wmstatsTarget = self.config.General.centralWMStatsURL
wmstatsTarget = self.portForwarder(wmstatsTarget)

self.replicatorDocs.append({'source': wmstatsSource, 'target': wmstatsTarget,
'filter': "WMStatsAgent/repfilter"})
Expand All @@ -85,7 +91,9 @@ def setUpCouchDBReplication(self):
# set up workqueue replication
wqfilter = 'WorkQueue/queueFilter'
parentQURL = self.config.WorkQueueManager.queueParams["ParentQueueCouchUrl"]
parentQURL = self.portForwarder(parentQURL)
childURL = self.config.WorkQueueManager.queueParams["QueueURL"]
childURL = self.portForwarder(childURL)
query_params = {'childUrl': childURL, 'parentUrl': sanitizeURL(parentQURL)['url']}
localQInboxURL = "%s_inbox" % self.config.AnalyticsDataCollector.localQueueURL
self.replicatorDocs.append({'source': sanitizeURL(parentQURL)['url'], 'target': localQInboxURL,
Expand Down
2 changes: 2 additions & 0 deletions src/python/WMCore/Services/Requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from WMCore.Lexicon import sanitizeURL
from WMCore.WMException import WMException
from WMCore.Wrappers.JsonWrapper.JSONThunker import JSONThunker
from Utils.PortForward import portForward

try:
from WMCore.Services.pycurl_manager import RequestHandler, ResponseHeader
Expand Down Expand Up @@ -66,6 +67,7 @@ class Requests(dict):
Generic class for sending different types of HTTP Request to a given URL
"""

@portForward(8443)
def __init__(self, url='http://localhost', idict=None):
"""
url should really be host - TODO fix that when have sufficient code
Expand Down
4 changes: 0 additions & 4 deletions src/python/WMCore/Services/Service.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ def __init__(self, cfg_dict=None):
if not cfg_dict['endpoint'].endswith('/'):
cfg_dict['endpoint'] = cfg_dict['endpoint'].strip() + '/'

# setup port 8443 for cmsweb services
if cfg_dict['endpoint'].startswith("https://cmsweb"):
cfg_dict['endpoint'] = cfg_dict['endpoint'].replace('.cern.ch/', '.cern.ch:8443/', 1)

# set up defaults
self.setdefault("inputdata", {})
self.setdefault("cacheduration", 0.5)
Expand Down
9 changes: 7 additions & 2 deletions src/python/WMCore/Services/pycurl_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
from urllib.parse import urlencode

from Utils.Utilities import encodeUnicodeToBytes
from Utils.PortForward import portForward, PortForward


class ResponseHeader(object):
"""ResponseHeader parses HTTP response header"""
Expand Down Expand Up @@ -188,7 +190,6 @@ def set_opts(self, curl, url, params, headers,

encoded_data = self.encode_params(params, verb, doseq, encode)


if verb == 'GET':
if encoded_data:
url = url + '?' + encoded_data
Expand Down Expand Up @@ -269,6 +270,7 @@ def parse_header(self, header):
"""
return ResponseHeader(header)

@portForward(8443)
def request(self, url, params, headers=None, verb='GET',
verbose=0, ckey=None, cert=None, capath=None,
doseq=True, encode=False, decode=False, cainfo=None, cookie=None):
Expand Down Expand Up @@ -321,6 +323,7 @@ def getheader(self, url, params, headers=None, verb='GET',
verbose, ckey, cert, doseq=doseq)
return header

@portForward(8443)
def multirequest(self, url, parray, headers=None,
ckey=None, cert=None, verbose=None, cookie=None):
"""Fetch data for given set of parameters"""
Expand Down Expand Up @@ -404,8 +407,10 @@ def getdata(urls, ckey, cert, headers=None, options=None, num_conn=50, cookie=No
if not options:
options = pycurl_options()

portForwarder = PortForward(8443)

# Make a queue with urls
queue = [u for u in urls if validate_url(u)]
queue = [portForwarder(u) for u in urls if validate_url(u)]

# Check args
num_urls = len(queue)
Expand Down
75 changes: 75 additions & 0 deletions test/python/Utils_t/PortForward_t.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/usr/bin/env python
"""
Unittests for PortForward
"""

from __future__ import division, print_function

import unittest

from Utils.PortForward import portForward, PortForward


class RequestHandler(object):
def __init__(self, config=None, logger=None):
super(RequestHandler, self).__init__()
if not config:
config = {}

@portForward(8443)
def request(self, url, params=None, headers=None, verb='GET',
verbose=0, ckey=None, cert=None, doseq=True,
encode=False, decode=False, cookie=None, uri=None):
return url
Comment thread
todor-ivanov marked this conversation as resolved.


class PortForwardTests(unittest.TestCase):
"""
Unittest for PortForward decorator and class
"""

def __init__(self, *args, **kwargs):
super(PortForwardTests, self).__init__(*args, **kwargs)
self.urlTestList = ['https://cmsweb.cern.ch/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bydate?descending=true&limit=1',
'https://cmsweb.cern.ch/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bystatusandtime?startkey=%5B%22announced%22%2C+0%5D&endkey=%5B%22announced%22%2C+1616016936%5D&descending=false&stale=update_after&include_docs=false',
'https://cmsweb.cern.ch:8443/reqmgr2/js/?f=utils.js&f=ajax_utils.js&f=md5.js&f=task_splitting.js',
'https://cmsweb.cern.ch:443/wmstatsserver/data/filtered_requests?mask=RequestStatus&mask=RequestType&mask=RequestPriority&mask=Campaign&mask=RequestNumEvents',
u'https://cmsweb.cern.ch/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bydate?descending=true&limit=1',
u'https://cmsweb.cern.ch/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bystatusandtime?startkey=%5B%22announced%22%2C+0%5D&endkey=%5B%22announced%22%2C+1616016936%5D&descending=false&stale=update_after&include_docs=false',
u'https://cmsweb.cern.ch/reqmgr2/js/?f=utils.js&f=ajax_utils.js&f=md5.js&f=task_splitting.js',
u'https://cmsweb.cern.ch/wmstatsserver/data/filtered_requests?mask=RequestStatus&mask=RequestType&mask=RequestPriority&mask=Campaign&mask=RequestNumEvents',
b'https://cmsweb.cern.ch/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bydate?descending=true&limit=1',
b'https://cmsweb.cern.ch/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bystatusandtime?startkey=%5B%22announced%22%2C+0%5D&endkey=%5B%22announced%22%2C+1616016936%5D&descending=false&stale=update_after&include_docs=false',
b'https://cmsweb.cern.ch/reqmgr2/js/?f=utils.js&f=ajax_utils.js&f=md5.js&f=task_splitting.js',
b'https://cmsweb.cern.ch/wmstatsserver/data/filtered_requests?mask=RequestStatus&mask=RequestType&mask=RequestPriority&mask=Campaign&mask=RequestNumEvents']

self.urlExpectedtList = ['https://cmsweb.cern.ch:8443/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bydate?descending=true&limit=1',
'https://cmsweb.cern.ch:8443/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bystatusandtime?startkey=%5B%22announced%22%2C+0%5D&endkey=%5B%22announced%22%2C+1616016936%5D&descending=false&stale=update_after&include_docs=false',
'https://cmsweb.cern.ch:8443/reqmgr2/js/?f=utils.js&f=ajax_utils.js&f=md5.js&f=task_splitting.js',
'https://cmsweb.cern.ch:8443/wmstatsserver/data/filtered_requests?mask=RequestStatus&mask=RequestType&mask=RequestPriority&mask=Campaign&mask=RequestNumEvents',
u'https://cmsweb.cern.ch:8443/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bydate?descending=true&limit=1',
u'https://cmsweb.cern.ch:8443/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bystatusandtime?startkey=%5B%22announced%22%2C+0%5D&endkey=%5B%22announced%22%2C+1616016936%5D&descending=false&stale=update_after&include_docs=false',
u'https://cmsweb.cern.ch:8443/reqmgr2/js/?f=utils.js&f=ajax_utils.js&f=md5.js&f=task_splitting.js',
u'https://cmsweb.cern.ch:8443/wmstatsserver/data/filtered_requests?mask=RequestStatus&mask=RequestType&mask=RequestPriority&mask=Campaign&mask=RequestNumEvents',
b'https://cmsweb.cern.ch:8443/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bydate?descending=true&limit=1',
b'https://cmsweb.cern.ch:8443/couchdb/reqmgr_workload_cache/_design/ReqMgr/_view/bystatusandtime?startkey=%5B%22announced%22%2C+0%5D&endkey=%5B%22announced%22%2C+1616016936%5D&descending=false&stale=update_after&include_docs=false',
b'https://cmsweb.cern.ch:8443/reqmgr2/js/?f=utils.js&f=ajax_utils.js&f=md5.js&f=task_splitting.js',
b'https://cmsweb.cern.ch:8443/wmstatsserver/data/filtered_requests?mask=RequestStatus&mask=RequestType&mask=RequestPriority&mask=Campaign&mask=RequestNumEvents']

def testDecorator(self):
requesHandler = RequestHandler()
self.urlResultList = []
for url in self.urlTestList:
self.urlResultList.append(requesHandler.request(url))
self.assertItemsEqual(self.urlResultList, self.urlExpectedtList)

def testCallClass(self):
portForwarder = PortForward(8443)
self.urlResultList = []
for url in self.urlTestList:
self.urlResultList.append(portForwarder(url))
self.assertItemsEqual(self.urlResultList, self.urlExpectedtList)


if __name__ == '__main__':
unittest.main()