commit
ded3d12e04
@ -12,6 +12,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import http
|
||||
import json
|
||||
import pydoc
|
||||
|
||||
@ -86,7 +87,7 @@ class Watch(object):
|
||||
def unmarshal_event(self, data, return_type):
|
||||
js = json.loads(data)
|
||||
js['raw_object'] = js['object']
|
||||
if return_type:
|
||||
if return_type and js['type'] != 'ERROR':
|
||||
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
|
||||
js['object'] = self._api_client.deserialize(obj, return_type)
|
||||
if hasattr(js['object'], 'metadata'):
|
||||
@ -102,6 +103,14 @@ class Watch(object):
|
||||
def stream(self, func, *args, **kwargs):
|
||||
"""Watch an API resource and stream the result back via a generator.
|
||||
|
||||
Note that watching an API resource can expire. The method tries to
|
||||
resume automatically once from the last result, but if that last result
|
||||
is too old as well, an `ApiException` exception will be thrown with
|
||||
``code`` 410. In that case you have to recover yourself, probably
|
||||
by listing the API resource to obtain the latest state and then
|
||||
watching from that state on by setting ``resource_version`` to
|
||||
one returned from listing.
|
||||
|
||||
:param func: The API function pointer. Any parameter to the function
|
||||
can be passed after this parameter.
|
||||
|
||||
@ -134,6 +143,7 @@ class Watch(object):
|
||||
self.resource_version = kwargs['resource_version']
|
||||
|
||||
timeouts = ('timeout_seconds' in kwargs)
|
||||
retry_after_410 = False
|
||||
while True:
|
||||
resp = func(*args, **kwargs)
|
||||
try:
|
||||
@ -141,7 +151,23 @@ class Watch(object):
|
||||
# unmarshal when we are receiving events from watch,
|
||||
# return raw string when we are streaming log
|
||||
if watch_arg == "watch":
|
||||
yield self.unmarshal_event(line, return_type)
|
||||
event = self.unmarshal_event(line, return_type)
|
||||
if isinstance(event, dict) \
|
||||
and event['type'] == 'ERROR':
|
||||
obj = event['raw_object']
|
||||
# Current request expired, let's retry,
|
||||
# but only if we have not already retried.
|
||||
if not retry_after_410 and \
|
||||
obj['code'] == http.HTTPStatus.GONE:
|
||||
retry_after_410 = True
|
||||
break
|
||||
else:
|
||||
reason = "%s: %s" % (obj['reason'], obj['message'])
|
||||
raise client.rest.ApiException(status=obj['code'],
|
||||
reason=reason)
|
||||
else:
|
||||
retry_after_410 = False
|
||||
yield event
|
||||
else:
|
||||
yield line
|
||||
if self._stop:
|
||||
|
||||
@ -16,6 +16,8 @@ import unittest
|
||||
|
||||
from mock import Mock, call
|
||||
|
||||
from kubernetes import client
|
||||
|
||||
from .watch import Watch
|
||||
|
||||
|
||||
@ -273,6 +275,31 @@ class WatchTests(unittest.TestCase):
|
||||
fake_resp.close.assert_called_once()
|
||||
fake_resp.release_conn.assert_called_once()
|
||||
|
||||
def test_watch_with_error_event(self):
|
||||
fake_resp = Mock()
|
||||
fake_resp.close = Mock()
|
||||
fake_resp.release_conn = Mock()
|
||||
fake_resp.read_chunked = Mock(
|
||||
return_value=[
|
||||
'{"type": "ERROR", "object": {"code": 410, '
|
||||
'"reason": "Gone", "message": "error message"}}\n'])
|
||||
|
||||
fake_api = Mock()
|
||||
fake_api.get_thing = Mock(return_value=fake_resp)
|
||||
|
||||
w = Watch()
|
||||
try:
|
||||
for _ in w.stream(fake_api.get_thing):
|
||||
self.fail(self, "Should fail with ApiException.")
|
||||
except client.rest.ApiException:
|
||||
pass
|
||||
|
||||
fake_api.get_thing.assert_called_once_with(
|
||||
_preload_content=False, watch=True)
|
||||
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
|
||||
fake_resp.close.assert_called_once()
|
||||
fake_resp.release_conn.assert_called_once()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user