Handled UTF-8 edge cases in Watch

This commit is contained in:
David E 2023-08-07 12:20:31 -07:00 committed by yliao
parent ac60e8ac6b
commit 061c257701
2 changed files with 144 additions and 11 deletions

View File

@ -52,20 +52,33 @@ def _find_return_type(func):
def iter_resp_lines(resp):
prev = ""
for seg in resp.stream(amt=None, decode_content=False):
if isinstance(seg, bytes):
seg = seg.decode('utf8')
seg = prev + seg
lines = seg.split("\n")
if not seg.endswith("\n"):
prev = lines[-1]
lines = lines[:-1]
buffer = bytearray()
for segment in resp.stream(amt=None, decode_content=False):
# Append the segment (chunk) to the buffer
#
# Performance note: depending on contents of buffer and the type+value of segment,
# encoding segment into the buffer could be a wasteful step. The approach used here
# simplifies the logic farther down, but in the future it may be reasonable to
# sacrifice readability for performance.
if isinstance(segment, bytes):
buffer.extend(segment)
elif isinstance(segment, str):
buffer.extend(segment.encode("utf-8"))
else:
prev = ""
for line in lines:
raise TypeError(
f"Received invalid segment type, {type(segment)}, from stream. Accepts only 'str' or 'bytes'.")
# Split by newline (safe for utf-8 because multi-byte sequences cannot contain the newline byte)
next_newline = buffer.find(b'\n')
while next_newline != -1:
# Convert bytes to a valid utf-8 string, replacing any invalid utf-8 with the '<27>' character
line = buffer[:next_newline].decode(
"utf-8", errors="replace")
buffer = buffer[next_newline+1:]
if line:
yield line
next_newline = buffer.find(b'\n')
class Watch(object):

View File

@ -61,6 +61,9 @@ class WatchTests(unittest.TestCase):
if count == 4:
w.stop()
# make sure that all three records were consumed by the stream
self.assertEqual(4, count)
fake_api.get_namespaces.assert_called_once_with(
_preload_content=False, watch=True)
fake_resp.stream.assert_called_once_with(
@ -68,6 +71,123 @@ class WatchTests(unittest.TestCase):
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
def test_watch_with_interspersed_newlines(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.stream = Mock(
return_value=[
'\n',
'{"type": "ADDED", "object": {"metadata":',
'{"name": "test1","resourceVersion": "1"}}}\n{"type": "ADDED", ',
'"object": {"metadata": {"name": "test2", "resourceVersion": "2"}}}\n',
'\n',
'',
'{"type": "ADDED", "object": {"metadata": {"name": "test3", "resourceVersion": "3"}}}\n',
'\n\n\n',
'\n',
])
fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
w = Watch()
count = 0
# Consume all test events from the mock service, stopping when no more data is available.
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
# the only way to do so. Without that, the stream will re-read the test data forever.
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
count += 1
self.assertEqual("test%d" % count, e['object'].metadata.name)
self.assertEqual(3, count)
def test_watch_with_multibyte_utf8(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.stream = Mock(
return_value=[
# two-byte utf-8 character
'{"type":"MODIFIED","object":{"data":{"utf-8":"© 1"},"metadata":{"name":"test1","resourceVersion":"1"}}}\n',
# same copyright character expressed as bytes
b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2\xA9 2"},"metadata":{"name":"test2","resourceVersion":"2"}}}\n'
# same copyright character with bytes split across two stream chunks
b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xC2',
b'\xA9 3"},"metadata":{"n',
# more chunks of the same event, sent as a mix of bytes and strings
'ame":"test3","resourceVersion":"3"',
'}}}',
b'\n'
])
fake_api = Mock()
fake_api.get_configmaps = Mock(return_value=fake_resp)
fake_api.get_configmaps.__doc__ = ':return: V1ConfigMapList'
w = Watch()
count = 0
# Consume all test events from the mock service, stopping when no more data is available.
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
# the only way to do so. Without that, the stream will re-read the test data forever.
for event in w.stream(fake_api.get_configmaps, timeout_seconds=1):
count += 1
self.assertEqual("MODIFIED", event['type'])
self.assertEqual("test%d" % count, event['object'].metadata.name)
self.assertEqual("© %d" % count, event['object'].data["utf-8"])
self.assertEqual(
"%d" % count, event['object'].metadata.resource_version)
self.assertEqual("%d" % count, w.resource_version)
self.assertEqual(3, count)
def test_watch_with_invalid_utf8(self):
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.stream = Mock(
# test 1 uses 1 invalid utf-8 byte
# test 2 uses a sequence of 2 invalid utf-8 bytes
# test 3 uses a sequence of 3 invalid utf-8 bytes
return_value=[
# utf-8 sequence for 😄 is \xF0\x9F\x98\x84
# all other sequences below are invalid
# ref: https://www.w3.org/2001/06/utf-8-wrong/UTF-8-test.html
b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98\x84 1","invalid":"\x80 1"},"metadata":{"name":"test1"}}}\n',
b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98\x84 2","invalid":"\xC0\xAF 2"},"metadata":{"name":"test2"}}}\n',
# mix bytes/strings and split byte sequences across chunks
b'{"type":"MODIFIED","object":{"data":{"utf-8":"\xF0\x9F\x98',
b'\x84 ',
b'',
b'3","invalid":"\xE0\x80',
b'\xAF ',
'3"},"metadata":{"n',
'ame":"test3"',
'}}}',
b'\n'
])
fake_api = Mock()
fake_api.get_configmaps = Mock(return_value=fake_resp)
fake_api.get_configmaps.__doc__ = ':return: V1ConfigMapList'
w = Watch()
count = 0
# Consume all test events from the mock service, stopping when no more data is available.
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
# the only way to do so. Without that, the stream will re-read the test data forever.
for event in w.stream(fake_api.get_configmaps, timeout_seconds=1):
count += 1
self.assertEqual("MODIFIED", event['type'])
self.assertEqual("test%d" % count, event['object'].metadata.name)
self.assertEqual("😄 %d" % count, event['object'].data["utf-8"])
# expect N replacement characters in test N
self.assertEqual("<EFBFBD> %d".replace('<EFBFBD>', '<EFBFBD>'*count) %
count, event['object'].data["invalid"])
self.assertEqual(3, count)
def test_watch_for_follow(self):
fake_resp = Mock()
fake_resp.close = Mock()