From 3cee537e633f71bec43f25fbc09f54c9be3549e6 Mon Sep 17 00:00:00 2001 From: Raj Bhargav <72274012+p172913@users.noreply.github.com> Date: Tue, 18 Mar 2025 13:46:41 +0530 Subject: [PATCH] Update watch_test.py Added a unit test name test_pod_log_empty_lines to check if watch is printing empty lines. And made changes in test_watch_with_interspersed_newlines as the watch is also printing empty line. Added a condition to check for empty lines in ogs to avoid the errors. --- kubernetes/base/watch/watch_test.py | 86 ++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/kubernetes/base/watch/watch_test.py b/kubernetes/base/watch/watch_test.py index c5bc5c378..fa079363c 100644 --- a/kubernetes/base/watch/watch_test.py +++ b/kubernetes/base/watch/watch_test.py @@ -14,12 +14,18 @@ import unittest +import os + +import time + from unittest.mock import Mock, call -from kubernetes import client +from kubernetes import client,config from .watch import Watch +from kubernetes.client import ApiException + class WatchTests(unittest.TestCase): def setUp(self): @@ -99,6 +105,9 @@ class WatchTests(unittest.TestCase): # Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is # the only way to do so. Without that, the stream will re-read the test data forever. for e in w.stream(fake_api.get_namespaces, timeout_seconds=1): + # Here added a statement for exception for empty lines. + if e is None: + continue count += 1 self.assertEqual("test%d" % count, e['object'].metadata.name) self.assertEqual(3, count) @@ -488,7 +497,82 @@ class WatchTests(unittest.TestCase): amt=None, decode_content=False) fake_resp.close.assert_called_once() fake_resp.release_conn.assert_called_once() + + @classmethod + def setUpClass(cls): + cls.api = Mock() + cls.namespace = "default" + def test_pod_log_empty_lines(self): + pod_name = "demo-bug" + # Manifest with busybax to keep pod engaged for sometiem + pod_manifest = { + "apiVersion": "v1", + "kind": "Pod", + "metadata": {"name": pod_name}, + "spec": { + "containers": [{ + "image": "busybox", + "name": "my-container", + "command": ["sh", "-c", "while true; do echo Hello from Docker ; sleep 10; done"] + }] + }, + } + + try: + self.api.create_namespaced_pod = Mock() + self.api.read_namespaced_pod = Mock() + self.api.delete_namespaced_pod = Mock() + self.api.read_namespaced_pod_log = Mock() + + #pod creating step + self.api.create_namespaced_pod.return_value = None + + #Checking pod status + mock_pod = Mock() + mock_pod.status.phase = "Running" + self.api.read_namespaced_pod.return_value = mock_pod + + # Printing at pod output + self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"]) + + # Wait for the pod to reach 'Running' + timeout = 60 + start_time = time.time() + while time.time() - start_time < timeout: + pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace) + if pod.status.phase == "Running": + break + time.sleep(2) + else: + self.fail("Pod did not reach 'Running' state within timeout") + + # Reading and streaming logs using Watch (mocked) + w = Watch() + log_output = [] + #Mock logs used for this test + w.stream = Mock(return_value=[ + "Hello from Docker", + "\n", # Empty line + "Another log line", + "\n", # Another empty line + "Final log" + ]) + for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True): + log_output.append(event) + print(event) + + # Print outputs + print(f"Captured logs: {log_output}") + # self.assertTrue(any("Hello from Docker" in line for line in log_output)) + self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs") + + except ApiException as e: + self.fail(f"Kubernetes API exception: {e}") + finally: + #checking pod is calling for delete + self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace) + self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace) if __name__ == '__main__': unittest.main()