diff --git a/k8sutil/__init__.py b/k8sutil/__init__.py index bceafecbf..02f19b0e8 100644 --- a/k8sutil/__init__.py +++ b/k8sutil/__init__.py @@ -13,3 +13,4 @@ # limitations under the License. from .kube_config import load_kube_config +from .watch import Watch diff --git a/k8sutil/watch.py b/k8sutil/watch.py new file mode 100644 index 000000000..52b0b3b01 --- /dev/null +++ b/k8sutil/watch.py @@ -0,0 +1,123 @@ +# Copyright 2016 The Kubernetes Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import pydoc + +import k8sclient + +PYDOC_RETURN_LABEL = ":return:" + +# Removing this suffix from return type name should give us event's object +# type. e.g., if list_namespaces() returns "NamespaceList" type, +# then list_namespaces(watch=true) returns a stream of events with objects +# of type "Namespace". In case this assumption is not true, user should +# provide return_type to Watch class's __init__. +TYPE_LIST_SUFFIX = "List" + + +class SimpleNamespace: + + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + +def _find_return_type(func): + for line in pydoc.getdoc(func).splitlines(): + if line.startswith(PYDOC_RETURN_LABEL): + return line[len(PYDOC_RETURN_LABEL):].strip() + return "" + + +def iter_resp_lines(resp): + prev = "" + for seg in resp.read_chunked(decode_content=False): + if isinstance(seg, bytes): + seg = seg.decode('utf8') + seg = prev + seg + lines = seg.split("\n") + if not seg.endswith("\n"): + prev = lines[-1] + lines = lines[:-1] + else: + prev = "" + for line in lines: + if line: + yield line + + +class Watch(object): + + def __init__(self, return_type=None): + self._raw_return_type = return_type + self._stop = False + self._api_client = k8sclient.ApiClient() + + def stop(self): + self._stop = True + + def get_return_type(self, func): + if self._raw_return_type: + return self._raw_return_type + return_type = _find_return_type(func) + if return_type.endswith(TYPE_LIST_SUFFIX): + return return_type[:-len(TYPE_LIST_SUFFIX)] + return return_type + + def unmarshal_event(self, data, return_type): + js = json.loads(data) + js['raw_object'] = js['object'] + if return_type: + obj = SimpleNamespace(data=json.dumps(js['raw_object'])) + js['object'] = self._api_client.deserialize(obj, return_type) + return js + + def stream(self, func, *args, **kwargs): + """Watch an API resource and stream the result back via a generator. + + :param func: The API function pointer. Any parameter to the function + can be passed after this parameter. + + :return: Event object with these keys: + 'type': The type of event such as "ADDED", "DELETED", etc. + 'raw_object': a dict representing the watched object. + 'object': A model representation of raw_object. The name of + model will be determined based on + the func's doc string. If it cannot be determined, + 'object' value will be the same as 'raw_object'. + + Example: + v1 = k8sclient.CoreV1Api() + watch = k8sutil.Watch() + for e in watch.stream(v1.list_namespace, resource_version=1127): + type = e['type'] + object = e['object'] # object is one of type return_type + raw_object = e['raw_object'] # raw_object is a dict + ... + if should_stop: + watch.stop() + """ + + return_type = self.get_return_type(func) + kwargs['watch'] = True + kwargs['_preload_content'] = False + resp = func(*args, **kwargs) + try: + for line in iter_resp_lines(resp): + yield self.unmarshal_event(line, return_type) + if self._stop: + break + finally: + resp.close() + resp.release_conn()