diff --git a/.gitignore b/.gitignore
index 26e63b2..fcd5a6d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -5,3 +5,6 @@ __pycache__
 *.pyo
 build/
 dist/
+.idea/
+tmp/
+Pipfile*
diff --git a/CHANGELOG.md b/CHANGELOG.md
index b1fb79c..3ef18d1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,6 +13,8 @@
 * `Query.watch` learned to carry forward all query parameters
 * `APIObject` learned `watch` to enable per-object watches
 * `Deployment` learned to roll back using `rollout_undo` similar to `kubectl rollout undo deployment`
+* Added `includeUninitialized` query parameter to include partially initialized resources in the response
+* `Query` can now receive data in chunks (to perform consistent reads across a large list)
 
 ## 0.14.0
 
diff --git a/README.rst b/README.rst
index 5d0d95b..5483625 100644
--- a/README.rst
+++ b/README.rst
@@ -80,6 +80,27 @@ Selector query:
     pending_pods = pykube.objects.Pod.objects(api).filter(
         field_selector={"status.phase": "Pending"}
     )
+    uninitialized_pods = pykube.objects.Pod.objects(api).filter(
+        selector={"environment": "production", "tier": "frontend"},
+        include_uninitialized=True,
+    )
+
+Consistent reads across a large list:
+
+.. code:: python
+
+    # Fetch all pods in chunks of 10
+    pods = pykube.Pod.objects(api).all().limit(10).iterator()
+
+    # Pagination
+    first_pods = pykube.Pod.objects(api).all().limit(10)
+    for pod, offset in first_pods.paginate():
+        pass
+    second_pods = first_pods.offset(offset)
+    for pod, offset in second_pods.paginate():
+        pass
+    for pod, offset in second_pods.paginate(offset):
+        pass
 
 Watch query:
 
diff --git a/pykube/query.py b/pykube/query.py
index 31d4b3b..d5a45f2 100644
--- a/pykube/query.py
+++ b/pykube/query.py
@@ -1,8 +1,9 @@
 
-import json
-from collections import namedtuple
+import collections
+import functools
+import json
 
-from six import string_types
+from six import string_types, class_types
 from six.moves.urllib.parse import urlencode
 
 from .exceptions import ObjectDoesNotExist
@@ -13,53 +14,125 @@
 now = object()
 
 
+def returns_clone(cls_or_func):
+    """
+    Decorate a query method so that it runs on, and returns, a clone.
+
+    ``self._clone(cls)`` is called first; the wrapped method then mutates
+    that clone (its own return value is discarded) and the clone is
+    returned. Apply it bare (``@returns_clone``) to clone into the default
+    class, or with a class (``@returns_clone(SomeQuery)``) to pass that
+    class to ``_clone``.
+    """
+    cls = None
+
+    def wrap(func):
+        @functools.wraps(func)
+        def inner(self, *args, **kwargs):
+            clone = self._clone(cls)
+            func(clone, *args, **kwargs)
+            return clone
+        return inner
+
+    if isinstance(cls_or_func, class_types):
+        cls = cls_or_func
+        return wrap
+
+    return wrap(cls_or_func)
+
+
+WatchEvent = collections.namedtuple("WatchEvent", "type object")
+
+
 class BaseQuery(object):
 
     def __init__(self, api, api_obj_class, namespace=None):
         self.api = api
         self.api_obj_class = api_obj_class
-        self.namespace = namespace
-        self.selector = everything
-        self.field_selector = everything
+
+        self._namespace = namespace
+        self._selector = everything
+        self._field_selector = everything
+        self._include_uninitialized = None
 
     def all(self):
         return self._clone()
 
-    def filter(self, namespace=None, selector=None, field_selector=None):
-        clone = self._clone()
+    @returns_clone
+    def filter(self, namespace=None, selector=None, field_selector=None,
+               include_uninitialized=None):
+        """
+        Return a clone of this query with the given filters applied.
+        """
         if namespace is not None:
-            clone.namespace = namespace
+            self._namespace = namespace
         if selector is not None:
-            clone.selector = selector
+            self._selector = selector
         if field_selector is not None:
-            clone.field_selector = field_selector
-        return clone
+            self._field_selector = field_selector
+        if include_uninitialized is not None:
+            self._include_uninitialized = include_uninitialized
 
     def _clone(self, cls=None):
         if cls is None:
             cls = self.__class__
-        clone = cls(self.api, self.api_obj_class, namespace=self.namespace)
-        clone.selector = self.selector
-        clone.field_selector = self.field_selector
+        clone = cls(self.api, self.api_obj_class, namespace=self._namespace)
+        clone._selector = self._selector
+        clone._field_selector = self._field_selector
+        clone._include_uninitialized = self._include_uninitialized
         return clone
 
-    def _build_api_url(self, params=None):
+    def _collect_params(self, params=None):
+        """Merge this query's filters into ``params`` and return it."""
         if params is None:
             params = {}
-        if self.selector is not everything:
-            params["labelSelector"] = as_selector(self.selector)
-        if self.field_selector is not everything:
-            params["fieldSelector"] = as_selector(self.field_selector)
-        query_string = urlencode(params)
-        return "{}{}".format(self.api_obj_class.endpoint, "?{}".format(query_string) if query_string else "")
+        if self._selector is not everything:
+            # noinspection PyTypeChecker
+            params["labelSelector"] = as_selector(self._selector)
+        if self._field_selector is not everything:
+            # noinspection PyTypeChecker
+            params["fieldSelector"] = as_selector(self._field_selector)
+        if self._include_uninitialized is not None:
+            params["includeUninitialized"] = \
+                "true" if self._include_uninitialized else "false"
+        return params
+
+    def _build_api_url(self, params=None):
+        """Return the endpoint URL with ``params`` as its query string."""
+        return "{}{}".format(
+            self.api_obj_class.endpoint,
+            "?{}".format(urlencode(params)) if params else "",
+        )
 
 
 class Query(BaseQuery):
 
+    def __init__(self, *args, **kwargs):
+        super(Query, self).__init__(*args, **kwargs)
+
+        self._limit = None
+        self._continue = None
+
+        self._query_cache = None
+
+    def _clone(self, cls=None):
+        clone = super(Query, self)._clone(cls)
+        clone._limit = self._limit
+        clone._continue = self._continue
+        return clone
+
+    def _collect_params(self, params=None):
+        params = super(Query, self)._collect_params(params)
+        if self._limit:
+            params["limit"] = self._limit
+        if self._continue:
+            params["continue"] = self._continue
+        return params
+
     def get_by_name(self, name):
         kwargs = {
             "url": "{}/{}".format(self.api_obj_class.endpoint, name),
-            "namespace": self.namespace,
+            "namespace": self._namespace,
         }
         if self.api_obj_class.base:
             kwargs["base"] = self.api_obj_class.base
@@ -89,40 +162,78 @@ def get_or_none(self, *args, **kwargs):
         except ObjectDoesNotExist:
             return None
 
+    @returns_clone
+    def limit(self, lim):
+        """Return a clone that requests at most ``lim`` items per call."""
+        self._limit = lim
+
+    @returns_clone
+    def offset(self, off):
+        """Return a clone that resumes from continue token ``off``."""
+        self._continue = off
+
     def watch(self, since=None):
         query = self._clone(WatchQuery)
         if since is now:
-            query.resource_version = self.response["metadata"]["resourceVersion"]
+            # noinspection PyTypeChecker
+            query._resource_version = \
+                self.response["metadata"]["resourceVersion"]
         elif since is not None:
-            query.resource_version = since
+            query._resource_version = since
         return query
 
     def execute(self):
-        kwargs = {"url": self._build_api_url()}
+        kwargs = {"url": self._build_api_url(params=self._collect_params())}
         if self.api_obj_class.base:
             kwargs["base"] = self.api_obj_class.base
         if self.api_obj_class.version:
             kwargs["version"] = self.api_obj_class.version
-        if self.namespace is not None and self.namespace is not all_:
-            kwargs["namespace"] = self.namespace
+        if self._namespace is not None and self._namespace is not all_:
+            kwargs["namespace"] = self._namespace
         r = self.api.get(**kwargs)
         r.raise_for_status()
         return r
 
+    def _fetch_chunk(self):
+        """Fetch one page, remember its continue token, yield its objects."""
+        resp = self.execute().json()
+        self._continue = resp["metadata"].get("continue")
+        for obj in (resp.get("items") or []):
+            yield self.api_obj_class(self.api, obj)
+
     def iterator(self):
         """
         Execute the API request and return an iterator over the objects. This
         method does not use the query cache.
         """
-        for obj in (self.execute().json().get("items") or []):
-            yield self.api_obj_class(self.api, obj)
+        # Page through a clone so the continue token never leaks into self.
+        query = self._clone()
+        while True:
+            for item in query._fetch_chunk():
+                yield item
+            if not query._continue:
+                break
+
+    def paginate(self, offset=None):
+        """
+        Yield ``(object, next_offset)`` pairs for a single page.
+
+        ``next_offset`` is the continue token taken from that page, or
+        ``None`` when the response metadata has no ``continue`` key.
+        """
+        query = self.offset(offset) if offset else self._clone()
+        for item in query._fetch_chunk():
+            yield item, query._continue
 
     @property
     def query_cache(self):
-        if not hasattr(self, "_query_cache"):
-            cache = {"objects": []}
-            cache["response"] = self.execute().json()
-            for obj in (cache["response"].get("items") or []):
+        if self._query_cache is None:
+            cache = {
+                "objects": [],
+                "response": self.execute().json(),
+            }
+            # noinspection PyTypeChecker
+            for obj in (cache["response"].get("items") or []):
                 cache["objects"].append(self.api_obj_class(self.api, obj))
             self._query_cache = cache
         return self._query_cache
@@ -141,27 +252,34 @@ def response(self):
 class WatchQuery(BaseQuery):
 
     def __init__(self, *args, **kwargs):
-        self.resource_version = kwargs.pop("resource_version", None)
+        self._resource_version = kwargs.pop("resource_version", None)
         super(WatchQuery, self).__init__(*args, **kwargs)
 
+    def _collect_params(self, params=None):
+        """Add the watch flag and resource version to the base params."""
+        params = super(WatchQuery, self)._collect_params(params)
+        params["watch"] = "true"
+        if self._resource_version is not None:
+            params["resourceVersion"] = self._resource_version
+        return params
+
     def object_stream(self):
-        params = {"watch": "true"}
-        if self.resource_version is not None:
-            params["resourceVersion"] = self.resource_version
         kwargs = {
-            "url": self._build_api_url(params=params),
+            "url": self._build_api_url(params=self._collect_params()),
             "stream": True,
         }
-        if self.namespace is not all_:
-            kwargs["namespace"] = self.namespace
+        if self._namespace is not all_:
+            kwargs["namespace"] = self._namespace
         if self.api_obj_class.version:
             kwargs["version"] = self.api_obj_class.version
         r = self.api.get(**kwargs)
         self.api.raise_for_status(r)
-        WatchEvent = namedtuple("WatchEvent", "type object")
         for line in r.iter_lines():
             we = json.loads(line.decode("utf-8"))
-            yield WatchEvent(type=we["type"], object=self.api_obj_class(self.api, we["object"]))
+            yield WatchEvent(
+                type=we["type"],
+                object=self.api_obj_class(self.api, we["object"]),
+            )
 
     def __iter__(self):
         return iter(self.object_stream())
@@ -190,5 +308,6 @@ def as_selector(value):
         elif op == "notin":
             s.append("{} notin ({})".format(label, ",".join(v)))
         else:
-            raise ValueError("{} is not a valid comparison operator".format(op))
+            raise ValueError("{} is not a valid comparison "
+                             "operator".format(op))
     return ",".join(s)