-
Notifications
You must be signed in to change notification settings - Fork 12
Adding lambda support #3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,11 +17,11 @@ | |
| import os | ||
| import random | ||
| import re | ||
| from types import LambdaType | ||
| from collections import OrderedDict | ||
| from typing import Optional, Tuple, Union | ||
| from mitmproxy import http | ||
| from mitmproxy import ctx | ||
| from mitmproxy import net | ||
|
|
||
| def host_matches(host: str, allow) -> bool: | ||
| """ | ||
|
|
@@ -380,6 +380,84 @@ def replace_in_content(replace: Union[str,list,dict], content): | |
| ctx.log.error("Invalid JSON: {}: after replace: {}".format(error, replace)) | ||
| return content | ||
|
|
||
| def lambda_with_args(lambda_func: str, index: int): | ||
| """ | ||
| Given a string representation of a lambda function and the corresponding code | ||
| block delimiter index, returns `lambda_func` as a string representing a lambda | ||
| function, inserting an optional argument if necessary. | ||
| """ | ||
| return re.sub("^\\s*lambda\\s*$", "lambda _", lambda_func[:index]) + lambda_func[index:] | ||
|
|
||
| def replace_lambda_in_dict(replace_lambda: dict, dictionary): | ||
| """ | ||
| For each value in `replace_lambda`, performs corresponding replacement (dict | ||
| update) in `dictionary`. | ||
| """ | ||
| for key in replace_lambda: | ||
| replace_lambda_value = replace_lambda[key] | ||
| if isinstance(replace_lambda_value, dict): | ||
| dictionary[key] = replace_lambda_in_dict(replace_lambda_value, dictionary.get(key, {})) | ||
| else: | ||
| index = replace_lambda_value.find(":") | ||
| if index != -1: | ||
| replace_lambda_value = lambda_with_args(replace_lambda_value, index) | ||
| try: | ||
| code_object = compile(replace_lambda_value, "<lambda>", "eval") | ||
| if code_object.co_consts[1] == "<lambda>": | ||
| replace_lambda_value = eval(replace_lambda_value) | ||
| if isinstance(replace_lambda_value, LambdaType) and replace_lambda_value.__name__ == "<lambda>": | ||
| try: | ||
| dictionary[key] = replace_lambda_value(dictionary.get(key, "")) | ||
| except Exception: | ||
| pass | ||
| except Exception: | ||
| pass | ||
| return dictionary | ||
|
|
||
| def replace_lambda_in_content(replace_lambda: Union[dict,str,list], content): | ||
| """ | ||
| Performs replacement `replace_lambda` (dict update or regex sub) in `content`. | ||
| """ | ||
| if isinstance(replace_lambda, dict): | ||
| content = replace_lambda_in_dict(replace_lambda, content_as_object(content) or {}) | ||
| elif replace_lambda: | ||
| if isinstance(replace_lambda, str): | ||
| index = replace_lambda.find(":") | ||
| if index != -1: | ||
| replace_lambda = lambda_with_args(replace_lambda, index) | ||
| try: | ||
| code_object = compile(replace_lambda, "<lambda>", "eval") | ||
| if code_object.co_consts[1] == "<lambda>": | ||
| replace_lambda = eval(replace_lambda) | ||
| if isinstance(replace_lambda, LambdaType) and replace_lambda.__name__ == "<lambda>": | ||
| try: | ||
| content = replace_lambda(content) | ||
| except ValueError as error: | ||
| ctx.log.error("Invalid JSON: {}: after replace: {}".format(error, replace_lambda)) | ||
| except Exception: | ||
| pass | ||
| except Exception: | ||
| pass | ||
| else: | ||
| sub_re, replacement = compiled_re_for(replace_lambda[0]), replace_lambda[1] | ||
| index = replacement.find(":") | ||
| if index != -1: | ||
| replacement = lambda_with_args(replacement, index) | ||
| try: | ||
| code_object = compile(replacement, "<lambda>", "eval") | ||
| if code_object.co_consts[1] == "<lambda>": | ||
| replacement = eval(replacement) | ||
| if isinstance(replacement, LambdaType) and replacement.__name__ == "<lambda>": | ||
| try: | ||
| content = sub_re.sub(replacement, content_as_str(content)) | ||
| except ValueError as error: | ||
| ctx.log.error("Invalid JSON: {}: after replace: {}".format(error, replace_lambda)) | ||
| except Exception: | ||
| pass | ||
| except Exception: | ||
| pass | ||
| return content | ||
|
|
||
| def modify_content(modify: Union[str,list,dict], content): | ||
| """ | ||
| Returns `content` modified according to all elements of `modify`. | ||
|
|
@@ -390,6 +468,7 @@ def modify_content(modify: Union[str,list,dict], content): | |
| if isinstance(modification, dict): | ||
| delete = modification.get("delete") | ||
| replace = modification.get("replace") | ||
| replace_lambda = modification.get("replace_lambda") | ||
| merge = modification.get("merge") | ||
| if delete: | ||
| content = delete_content(delete, content_as_object(content)) | ||
|
|
@@ -408,6 +487,8 @@ def modify_content(modify: Union[str,list,dict], content): | |
| content = replace | ||
| else: | ||
| content = replace_in_content(replace, content) | ||
| if replace_lambda: | ||
| content = replace_lambda_in_content(replace_lambda, content) | ||
| if merge: | ||
| if isinstance(merge, str): | ||
| with open(merge) as merge_file: | ||
|
|
@@ -430,16 +511,18 @@ def encode_content(content: Union[str,list,dict]) -> Tuple[bytes, str]: | |
| `content` may be any of the following: | ||
| - A file name string, in which case the content is loaded from the file and | ||
| type inferred from the file's extension (json, js, html, xml, txt, md). | ||
| - A raw string, in which case it is encoded according as UTF-8, and the type | ||
| - A raw string, in which case it is encoded accordingly as UTF-8, and the type | ||
| is inferred to be HTML if it starts with a `<`, otherwise JSON. | ||
| - A byte string, in which case it is assumed to be encoded as UTF-8, and the | ||
| type is inferred to be HTML if it starts with a `<`, otherwise JSON. | ||
| - An object (such as a dictionary) that can be dumped as JSON, in which case | ||
| it is converted to JSON. | ||
| """ | ||
| content_type = "application/json" | ||
| if isinstance(content, str): | ||
| try: | ||
| with open(content, "rb") as content_file: | ||
| if content.endswith("html"): | ||
| if content.endswith(".html"): | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The lack of
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the context; I'll go ahead and revert this. |
||
| content_type = "text/html" | ||
| elif content.endswith(".xml"): | ||
| content_type = "text/xml" | ||
|
|
@@ -449,9 +532,11 @@ def encode_content(content: Union[str,list,dict]) -> Tuple[bytes, str]: | |
| content_type = "application/javascript" | ||
| return content_file.read(), content_type | ||
| except FileNotFoundError: | ||
| if content.startswith("<"): | ||
| content_type = "text/html" | ||
| return content_as_str(content).encode("utf-8"), content_type + "; charset=utf-8" | ||
| pass | ||
| content = content_as_str(content) | ||
| if content.startswith("<"): | ||
| content_type = "text/html" | ||
| return content.encode("utf-8"), content_type + "; charset=utf-8" | ||
|
|
||
| def make_response(response: Union[str,dict], status, content, headers) -> http.HTTPResponse: | ||
| """ | ||
|
|
@@ -460,7 +545,7 @@ def make_response(response: Union[str,dict], status, content, headers) -> http.H | |
| given values unless overridden by `response`. | ||
|
|
||
| `response` may contain any combination of: | ||
| - `content`: A file name, a raw string, or a JSON object | ||
| - `content`: A file name, a raw string, a byte string, or a JSON object | ||
| - `status`: The HTTP status code | ||
| - `type`: The Content-Type header type with or without the charset | ||
| - `charset`: The charset part of the Content-Type header | ||
|
|
@@ -483,6 +568,97 @@ def make_response(response: Union[str,dict], status, content, headers) -> http.H | |
| ctx.log.debug("Response {}: headers={} content={}".format(status, headers, content)) | ||
| return http.HTTPResponse.make(status, content, headers) | ||
|
|
||
| def make_response_lambda(response: Union[str,dict], status, content, headers) -> http.HTTPResponse: | ||
| """ | ||
| Return a new `HTTPResponse` object constructed from the configuration | ||
| `response`, with the status code, content and headers defaulting to the | ||
| given values unless overridden by `response`. | ||
|
|
||
| `response` may contain any combination of: | ||
| - `content`: A file name, a raw string, a byte string, or a JSON object | ||
| - `status`: The HTTP status code | ||
| - `type`: The Content-Type header type with or without the charset | ||
| - `charset`: The charset part of the Content-Type header | ||
| - `headers`: A dictionary of HTTP headers | ||
|
|
||
| See also: `encode_content` | ||
| """ | ||
| if isinstance(response, str): | ||
| response_lambda = response | ||
| response = { "content": content } | ||
| else: | ||
| response_lambda = response.get("content", "lambda s: s") | ||
| content, content_type = encode_content(content) | ||
| index = response_lambda.find(":") | ||
| if index != -1: | ||
| response_lambda = lambda_with_args(response_lambda, index) | ||
| try: | ||
| code_object = compile(response_lambda, "<lambda>", "eval") | ||
| if code_object.co_consts[1] == "<lambda>": | ||
| response_lambda = eval(response_lambda) | ||
| if isinstance(response_lambda, LambdaType) and response_lambda.__name__ == "<lambda>": | ||
| try: | ||
| content, content_type = encode_content(response_lambda(content_as_str(content))) | ||
| except Exception: | ||
| pass | ||
| except Exception: | ||
| pass | ||
| response_lambda = response.get("type", "lambda s: s") | ||
| content_type = headers.get("Content-Type", content_type) | ||
| index = response_lambda.find(":") | ||
| if index != -1: | ||
| response_lambda = lambda_with_args(response_lambda, index) | ||
| try: | ||
| code_object = compile(response_lambda, "<lambda>", "eval") | ||
| if code_object.co_consts[1] == "<lambda>": | ||
| response_lambda = eval(response_lambda) | ||
| if isinstance(response_lambda, LambdaType) and response_lambda.__name__ == "<lambda>": | ||
| try: | ||
| content_type = response_lambda(content_type) | ||
| except Exception: | ||
| pass | ||
| except Exception: | ||
| pass | ||
| response_lambda = response.get("charset", "lambda s: s") | ||
| charset = mock_config.get("charset", "utf-8") | ||
| index = response_lambda.find(":") | ||
| if index != -1: | ||
| response_lambda = lambda_with_args(response_lambda, index) | ||
| try: | ||
| code_object = compile(response_lambda, "<lambda>", "eval") | ||
| if code_object.co_consts[1] == "<lambda>": | ||
| response_lambda = eval(response_lambda) | ||
| if isinstance(response_lambda, LambdaType) and response_lambda.__name__ == "<lambda>": | ||
| try: | ||
| charset = response_lambda(charset) | ||
| except Exception: | ||
| pass | ||
| except Exception: | ||
| pass | ||
| if charset and not ((";" in content_type) or ("image" in content_type)): | ||
| content_type = "{}; charset={}".format(content_type, charset) | ||
| headers = {**headers, **{ "Content-Type": content_type }} | ||
| response_lambda = response.get("headers", {}) | ||
| if isinstance(response_lambda, dict): | ||
| headers = replace_lambda_in_dict(response_lambda, headers) | ||
| response_lambda = response.get("status", "lambda s: s") | ||
| index = response_lambda.find(":") | ||
| if index != -1: | ||
| response_lambda = lambda_with_args(response_lambda, index) | ||
| try: | ||
| code_object = compile(response_lambda, "<lambda>", "eval") | ||
| if code_object.co_consts[1] == "<lambda>": | ||
| response_lambda = eval(response_lambda) | ||
| if isinstance(response_lambda, LambdaType) and response_lambda.__name__ == "<lambda>": | ||
| try: | ||
| status = response_lambda(status) | ||
| except Exception: | ||
| pass | ||
| except Exception: | ||
| pass | ||
| ctx.log.debug("Response {}: headers={} content={}".format(status, headers, content)) | ||
| return http.HTTPResponse.make(status, content, headers) | ||
|
|
||
| def extract_regex_paths(config: OrderedDict) -> OrderedDict: | ||
| """ | ||
| Returns an `OrderedDict` of compiled regex paths from `config`. | ||
|
|
@@ -780,6 +956,12 @@ def response(flow: http.HTTPFlow) -> None: | |
| if response: | ||
| flow.response = make_response(response, flow.response.status_code, flow.response.content, flow.response.headers) | ||
| ctx.log.debug("Replace response {}: {}".format(flow.request.path, flow.response)) | ||
| replace_lambda = config.get("replace_lambda") | ||
| if replace_lambda: | ||
| response = replace_lambda.get("response", replace_lambda) | ||
| if response: | ||
| flow.response = make_response_lambda(response, flow.response.status_code, flow.response.content, flow.response.headers) | ||
| ctx.log.debug("Replace response {}: {}".format(flow.request.path, flow.response)) | ||
| modify = config.get("modify", []) | ||
| if isinstance(modify, dict) or isinstance(modify, str): | ||
| modify = [ modify ] | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if these compiled lambdas should be cached like compiled regexes are? Also, several lines of duplicated code between
replace_lambda_in_dictandreplace_lambda_in_content, maybe move to a function called by both?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great feedback, I will refactor accordingly.