Fixes kubernetes-client/python issue 1047 "ResponseNotChunked from watch"

In recent versions of K8S (>1.16?), when a `Watch.stream()` call uses a
resource_version which is too old the resulting 410 error is wrapped in JSON
and returned in a non-chunked 200 response. Using `resp.stream()` instead of
`resp.read_chunked()` automatically handles the response being either chunked or
non-chunked.
This commit is contained in:
Darren Hague 2021-04-08 13:49:46 +01:00
parent fb425a3bec
commit 90399663f3
2 changed files with 26 additions and 20 deletions

View File

@ -53,7 +53,7 @@ def _find_return_type(func):
def iter_resp_lines(resp):
prev = ""
for seg in resp.read_chunked(decode_content=False):
for seg in resp.stream(amt=None, decode_content=False):
if isinstance(seg, bytes):
seg = seg.decode('utf8')
seg = prev + seg

View File

@ -30,7 +30,7 @@ class WatchTests(unittest.TestCase):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ADDED", "object": {"metadata": {"name": "test1",'
'"resourceVersion": "1"}, "spec": {}, "status": {}}}\n',
@ -63,7 +63,8 @@ class WatchTests(unittest.TestCase):
fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
@ -71,7 +72,7 @@ class WatchTests(unittest.TestCase):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'log_line_1\n',
'log_line_2\n'])
@ -92,7 +93,8 @@ class WatchTests(unittest.TestCase):
fake_api.read_namespaced_pod_log.assert_called_once_with(
_preload_content=False, follow=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
@ -112,6 +114,7 @@ class WatchTests(unittest.TestCase):
'{"type": "ADDED", "object": {"metadata": {"name": "test3",'
'"resourceVersion": "3"}, "spec": {}, "status": {}}}\n'
]
# return nothing on the first call and values on the second
# this emulates a watch from a rv that returns nothing in the first k8s
# watch reset and values later
@ -123,7 +126,7 @@ class WatchTests(unittest.TestCase):
else:
return values
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
side_effect=get_values)
fake_api = Mock()
@ -170,7 +173,7 @@ class WatchTests(unittest.TestCase):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=['{"type": "ADDED", "object": 1}\n'] * 4)
fake_api = Mock()
@ -186,8 +189,8 @@ class WatchTests(unittest.TestCase):
self.assertEqual(count, 3)
fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(
decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
@ -197,7 +200,7 @@ class WatchTests(unittest.TestCase):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=['{"type": "ADDED", "object": 1}\n'])
fake_api = Mock()
@ -219,7 +222,7 @@ class WatchTests(unittest.TestCase):
self.assertEqual(count, 2)
self.assertEqual(fake_api.get_namespaces.call_count, 2)
self.assertEqual(fake_resp.read_chunked.call_count, 2)
self.assertEqual(fake_resp.stream.call_count, 2)
self.assertEqual(fake_resp.close.call_count, 2)
self.assertEqual(fake_resp.release_conn.call_count, 2)
@ -256,7 +259,7 @@ class WatchTests(unittest.TestCase):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(side_effect=KeyError('expected'))
fake_resp.stream = Mock(side_effect=KeyError('expected'))
fake_api = Mock()
fake_api.get_thing = Mock(return_value=fake_resp)
@ -271,7 +274,8 @@ class WatchTests(unittest.TestCase):
fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
@ -279,7 +283,7 @@ class WatchTests(unittest.TestCase):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
@ -294,7 +298,8 @@ class WatchTests(unittest.TestCase):
fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
@ -302,7 +307,7 @@ class WatchTests(unittest.TestCase):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
@ -320,8 +325,8 @@ class WatchTests(unittest.TestCase):
# Two calls should be expected during a retry
fake_api.get_thing.assert_has_calls(
[call(resource_version=0, _preload_content=False, watch=True)] * 2)
fake_resp.read_chunked.assert_has_calls(
[call(decode_content=False)] * 2)
fake_resp.stream.assert_has_calls(
[call(amt=None, decode_content=False)] * 2)
assert fake_resp.close.call_count == 2
assert fake_resp.release_conn.call_count == 2
@ -329,7 +334,7 @@ class WatchTests(unittest.TestCase):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.read_chunked = Mock(
fake_resp.stream = Mock(
return_value=[
'{"type": "ERROR", "object": {"code": 410, '
'"reason": "Gone", "message": "error message"}}\n'])
@ -346,7 +351,8 @@ class WatchTests(unittest.TestCase):
fake_api.get_thing.assert_called_once_with(
_preload_content=False, watch=True, timeout_seconds=10)
fake_resp.read_chunked.assert_called_once_with(decode_content=False)
fake_resp.stream.assert_called_once_with(
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()