Add test to ensure kubernetes client threadpool is cleaned up

This commit is contained in:
Fabian von Feilitzsch 2020-02-06 12:37:16 -05:00 committed by Nabarun Pal
parent 9f880d8396
commit b4a3fc235e
No known key found for this signature in database
GPG Key ID: 611D5079D826B150

View File

@ -0,0 +1,25 @@
# coding: utf-8
import atexit
import weakref
import unittest
import kubernetes
class TestApiClient(unittest.TestCase):
def test_context_manager_closes_threadpool(self):
with kubernetes.client.ApiClient() as client:
self.assertIsNotNone(client.pool)
pool_ref = weakref.ref(client._pool)
self.assertIsNotNone(pool_ref())
self.assertIsNone(pool_ref())
def test_atexit_closes_threadpool(self):
client = kubernetes.client.ApiClient()
self.assertIsNotNone(client.pool)
self.assertIsNotNone(client._pool)
atexit._run_exitfuncs()
self.assertIsNone(client._pool)