Merge pull request #2372 from p172913/master

Changes made in wacth.py to print Empty newlines that are skipped when watching pod logs.
This commit is contained in:
Kubernetes Prow Robot 2025-03-21 10:56:32 -07:00 committed by GitHub
commit bd323606b9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 116 additions and 20 deletions

View File

@ -78,6 +78,8 @@ def iter_resp_lines(resp):
buffer = buffer[next_newline+1:]
if line:
yield line
else:
yield '' # Only print one empty line
next_newline = buffer.find(b'\n')
@ -107,24 +109,29 @@ class Watch(object):
return 'watch'
def unmarshal_event(self, data, return_type):
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
return js
if not data or data.isspace():
return None
try:
js = json.loads(data)
js['raw_object'] = js['object']
# BOOKMARK event is treated the same as ERROR for a quick fix of
# decoding exception
# TODO: make use of the resource_version in BOOKMARK event for more
# efficient WATCH
if return_type and js['type'] != 'ERROR' and js['type'] != 'BOOKMARK':
obj = SimpleNamespace(data=json.dumps(js['raw_object']))
js['object'] = self._api_client.deserialize(obj, return_type)
if hasattr(js['object'], 'metadata'):
self.resource_version = js['object'].metadata.resource_version
# For custom objects that we don't have model defined, json
# deserialization results in dictionary
elif (isinstance(js['object'], dict) and 'metadata' in js['object']
and 'resourceVersion' in js['object']['metadata']):
self.resource_version = js['object']['metadata'][
'resourceVersion']
return js
except json.JSONDecodeError:
return None
def stream(self, func, *args, **kwargs):
"""Watch an API resource and stream the result back via a generator.
@ -198,7 +205,10 @@ class Watch(object):
retry_after_410 = False
yield event
else:
yield line
if line:
yield line # Normal non-empty line
else:
yield '' # Only yield one empty line
if self._stop:
break
finally:

View File

@ -14,12 +14,18 @@
import unittest
import os
import time
from unittest.mock import Mock, call
from kubernetes import client
from kubernetes import client,config
from .watch import Watch
from kubernetes.client import ApiException
class WatchTests(unittest.TestCase):
def setUp(self):
@ -99,6 +105,9 @@ class WatchTests(unittest.TestCase):
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
# the only way to do so. Without that, the stream will re-read the test data forever.
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
# Here added a statement for exception for empty lines.
if e is None:
continue
count += 1
self.assertEqual("test%d" % count, e['object'].metadata.name)
self.assertEqual(3, count)
@ -488,7 +497,84 @@ class WatchTests(unittest.TestCase):
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
@classmethod
def setUpClass(cls):
cls.api = Mock()
cls.namespace = "default"
def test_pod_log_empty_lines(self):
pod_name = "demo-bug"
try:
self.api.create_namespaced_pod = Mock()
self.api.read_namespaced_pod = Mock()
self.api.delete_namespaced_pod = Mock()
self.api.read_namespaced_pod_log = Mock()
#pod creating step
self.api.create_namespaced_pod.return_value = None
#Checking pod status
mock_pod = Mock()
mock_pod.status.phase = "Running"
self.api.read_namespaced_pod.return_value = mock_pod
# Printing at pod output
self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"])
# Wait for the pod to reach 'Running'
timeout = 60
start_time = time.time()
while time.time() - start_time < timeout:
pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace)
if pod.status.phase == "Running":
break
time.sleep(2)
else:
self.fail("Pod did not reach 'Running' state within timeout")
# Reading and streaming logs using Watch (mocked)
w = Watch()
log_output = []
#Mock logs used for this test
w.stream = Mock(return_value=[
"Hello from Docker",
"",
"",
"\n\n",
"Another log line",
"",
"\n",
"Final log"
])
for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True):
log_output.append(event)
print(event)
# Print outputs
print(f"Captured logs: {log_output}")
# self.assertTrue(any("Hello from Docker" in line for line in log_output))
# self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs")
expected_log = [
"Hello from Docker",
"",
"",
"\n\n",
"Another log line",
"",
"\n",
"Final log"
]
self.assertEqual(log_output, expected_log, "Captured logs do not match expected logs")
except ApiException as e:
self.fail(f"Kubernetes API exception: {e}")
finally:
#checking pod is calling for delete
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)
if __name__ == '__main__':
unittest.main()