diff --git a/thingsboard_gateway/config/request.json b/thingsboard_gateway/config/request.json index bdc6cf424c..0ce6a72593 100644 --- a/thingsboard_gateway/config/request.json +++ b/thingsboard_gateway/config/request.json @@ -21,6 +21,9 @@ "timeout": 0.5, "scanPeriod": 5, "dataUnpackExpression": "", + "queryParameters": { + "apikey": "97ebacf4-d52e-4f9b-a297-83315017e36c" + }, "converter": { "type": "json", "deviceNameJsonExpression": "SD8500", @@ -137,8 +140,16 @@ "scanPeriod": 1800, "subRequests": { "cumPwr": { - "url": "/${id}/hist/values/ActiveEnergy/SUM13/3600?start=RELATIVE_-1HOUR&end=RELATIVE_-1HOUR&online=false&aggregate=false", - "processingFunction": "def process_data(data, key): return {key: float(data[\"values\"][0][\"avg\"])/1000.0}" + "url": "/hist/values/ActiveEnergy/SUM13/3600", + "processingFunction": "def process_data(data, key): return {key: float(data[\"values\"][0][\"avg\"])/1000.0}", + "queryParameters": { + "apikey": "97ebacf4-d52e-4f9b-a297-83315017e36c", + "start": "RELATIVE_-1HOUR", + "end": "RELATIVE_-1HOUR", + "online": false, + "aggregate": false, + "id": "${id}" + } } }, "converter": { diff --git a/thingsboard_gateway/connectors/request/request_connector.py b/thingsboard_gateway/connectors/request/request_connector.py index 83763dc2d3..135b81527b 100644 --- a/thingsboard_gateway/connectors/request/request_connector.py +++ b/thingsboard_gateway/connectors/request/request_connector.py @@ -338,7 +338,8 @@ def __send_request(self, request, converter_queue, logger): else request_url_from_config ) logger.debug("Obtained request url from config - %s ", request_url_from_config) - url, response = self.__execute_request(request, request_url_from_config, logger) + query_params = self.__build_request_query_params(request["config"].get("queryParameters")) + url, response = self.__execute_request(request, request_url_from_config, logger, query_params=query_params) if request.get('withResponse'): converter_queue.put(response) @@ -382,7 +383,7 @@ def __send_request(self, request, converter_queue, logger): except Exception as e: logger.exception(e) - def __execute_request(self, request, request_url, logger): + def __execute_request(self, request, request_url, logger, query_params=None): url = self.__host + request_url if not request_url.lower().startswith("http") else request_url request_timeout = request["config"].get("timeout", 1) @@ -397,6 +398,11 @@ def __execute_request(self, request, request_url, logger): } logger.debug("Full url request has been formed - %s", url) + if query_params: + params["params"] = query_params + + logger.debug("Request params: %s", params.get("params")) + if request["config"].get("httpHeaders") is not None: params["headers"] = request["config"]["httpHeaders"] @@ -486,16 +492,21 @@ def __process_sub_requests(self, request, url, data, logger): # Check if a sub request for key is needed key = datatype_object_config.get("key") if key in request["config"].get("subRequests", {}): - request_url_from_config = TBUtility.replace_params_tags(request["config"]["subRequests"][key]["url"], - {"data": data_item}) + sub_request_config = request["config"]["subRequests"][key] + request_url_from_config = TBUtility.replace_params_tags( + sub_request_config["url"], + {"data": data_item} + ) if not request_url_from_config.lower().startswith("http"): if not request_url_from_config.startswith("/"): request_url_from_config = "/" + request_url_from_config - request_url_from_config = url + request_url_from_config + request_url_from_config = self.__host + request_url_from_config logger.debug("Sub request needed for key %s with url %s", key, request_url_from_config) - response = self.__send_sub_request(request, request_url_from_config, logger) + query_params = self.__build_sub_request_query_params(sub_request_config, data_item) + response = self.__send_sub_request(request, request_url_from_config, logger, query_params) + logger.debug("Sub request query params: %s", query_params) logger.debug("Sub request response: %s", response) # Only if a response is available, process it @@ -509,13 +520,18 @@ def __process_sub_requests(self, request, url, data, logger): exec(processing_function, {}, local_scope) result = local_scope["process_data"](response, key) # Update data with result of sub request - data_item.update(result) + if isinstance(result, dict): + data_item.update(result) + else: + logger.warning("Sub request result for key %s is not a dict (%s). Storing under key.", + key, type(result).__name__) + data_item[key] = result logger.debug("Data after sub request processing: %s", data_item) - def __send_sub_request(self, request, sub_request_url, logger): + def __send_sub_request(self, request, sub_request_url, logger, query_params=None): url = "" try: - url, response = self.__execute_request(request, sub_request_url, logger) + url, response = self.__execute_request(request, sub_request_url, logger, query_params=query_params) if response and response.ok: try: return response.json() @@ -534,3 +550,36 @@ def __send_sub_request(self, request, sub_request_url, logger): logger.error("Cannot connect to %s. Connection error.", url) except Exception as e: logger.exception(e) + + def __build_sub_request_query_params(self, sub_request_config, data_item): + query_params = sub_request_config.get("queryParameters") + if not query_params: + return None + + def replace_value(value): + if isinstance(value, str): + return TBUtility.replace_params_tags(value, {"data": data_item}) + if isinstance(value, list): + return [replace_value(item) for item in value] + if isinstance(value, dict): + return {k: replace_value(v) for k, v in value.items()} + return value + + if isinstance(query_params, dict): + return {key: replace_value(value) for key, value in query_params.items()} + return replace_value(query_params) + + def __build_request_query_params(self, query_params): + if not query_params: + return None + + def clone_value(value): + if isinstance(value, list): + return [clone_value(item) for item in value] + if isinstance(value, dict): + return {k: clone_value(v) for k, v in value.items()} + return value + + if isinstance(query_params, dict): + return {key: clone_value(value) for key, value in query_params.items()} + return clone_value(query_params)