Update watch_test.py

Added a unit test name test_pod_log_empty_lines to check if watch is printing empty lines. And made changes in test_watch_with_interspersed_newlines as the watch is also printing empty line. Added a condition to check for empty lines in ogs to avoid the errors.
This commit is contained in:
Raj Bhargav 2025-03-18 13:46:41 +05:30 committed by GitHub
parent b2f975537c
commit 3cee537e63
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -14,12 +14,18 @@
import unittest
import os
import time
from unittest.mock import Mock, call
from kubernetes import client
from kubernetes import client,config
from .watch import Watch
from kubernetes.client import ApiException
class WatchTests(unittest.TestCase):
def setUp(self):
@ -99,6 +105,9 @@ class WatchTests(unittest.TestCase):
# Note that "timeout_seconds" below is not a timeout; rather, it disables retries and is
# the only way to do so. Without that, the stream will re-read the test data forever.
for e in w.stream(fake_api.get_namespaces, timeout_seconds=1):
# Here added a statement for exception for empty lines.
if e is None:
continue
count += 1
self.assertEqual("test%d" % count, e['object'].metadata.name)
self.assertEqual(3, count)
@ -488,7 +497,82 @@ class WatchTests(unittest.TestCase):
amt=None, decode_content=False)
fake_resp.close.assert_called_once()
fake_resp.release_conn.assert_called_once()
@classmethod
def setUpClass(cls):
cls.api = Mock()
cls.namespace = "default"
def test_pod_log_empty_lines(self):
pod_name = "demo-bug"
# Manifest with busybax to keep pod engaged for sometiem
pod_manifest = {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {"name": pod_name},
"spec": {
"containers": [{
"image": "busybox",
"name": "my-container",
"command": ["sh", "-c", "while true; do echo Hello from Docker ; sleep 10; done"]
}]
},
}
try:
self.api.create_namespaced_pod = Mock()
self.api.read_namespaced_pod = Mock()
self.api.delete_namespaced_pod = Mock()
self.api.read_namespaced_pod_log = Mock()
#pod creating step
self.api.create_namespaced_pod.return_value = None
#Checking pod status
mock_pod = Mock()
mock_pod.status.phase = "Running"
self.api.read_namespaced_pod.return_value = mock_pod
# Printing at pod output
self.api.read_namespaced_pod_log.return_value = iter(["Hello from Docker\n"])
# Wait for the pod to reach 'Running'
timeout = 60
start_time = time.time()
while time.time() - start_time < timeout:
pod = self.api.read_namespaced_pod(name=pod_name, namespace=self.namespace)
if pod.status.phase == "Running":
break
time.sleep(2)
else:
self.fail("Pod did not reach 'Running' state within timeout")
# Reading and streaming logs using Watch (mocked)
w = Watch()
log_output = []
#Mock logs used for this test
w.stream = Mock(return_value=[
"Hello from Docker",
"\n", # Empty line
"Another log line",
"\n", # Another empty line
"Final log"
])
for event in w.stream(self.api.read_namespaced_pod_log, name=pod_name, namespace=self.namespace, follow=True):
log_output.append(event)
print(event)
# Print outputs
print(f"Captured logs: {log_output}")
# self.assertTrue(any("Hello from Docker" in line for line in log_output))
self.assertTrue(any(line.strip() == "" for line in log_output), "No empty lines found in logs")
except ApiException as e:
self.fail(f"Kubernetes API exception: {e}")
finally:
#checking pod is calling for delete
self.api.delete_namespaced_pod(name=pod_name, namespace=self.namespace)
self.api.delete_namespaced_pod.assert_called_once_with(name=pod_name, namespace=self.namespace)
if __name__ == '__main__':
unittest.main()