Merge pull request #2406 from Kevinz857/feat-deserialize-control-v2
Some checks failed
End to End Tests - master / build (3.10) (push) Has been cancelled
End to End Tests - master / build (3.8) (push) Has been cancelled
End to End Tests - master / build (3.9) (push) Has been cancelled
Kubernetes Python Client - Validation / build (3.10) (push) Has been cancelled
Kubernetes Python Client - Validation / build (3.11) (push) Has been cancelled
Kubernetes Python Client - Validation / build (3.12) (push) Has been cancelled
Kubernetes Python Client - Validation / build (3.8) (push) Has been cancelled
Kubernetes Python Client - Validation / build (3.9, coverage) (push) Has been cancelled

feat: Add option to control deserialization when watching events
This commit is contained in:
Kubernetes Prow Robot 2025-06-24 13:18:27 -07:00 committed by GitHub
commit c330b84e5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 45 additions and 1 deletions

View File

@ -179,6 +179,7 @@ class Watch(object):
# We want to ensure we are returning within that timeout.
disable_retries = ('timeout_seconds' in kwargs)
retry_after_410 = False
deserialize = kwargs.pop('deserialize', True)
while True:
resp = func(*args, **kwargs)
try:
@ -186,7 +187,11 @@ class Watch(object):
# unmarshal when we are receiving events from watch,
# return raw string when we are streaming log
if watch_arg == "watch":
event = self.unmarshal_event(line, return_type)
if deserialize:
event = self.unmarshal_event(line, return_type)
else:
# Only do basic JSON parsing, no deserialize
event = json.loads(line)
if isinstance(event, dict) \
and event['type'] == 'ERROR':
obj = event['raw_object']

View File

@ -576,5 +576,44 @@ class WatchTests(unittest.TestCase):
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)
if __name__ == '__main__':
def test_watch_with_deserialize_param(self):
"""test watch.stream() deserialize param"""
# prepare test data
test_json = '{"type": "ADDED", "object": {"metadata": {"name": "test1", "resourceVersion": "1"}, "spec": {}, "status": {}}}'
fake_resp = Mock()
fake_resp.close = Mock()
fake_resp.release_conn = Mock()
fake_resp.stream = Mock(return_value=[test_json + '\n'])
fake_api = Mock()
fake_api.get_namespaces = Mock(return_value=fake_resp)
fake_api.get_namespaces.__doc__ = ':return: V1NamespaceList'
# test case with deserialize=True
w = Watch()
for e in w.stream(fake_api.get_namespaces, deserialize=True):
self.assertEqual("ADDED", e['type'])
# Verify that the object is deserialized correctly
self.assertTrue(hasattr(e['object'], 'metadata'))
self.assertEqual("test1", e['object'].metadata.name)
self.assertEqual("1", e['object'].metadata.resource_version)
# Verify that the original object is saved
self.assertEqual(json.loads(test_json)['object'], e['raw_object'])
# test case with deserialize=False
w = Watch()
for e in w.stream(fake_api.get_namespaces, deserialize=False):
self.assertEqual("ADDED", e['type'])
# The validation object remains in the original dictionary format
self.assertIsInstance(e['object'], dict)
self.assertEqual("test1", e['object']['metadata']['name'])
self.assertEqual("1", e['object']['metadata']['resourceVersion'])
# verify the api is called twice
fake_api.get_namespaces.assert_has_calls([
call(_preload_content=False, watch=True),
call(_preload_content=False, watch=True)
])
if __name__ == '__main__':
unittest.main()