Update continue the watch with resource_version
This commit is contained in:
parent
8f7b490086
commit
aec1c5259a
@ -63,6 +63,7 @@ class Watch(object):
|
||||
self._raw_return_type = return_type
|
||||
self._stop = False
|
||||
self._api_client = client.ApiClient()
|
||||
self.resource_version = 0
|
||||
|
||||
def stop(self):
|
||||
self._stop = True
|
||||
@ -81,16 +82,16 @@ class Watch(object):
|
||||
if return_type:
|
||||
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
|
||||
js['object'] = self._api_client.deserialize(obj, return_type)
|
||||
if hasattr(js['object'], 'metadata'):
|
||||
self.resource_version = js['object'].metadata.resource_version
|
||||
return js
|
||||
|
||||
def stream(self, func, keep=False, *args, **kwargs):
|
||||
def stream(self, func, *args, **kwargs):
|
||||
"""Watch an API resource and stream the result back via a generator.
|
||||
|
||||
:param func: The API function pointer. Any parameter to the function
|
||||
can be passed after this parameter.
|
||||
|
||||
:param keep: Flag to keep the watch work all the time.
|
||||
|
||||
:return: Event object with these keys:
|
||||
'type': The type of event such as "ADDED", "DELETED", etc.
|
||||
'raw_object': a dict representing the watched object.
|
||||
@ -116,6 +117,7 @@ class Watch(object):
|
||||
kwargs['watch'] = True
|
||||
kwargs['_preload_content'] = False
|
||||
|
||||
timeouts = ('timeout_seconds' in kwargs)
|
||||
while True:
|
||||
resp = func(*args, **kwargs)
|
||||
try:
|
||||
@ -124,8 +126,9 @@ class Watch(object):
|
||||
if self._stop:
|
||||
break
|
||||
finally:
|
||||
kwargs['resource_version'] = self.resource_version
|
||||
resp.close()
|
||||
resp.release_conn()
|
||||
|
||||
if not keep or self._stop:
|
||||
if timeouts or self._stop:
|
||||
break
|
||||
|
||||
@ -85,7 +85,7 @@ class WatchTests(unittest.TestCase):
|
||||
fake_resp.close.assert_called_once()
|
||||
fake_resp.release_conn.assert_called_once()
|
||||
|
||||
def test_watch_stream_keep(self):
|
||||
def test_watch_stream_loop(self):
|
||||
w = Watch(float)
|
||||
|
||||
fake_resp = Mock()
|
||||
@ -99,12 +99,14 @@ class WatchTests(unittest.TestCase):
|
||||
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
|
||||
|
||||
count = 0
|
||||
for e in w.stream(fake_api.get_namespaces):
|
||||
count = count + 1
|
||||
|
||||
# when timeout_seconds is set, auto-exist when timeout reaches
|
||||
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
|
||||
count = count + 1
|
||||
self.assertEqual(count, 1)
|
||||
|
||||
for e in w.stream(fake_api.get_namespaces, True):
|
||||
# when no timeout_seconds, only exist when w.stop() is called
|
||||
for e in w.stream(fake_api.get_namespaces):
|
||||
count = count + 1
|
||||
if count == 2:
|
||||
w.stop()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user