Add flag to enable keep the watch action working all the time
Fixes issue: https://github.com/kubernetes-incubator/client-python/issues/124
This commit is contained in:
parent
9213876f0b
commit
8f7b490086
@ -83,12 +83,14 @@ class Watch(object):
|
||||
js['object'] = self._api_client.deserialize(obj, return_type)
|
||||
return js
|
||||
|
||||
def stream(self, func, *args, **kwargs):
|
||||
def stream(self, func, keep=False, *args, **kwargs):
|
||||
"""Watch an API resource and stream the result back via a generator.
|
||||
|
||||
:param func: The API function pointer. Any parameter to the function
|
||||
can be passed after this parameter.
|
||||
|
||||
:param keep: Flag to keep the watch work all the time.
|
||||
|
||||
:return: Event object with these keys:
|
||||
'type': The type of event such as "ADDED", "DELETED", etc.
|
||||
'raw_object': a dict representing the watched object.
|
||||
@ -113,12 +115,17 @@ class Watch(object):
|
||||
return_type = self.get_return_type(func)
|
||||
kwargs['watch'] = True
|
||||
kwargs['_preload_content'] = False
|
||||
resp = func(*args, **kwargs)
|
||||
try:
|
||||
for line in iter_resp_lines(resp):
|
||||
yield self.unmarshal_event(line, return_type)
|
||||
if self._stop:
|
||||
break
|
||||
finally:
|
||||
resp.close()
|
||||
resp.release_conn()
|
||||
|
||||
while True:
|
||||
resp = func(*args, **kwargs)
|
||||
try:
|
||||
for line in iter_resp_lines(resp):
|
||||
yield self.unmarshal_event(line, return_type)
|
||||
if self._stop:
|
||||
break
|
||||
finally:
|
||||
resp.close()
|
||||
resp.release_conn()
|
||||
|
||||
if not keep or self._stop:
|
||||
break
|
||||
|
||||
@ -85,6 +85,36 @@ class WatchTests(unittest.TestCase):
|
||||
fake_resp.close.assert_called_once()
|
||||
fake_resp.release_conn.assert_called_once()
|
||||
|
||||
def test_watch_stream_keep(self):
|
||||
w = Watch(float)
|
||||
|
||||
fake_resp = Mock()
|
||||
fake_resp.close = Mock()
|
||||
fake_resp.release_conn = Mock()
|
||||
fake_resp.read_chunked = Mock(
|
||||
return_value=['{"type": "ADDED", "object": 1}\n'])
|
||||
|
||||
fake_api = Mock()
|
||||
fake_api.get_namespaces = Mock(return_value=fake_resp)
|
||||
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
|
||||
|
||||
count = 0
|
||||
for e in w.stream(fake_api.get_namespaces):
|
||||
count = count + 1
|
||||
|
||||
self.assertEqual(count, 1)
|
||||
|
||||
for e in w.stream(fake_api.get_namespaces, True):
|
||||
count = count + 1
|
||||
if count == 2:
|
||||
w.stop()
|
||||
|
||||
self.assertEqual(count, 2)
|
||||
self.assertEqual(fake_api.get_namespaces.call_count, 2)
|
||||
self.assertEqual(fake_resp.read_chunked.call_count, 2)
|
||||
self.assertEqual(fake_resp.close.call_count, 2)
|
||||
self.assertEqual(fake_resp.release_conn.call_count, 2)
|
||||
|
||||
def test_unmarshal_with_float_object(self):
|
||||
w = Watch()
|
||||
event = w.unmarshal_event('{"type": "ADDED", "object": 1}', 'float')
|
||||
|
||||
Loading…
Reference in New Issue
Block a user