From 4e41e8359d6000ee082e84c930fcba18b9f7be50 Mon Sep 17 00:00:00 2001 From: dvazar Date: Wed, 20 Dec 2017 22:36:00 +0700 Subject: [PATCH 1/7] added: "includeUninitialized" query parameter --- .gitignore | 2 ++ pykube/query.py | 14 +++++++++++--- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 26e63b2..86b53eb 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ __pycache__ *.pyo build/ dist/ +ENV/ +.idea/ \ No newline at end of file diff --git a/pykube/query.py b/pykube/query.py index 31d4b3b..49c2010 100644 --- a/pykube/query.py +++ b/pykube/query.py @@ -21,11 +21,13 @@ def __init__(self, api, api_obj_class, namespace=None): self.namespace = namespace self.selector = everything self.field_selector = everything + self.include_uninitialized = None def all(self): return self._clone() - def filter(self, namespace=None, selector=None, field_selector=None): + def filter(self, namespace=None, selector=None, field_selector=None, + include_uninitialized=None): clone = self._clone() if namespace is not None: clone.namespace = namespace @@ -33,6 +35,8 @@ def filter(self, namespace=None, selector=None, field_selector=None): clone.selector = selector if field_selector is not None: clone.field_selector = field_selector + if include_uninitialized is not None: + clone.include_uninitialized = include_uninitialized return clone def _clone(self, cls=None): @@ -50,8 +54,12 @@ def _build_api_url(self, params=None): params["labelSelector"] = as_selector(self.selector) if self.field_selector is not everything: params["fieldSelector"] = as_selector(self.field_selector) - query_string = urlencode(params) - return "{}{}".format(self.api_obj_class.endpoint, "?{}".format(query_string) if query_string else "") + if self.include_uninitialized is not None: + params["includeUninitialized"] = self.include_uninitialized + return "{}{}".format( + self.api_obj_class.endpoint, + "?{}".format(urlencode(params)) if params else "", + ) class Query(BaseQuery): From 030c4865f57cd8a2e8858a92054e929c53a640c3 Mon Sep 17 00:00:00 2001 From: dvazar Date: Thu, 21 Dec 2017 15:41:44 +0700 Subject: [PATCH 2/7] added: docs of use --- .gitignore | 4 +++- CHANGELOG.md | 1 + README.rst | 4 ++++ pykube/query.py | 4 +++- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index 86b53eb..8486607 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,6 @@ __pycache__ build/ dist/ ENV/ -.idea/ \ No newline at end of file +.idea/ +tmp/ +Pipfile* \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index b1fb79c..867565f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ * `Query.watch` learned to carry forward all query parameters * `APIObject` learned `watch` to enable per-object watches * `Deployment` learned to roll back using `rollout_undo` similar to `kubectl rollout undo deployment` +* Added `includeUninitialized` query parameter to include partially initialized resources in the response ## 0.14.0 diff --git a/README.rst b/README.rst index 5d0d95b..d66873c 100644 --- a/README.rst +++ b/README.rst @@ -80,6 +80,10 @@ Selector query: pending_pods = pykube.objects.Pod.objects(api).filter( field_selector={"status.phase": "Pending"} ) + uninitialized_pods = pykube.objects.Pod.objects(api).filter( + selector={"environment": "production", "tier": "frontend"}, + include_uninitialized=True, + ) Watch query: diff --git a/pykube/query.py b/pykube/query.py index 49c2010..8694703 100644 --- a/pykube/query.py +++ b/pykube/query.py @@ -45,6 +45,7 @@ def _clone(self, cls=None): clone = cls(self.api, self.api_obj_class, namespace=self.namespace) clone.selector = self.selector clone.field_selector = self.field_selector + clone.include_uninitialized = self.include_uninitialized return clone def _build_api_url(self, params=None): @@ -55,7 +56,8 @@ def _build_api_url(self, params=None): if self.field_selector is not everything: params["fieldSelector"] = as_selector(self.field_selector) if self.include_uninitialized is not None: - params["includeUninitialized"] = self.include_uninitialized + params["includeUninitialized"] = \ + "true" if self.include_uninitialized else "false" return "{}{}".format( self.api_obj_class.endpoint, "?{}".format(urlencode(params)) if params else "", From c148f1f43e832d13f992517bd735012e39f0d0e2 Mon Sep 17 00:00:00 2001 From: dvazar Date: Fri, 22 Dec 2017 20:03:43 +0700 Subject: [PATCH 3/7] added: ability to break large LIST calls into multiple smaller chunks --- .gitignore | 3 +- pykube/query.py | 121 ++++++++++++++++++++++++++++++++++-------------- 2 files changed, 87 insertions(+), 37 deletions(-) diff --git a/.gitignore b/.gitignore index 8486607..fcd5a6d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,7 +5,6 @@ __pycache__ *.pyo build/ dist/ -ENV/ .idea/ tmp/ -Pipfile* \ No newline at end of file +Pipfile* diff --git a/pykube/query.py b/pykube/query.py index 8694703..ea4537c 100644 --- a/pykube/query.py +++ b/pykube/query.py @@ -11,6 +11,7 @@ all_ = object() everything = object() now = object() +initially = object() class BaseQuery(object): @@ -18,10 +19,19 @@ class BaseQuery(object): def __init__(self, api, api_obj_class, namespace=None): self.api = api self.api_obj_class = api_obj_class - self.namespace = namespace - self.selector = everything - self.field_selector = everything - self.include_uninitialized = None + + self._namespace = namespace + self._selector = everything + self._field_selector = everything + self._include_uninitialized = None + self._limit = None + self._offset = initially + + @property + def continuing(self): + if self._offset and self._offset is not initially: + return True + return False def all(self): return self._clone() @@ -30,34 +40,50 @@ def filter(self, namespace=None, selector=None, field_selector=None, include_uninitialized=None): clone = self._clone() if namespace is not None: - clone.namespace = namespace + clone._namespace = namespace if selector is not None: - clone.selector = selector + clone._selector = selector if field_selector is not None: - clone.field_selector = field_selector + clone._field_selector = field_selector if include_uninitialized is not None: - clone.include_uninitialized = include_uninitialized + clone._include_uninitialized = include_uninitialized + return clone + + def limit(self, lim): + clone = self._clone() + clone._limit = lim + return clone + + def offset(self, off): + clone = self._clone() + clone._offset = off return clone def _clone(self, cls=None): if cls is None: cls = self.__class__ - clone = cls(self.api, self.api_obj_class, namespace=self.namespace) - clone.selector = self.selector - clone.field_selector = self.field_selector - clone.include_uninitialized = self.include_uninitialized + clone = cls(self.api, self.api_obj_class, namespace=self._namespace) + clone._selector = self._selector + clone._field_selector = self._field_selector + clone._include_uninitialized = self._include_uninitialized + clone._limit = self._limit + clone._offset = self._offset return clone def _build_api_url(self, params=None): if params is None: params = {} - if self.selector is not everything: - params["labelSelector"] = as_selector(self.selector) - if self.field_selector is not everything: - params["fieldSelector"] = as_selector(self.field_selector) - if self.include_uninitialized is not None: + if self._selector is not everything: + params["labelSelector"] = as_selector(self._selector) + if self._field_selector is not everything: + params["fieldSelector"] = as_selector(self._field_selector) + if self._include_uninitialized is not None: params["includeUninitialized"] = \ - "true" if self.include_uninitialized else "false" + "true" if self._include_uninitialized else "false" + if self._limit: + params["limit"] = self._limit + if self.continuing: + params["continue"] = self._offset return "{}{}".format( self.api_obj_class.endpoint, "?{}".format(urlencode(params)) if params else "", @@ -66,10 +92,14 @@ def _build_api_url(self, params=None): class Query(BaseQuery): + def __init__(self, *args, **kwargs): + super(Query, self).__init__(*args, **kwargs) + self._query_cache = None + def get_by_name(self, name): kwargs = { "url": "{}/{}".format(self.api_obj_class.endpoint, name), - "namespace": self.namespace, + "namespace": self._namespace, } if self.api_obj_class.base: kwargs["base"] = self.api_obj_class.base @@ -102,19 +132,24 @@ def get_or_none(self, *args, **kwargs): def watch(self, since=None): query = self._clone(WatchQuery) if since is now: - query.resource_version = self.response["metadata"]["resourceVersion"] + query._resource_version = \ + self.response["metadata"]["resourceVersion"] elif since is not None: - query.resource_version = since + query._resource_version = since return query def execute(self): + """ + + :rtype: requests.Response + """ kwargs = {"url": self._build_api_url()} if self.api_obj_class.base: kwargs["base"] = self.api_obj_class.base if self.api_obj_class.version: kwargs["version"] = self.api_obj_class.version - if self.namespace is not None and self.namespace is not all_: - kwargs["namespace"] = self.namespace + if self._namespace is not None and self._namespace is not all_: + kwargs["namespace"] = self._namespace r = self.api.get(**kwargs) r.raise_for_status() return r @@ -124,15 +159,23 @@ def iterator(self): Execute the API request and return an iterator over the objects. This method does not use the query cache. """ - for obj in (self.execute().json().get("items") or []): + resp = self.execute().json() + self._offset = resp["metadata"].get("continue", initially) + for obj in resp["items"]: yield self.api_obj_class(self.api, obj) @property def query_cache(self): - if not hasattr(self, "_query_cache"): - cache = {"objects": []} - cache["response"] = self.execute().json() - for obj in (cache["response"].get("items") or []): + """ + + :rtype: dict + """ + if not self._query_cache: + cache = { + "objects": [], + "response": self.execute().json(), + } + for obj in cache["response"]["items"]: cache["objects"].append(self.api_obj_class(self.api, obj)) self._query_cache = cache return self._query_cache @@ -145,25 +188,29 @@ def __iter__(self): @property def response(self): + """ + + :rtype: dict or list[dict] + """ return self.query_cache["response"] class WatchQuery(BaseQuery): def __init__(self, *args, **kwargs): - self.resource_version = kwargs.pop("resource_version", None) + self._resource_version = kwargs.pop("resource_version", None) super(WatchQuery, self).__init__(*args, **kwargs) def object_stream(self): params = {"watch": "true"} - if self.resource_version is not None: - params["resourceVersion"] = self.resource_version + if self._resource_version is not None: + params["resourceVersion"] = self._resource_version kwargs = { "url": self._build_api_url(params=params), "stream": True, } - if self.namespace is not all_: - kwargs["namespace"] = self.namespace + if self._namespace is not all_: + kwargs["namespace"] = self._namespace if self.api_obj_class.version: kwargs["version"] = self.api_obj_class.version r = self.api.get(**kwargs) @@ -171,7 +218,10 @@ def object_stream(self): WatchEvent = namedtuple("WatchEvent", "type object") for line in r.iter_lines(): we = json.loads(line.decode("utf-8")) - yield WatchEvent(type=we["type"], object=self.api_obj_class(self.api, we["object"])) + yield WatchEvent( + type=we["type"], + object=self.api_obj_class(self.api, we["object"]), + ) def __iter__(self): return iter(self.object_stream()) @@ -200,5 +250,6 @@ def as_selector(value): elif op == "notin": s.append("{} notin ({})".format(label, ",".join(v))) else: - raise ValueError("{} is not a valid comparison operator".format(op)) + raise ValueError("{} is not a valid comparison " + "operator".format(op)) return ",".join(s) From 5cef9b38f5c045b2790ea353d1d8670a788cf56a Mon Sep 17 00:00:00 2001 From: dvazar Date: Sat, 23 Dec 2017 00:14:18 +0700 Subject: [PATCH 4/7] refactoring --- pykube/query.py | 84 ++++++++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 39 deletions(-) diff --git a/pykube/query.py b/pykube/query.py index ea4537c..0be8890 100644 --- a/pykube/query.py +++ b/pykube/query.py @@ -1,6 +1,6 @@ -import json from collections import namedtuple +import json from six import string_types from six.moves.urllib.parse import urlencode @@ -24,14 +24,6 @@ def __init__(self, api, api_obj_class, namespace=None): self._selector = everything self._field_selector = everything self._include_uninitialized = None - self._limit = None - self._offset = initially - - @property - def continuing(self): - if self._offset and self._offset is not initially: - return True - return False def all(self): return self._clone() @@ -49,16 +41,6 @@ def filter(self, namespace=None, selector=None, field_selector=None, clone._include_uninitialized = include_uninitialized return clone - def limit(self, lim): - clone = self._clone() - clone._limit = lim - return clone - - def offset(self, off): - clone = self._clone() - clone._offset = off - return clone - def _clone(self, cls=None): if cls is None: cls = self.__class__ @@ -66,24 +48,24 @@ def _clone(self, cls=None): clone._selector = self._selector clone._field_selector = self._field_selector clone._include_uninitialized = self._include_uninitialized - clone._limit = self._limit - clone._offset = self._offset return clone - def _build_api_url(self, params=None): + def _collect_params(self, params=None): if params is None: params = {} if self._selector is not everything: + # noinspection PyTypeChecker params["labelSelector"] = as_selector(self._selector) if self._field_selector is not everything: + # noinspection PyTypeChecker params["fieldSelector"] = as_selector(self._field_selector) if self._include_uninitialized is not None: params["includeUninitialized"] = \ "true" if self._include_uninitialized else "false" - if self._limit: - params["limit"] = self._limit - if self.continuing: - params["continue"] = self._offset + return params + + def _build_api_url(self, params=None): + params = self._collect_params(params) return "{}{}".format( self.api_obj_class.endpoint, "?{}".format(urlencode(params)) if params else "", @@ -94,8 +76,32 @@ class Query(BaseQuery): def __init__(self, *args, **kwargs): super(Query, self).__init__(*args, **kwargs) + + self._limit = None + self._continue = initially + self._query_cache = None + def _clone(self, cls=None): + clone = super(Query, self)._clone(cls) + clone._limit = self._limit + clone._continue = self._continue + return clone + + def _collect_params(self, params=None): + params = super(Query, self)._collect_params(params) + if self._limit: + params["limit"] = self._limit + if self.continuing: + params["continue"] = self._continue + return params + + @property + def continuing(self): + if self._continue and self._continue is not initially: + return True + return False + def get_by_name(self, name): kwargs = { "url": "{}/{}".format(self.api_obj_class.endpoint, name), @@ -129,9 +135,20 @@ def get_or_none(self, *args, **kwargs): except ObjectDoesNotExist: return None + def limit(self, lim): + clone = self._clone() + clone._limit = lim + return clone + + def offset(self, off): + clone = self._clone() + clone._continue = off + return clone + def watch(self, since=None): query = self._clone(WatchQuery) if since is now: + # noinspection PyTypeChecker query._resource_version = \ self.response["metadata"]["resourceVersion"] elif since is not None: @@ -139,10 +156,6 @@ def watch(self, since=None): return query def execute(self): - """ - - :rtype: requests.Response - """ kwargs = {"url": self._build_api_url()} if self.api_obj_class.base: kwargs["base"] = self.api_obj_class.base @@ -160,21 +173,18 @@ def iterator(self): method does not use the query cache. """ resp = self.execute().json() - self._offset = resp["metadata"].get("continue", initially) + self._continue = resp["metadata"].get("continue", initially) for obj in resp["items"]: yield self.api_obj_class(self.api, obj) @property def query_cache(self): - """ - - :rtype: dict - """ if not self._query_cache: cache = { "objects": [], "response": self.execute().json(), } + # noinspection PyTypeChecker for obj in cache["response"]["items"]: cache["objects"].append(self.api_obj_class(self.api, obj)) self._query_cache = cache @@ -188,10 +198,6 @@ def __iter__(self): @property def response(self): - """ - - :rtype: dict or list[dict] - """ return self.query_cache["response"] From 272a54674d3d3a8cfbda859cbb7993de8e80e66c Mon Sep 17 00:00:00 2001 From: dvazar Date: Sat, 23 Dec 2017 01:11:55 +0700 Subject: [PATCH 5/7] refactoring --- pykube/query.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/pykube/query.py b/pykube/query.py index 0be8890..e86e140 100644 --- a/pykube/query.py +++ b/pykube/query.py @@ -65,7 +65,6 @@ def _collect_params(self, params=None): return params def _build_api_url(self, params=None): - params = self._collect_params(params) return "{}{}".format( self.api_obj_class.endpoint, "?{}".format(urlencode(params)) if params else "", @@ -156,7 +155,7 @@ def watch(self, since=None): return query def execute(self): - kwargs = {"url": self._build_api_url()} + kwargs = {"url": self._build_api_url(params=self._collect_params())} if self.api_obj_class.base: kwargs["base"] = self.api_obj_class.base if self.api_obj_class.version: @@ -207,12 +206,16 @@ def __init__(self, *args, **kwargs): self._resource_version = kwargs.pop("resource_version", None) super(WatchQuery, self).__init__(*args, **kwargs) - def object_stream(self): - params = {"watch": "true"} + def _collect_params(self, params=None): + params = super(WatchQuery, self)._collect_params() + params["watch"] = "true" if self._resource_version is not None: params["resourceVersion"] = self._resource_version + return params + + def object_stream(self): kwargs = { - "url": self._build_api_url(params=params), + "url": self._build_api_url(params=self._collect_params()), "stream": True, } if self._namespace is not all_: From d29f40cc9182f86606c04a51f81e3f2bbd8ccfca Mon Sep 17 00:00:00 2001 From: dvazar Date: Mon, 25 Dec 2017 15:05:51 +0700 Subject: [PATCH 6/7] added: pagination --- pykube/query.py | 80 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/pykube/query.py b/pykube/query.py index e86e140..d5a45f2 100644 --- a/pykube/query.py +++ b/pykube/query.py @@ -1,8 +1,8 @@ -from collections import namedtuple +import collections import json -from six import string_types +from six import string_types, class_types from six.moves.urllib.parse import urlencode from .exceptions import ObjectDoesNotExist @@ -11,7 +11,26 @@ all_ = object() everything = object() now = object() -initially = object() + + +def returns_clone(cls_or_func): + cls = None + + def wrap(func): + def inner(self, *args, **kwargs): + clone = self._clone(cls) + func(clone, *args, **kwargs) + return clone + return inner + + if isinstance(cls_or_func, class_types): + cls = cls_or_func + return wrap + + return wrap(cls_or_func) + + +WatchEvent = collections.namedtuple("WatchEvent", "type object") class BaseQuery(object): @@ -28,18 +47,20 @@ def __init__(self, api, api_obj_class, namespace=None): def all(self): return self._clone() + @returns_clone def filter(self, namespace=None, selector=None, field_selector=None, include_uninitialized=None): - clone = self._clone() + """ + :rtype: self + """ if namespace is not None: - clone._namespace = namespace + self._namespace = namespace if selector is not None: - clone._selector = selector + self._selector = selector if field_selector is not None: - clone._field_selector = field_selector + self._field_selector = field_selector if include_uninitialized is not None: - clone._include_uninitialized = include_uninitialized - return clone + self._include_uninitialized = include_uninitialized def _clone(self, cls=None): if cls is None: @@ -77,7 +98,7 @@ def __init__(self, *args, **kwargs): super(Query, self).__init__(*args, **kwargs) self._limit = None - self._continue = initially + self._continue = None self._query_cache = None @@ -91,16 +112,10 @@ def _collect_params(self, params=None): params = super(Query, self)._collect_params(params) if self._limit: params["limit"] = self._limit - if self.continuing: + if self._continue: params["continue"] = self._continue return params - @property - def continuing(self): - if self._continue and self._continue is not initially: - return True - return False - def get_by_name(self, name): kwargs = { "url": "{}/{}".format(self.api_obj_class.endpoint, name), @@ -134,15 +149,13 @@ def get_or_none(self, *args, **kwargs): except ObjectDoesNotExist: return None + @returns_clone def limit(self, lim): - clone = self._clone() - clone._limit = lim - return clone + self._limit = lim + @returns_clone def offset(self, off): - clone = self._clone() - clone._continue = off - return clone + self._continue = off def watch(self, since=None): query = self._clone(WatchQuery) @@ -166,15 +179,27 @@ def execute(self): r.raise_for_status() return r + def _fetch_chunk(self): + resp = self.execute().json() + self._continue = resp["metadata"].get("continue") + for obj in resp["items"]: + yield self.api_obj_class(self.api, obj) + def iterator(self): """ Execute the API request and return an iterator over the objects. This method does not use the query cache. """ - resp = self.execute().json() - self._continue = resp["metadata"].get("continue", initially) - for obj in resp["items"]: - yield self.api_obj_class(self.api, obj) + while True: + for item in self._fetch_chunk(): + yield item + if not self._continue: + break + + def paginate(self, offset=None): + query = self.offset(offset) if offset else self + for item in query._fetch_chunk(): + yield item, self._continue @property def query_cache(self): @@ -224,7 +249,6 @@ def object_stream(self): kwargs["version"] = self.api_obj_class.version r = self.api.get(**kwargs) self.api.raise_for_status(r) - WatchEvent = namedtuple("WatchEvent", "type object") for line in r.iter_lines(): we = json.loads(line.decode("utf-8")) yield WatchEvent( From ca665a818cf991da6d20922a08ec3e7a9c04f81b Mon Sep 17 00:00:00 2001 From: dvazar Date: Mon, 25 Dec 2017 15:50:05 +0700 Subject: [PATCH 7/7] added: API chunking docs --- CHANGELOG.md | 1 + README.rst | 17 +++++++++++++++++ 2 files changed, 18 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 867565f..3ef18d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ * `APIObject` learned `watch` to enable per-object watches * `Deployment` learned to roll back using `rollout_undo` similar to `kubectl rollout undo deployment` * Added `includeUninitialized` query parameter to include partially initialized resources in the response +* `Query` can now receive data by chunks (to perform consistent reads across a large list) ## 0.14.0 diff --git a/README.rst b/README.rst index d66873c..5483625 100644 --- a/README.rst +++ b/README.rst @@ -85,6 +85,23 @@ Selector query: include_uninitialized=True, ) +Consistent reads across a large list: + +.. code:: python + + # Fetch all pods by chunks of size 10 + pods = pykube.Pod.objects(api).all().limit(10).iterator() + + # Pagination + first_pods = pykube.Pod.objects(api).all().limit(10) + for pod, offset in first_pods.paginate(): + pass + second_pods = first_pods.offset(offset) + for pod, offset in second_pods.paginate(): + pass + for pod, offset in second_pods.paginate(offset): + pass + Watch query: .. code:: python