Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions thingsboard_gateway/config/request.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
"timeout": 0.5,
"scanPeriod": 5,
"dataUnpackExpression": "",
"queryParameters": {
"apikey": "97ebacf4-d52e-4f9b-a297-83315017e36c"
},
"converter": {
"type": "json",
"deviceNameJsonExpression": "SD8500",
Expand Down Expand Up @@ -137,8 +140,16 @@
"scanPeriod": 1800,
"subRequests": {
"cumPwr": {
"url": "/${id}/hist/values/ActiveEnergy/SUM13/3600?start=RELATIVE_-1HOUR&end=RELATIVE_-1HOUR&online=false&aggregate=false",
"processingFunction": "def process_data(data, key): return {key: float(data[\"values\"][0][\"avg\"])/1000.0}"
"url": "/hist/values/ActiveEnergy/SUM13/3600",
"processingFunction": "def process_data(data, key): return {key: float(data[\"values\"][0][\"avg\"])/1000.0}",
"queryParameters": {
"apikey": "97ebacf4-d52e-4f9b-a297-83315017e36c",
"start": "RELATIVE_-1HOUR",
"end": "RELATIVE_-1HOUR",
"online": false,
"aggregate": false,
"id": "${id}"
}
}
},
"converter": {
Expand Down
67 changes: 58 additions & 9 deletions thingsboard_gateway/connectors/request/request_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,8 @@ def __send_request(self, request, converter_queue, logger):
else request_url_from_config
)
logger.debug("Obtained request url from config - %s ", request_url_from_config)
url, response = self.__execute_request(request, request_url_from_config, logger)
query_params = self.__build_request_query_params(request["config"].get("queryParameters"))
url, response = self.__execute_request(request, request_url_from_config, logger, query_params=query_params)

if request.get('withResponse'):
converter_queue.put(response)
Expand Down Expand Up @@ -382,7 +383,7 @@ def __send_request(self, request, converter_queue, logger):
except Exception as e:
logger.exception(e)

def __execute_request(self, request, request_url, logger):
def __execute_request(self, request, request_url, logger, query_params=None):
url = self.__host + request_url if not request_url.lower().startswith("http") else request_url

request_timeout = request["config"].get("timeout", 1)
Expand All @@ -397,6 +398,11 @@ def __execute_request(self, request, request_url, logger):
}
logger.debug("Full url request has been formed - %s", url)

if query_params:
params["params"] = query_params

logger.debug("Request params: %s", params.get("params"))

if request["config"].get("httpHeaders") is not None:
params["headers"] = request["config"]["httpHeaders"]

Expand Down Expand Up @@ -486,16 +492,21 @@ def __process_sub_requests(self, request, url, data, logger):
# Check if a sub request for key is needed
key = datatype_object_config.get("key")
if key in request["config"].get("subRequests", {}):
request_url_from_config = TBUtility.replace_params_tags(request["config"]["subRequests"][key]["url"],
{"data": data_item})
sub_request_config = request["config"]["subRequests"][key]
request_url_from_config = TBUtility.replace_params_tags(
sub_request_config["url"],
{"data": data_item}
)

if not request_url_from_config.lower().startswith("http"):
if not request_url_from_config.startswith("/"):
request_url_from_config = "/" + request_url_from_config
request_url_from_config = url + request_url_from_config
request_url_from_config = self.__host + request_url_from_config
logger.debug("Sub request needed for key %s with url %s", key, request_url_from_config)

response = self.__send_sub_request(request, request_url_from_config, logger)
query_params = self.__build_sub_request_query_params(sub_request_config, data_item)
response = self.__send_sub_request(request, request_url_from_config, logger, query_params)
logger.debug("Sub request query params: %s", query_params)
logger.debug("Sub request response: %s", response)

# Only if a response is available, process it
Expand All @@ -509,13 +520,18 @@ def __process_sub_requests(self, request, url, data, logger):
exec(processing_function, {}, local_scope)
result = local_scope["process_data"](response, key)
# Update data with result of sub request
data_item.update(result)
if isinstance(result, dict):
data_item.update(result)
else:
logger.warning("Sub request result for key %s is not a dict (%s). Storing under key.",
key, type(result).__name__)
data_item[key] = result
logger.debug("Data after sub request processing: %s", data_item)

def __send_sub_request(self, request, sub_request_url, logger):
def __send_sub_request(self, request, sub_request_url, logger, query_params=None):
url = ""
try:
url, response = self.__execute_request(request, sub_request_url, logger)
url, response = self.__execute_request(request, sub_request_url, logger, query_params=query_params)
if response and response.ok:
try:
return response.json()
Expand All @@ -534,3 +550,36 @@ def __send_sub_request(self, request, sub_request_url, logger):
logger.error("Cannot connect to %s. Connection error.", url)
except Exception as e:
logger.exception(e)

def __build_sub_request_query_params(self, sub_request_config, data_item):
query_params = sub_request_config.get("queryParameters")
if not query_params:
return None

def replace_value(value):
if isinstance(value, str):
return TBUtility.replace_params_tags(value, {"data": data_item})
if isinstance(value, list):
return [replace_value(item) for item in value]
if isinstance(value, dict):
return {k: replace_value(v) for k, v in value.items()}
return value

if isinstance(query_params, dict):
return {key: replace_value(value) for key, value in query_params.items()}
return replace_value(query_params)

def __build_request_query_params(self, query_params):
if not query_params:
return None

def clone_value(value):
if isinstance(value, list):
return [clone_value(item) for item in value]
if isinstance(value, dict):
return {k: clone_value(v) for k, v in value.items()}
return value

if isinstance(query_params, dict):
return {key: clone_value(value) for key, value in query_params.items()}
return clone_value(query_params)
Loading