remove required idp-certificate-authority-data in kubeconfig for oidc toke refresh, kubernetes-client/python#493

fix pep8 style
add unit test
This commit is contained in:
mvle 2018-05-23 21:35:32 +00:00
parent 5784a38708
commit 980f9b1042
2 changed files with 70 additions and 16 deletions

View File

@ -255,22 +255,27 @@ class KubeConfigLoader(object):
return self.token
def _refresh_oidc(self, provider):
ca_cert = tempfile.NamedTemporaryFile(delete=True)
if PY3:
cert = base64.b64decode(
provider['config']['idp-certificate-authority-data']
).decode('utf-8')
else:
cert = base64.b64decode(
provider['config']['idp-certificate-authority-data'] + "=="
)
with open(ca_cert.name, 'w') as fh:
fh.write(cert)
config = Configuration()
config.ssl_ca_cert = ca_cert.name
if 'idp-certificate-authority-data' in provider['config']:
ca_cert = tempfile.NamedTemporaryFile(delete=True)
if PY3:
cert = base64.b64decode(
provider['config']['idp-certificate-authority-data']
).decode('utf-8')
else:
cert = base64.b64decode(
provider['config']['idp-certificate-authority-data'] + "=="
)
with open(ca_cert.name, 'w') as fh:
fh.write(cert)
config.ssl_ca_cert = ca_cert.name
else:
config.verify_ssl = False
client = ApiClient(configuration=config)
@ -301,7 +306,7 @@ class KubeConfigLoader(object):
refresh_token=provider['config']['refresh-token'],
auth=(provider['config']['client-id'],
provider['config']['client-secret']),
verify=ca_cert.name
verify=config.ssl_ca_cert if config.verify_ssl else None
)
except oauthlib.oauth2.rfc6749.errors.InvalidClientIdError:
return

View File

@ -373,6 +373,13 @@ class TestKubeConfigLoader(BaseTestCase):
"user": "expired_oidc"
}
},
{
"name": "expired_oidc_nocert",
"context": {
"cluster": "default",
"user": "expired_oidc_nocert"
}
},
{
"name": "user_pass",
"context": {
@ -519,6 +526,22 @@ class TestKubeConfigLoader(BaseTestCase):
}
}
},
{
"name": "expired_oidc_nocert",
"user": {
"auth-provider": {
"name": "oidc",
"config": {
"client-id": "tectonic-kubectl",
"client-secret": "FAKE_SECRET",
"id-token": TEST_OIDC_EXPIRED_LOGIN,
"idp-issuer-url": "https://example.org/identity",
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
},
{
"name": "user_pass",
"user": {
@ -649,6 +672,32 @@ class TestKubeConfigLoader(BaseTestCase):
self.assertTrue(loader._load_oid_token())
self.assertEqual("Bearer abc123", loader.token)
@mock.patch('kubernetes.config.kube_config.OAuth2Session.refresh_token')
@mock.patch('kubernetes.config.kube_config.ApiClient.request')
def test_oidc_with_refresh_nocert(
self, mock_ApiClient, mock_OAuth2Session):
mock_response = mock.MagicMock()
type(mock_response).status = mock.PropertyMock(
return_value=200
)
type(mock_response).data = mock.PropertyMock(
return_value=json.dumps({
"token_endpoint": "https://example.org/identity/token"
})
)
mock_ApiClient.return_value = mock_response
mock_OAuth2Session.return_value = {"id_token": "abc123",
"refresh_token": "newtoken123"}
loader = KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_oidc_nocert",
)
self.assertTrue(loader._load_oid_token())
self.assertEqual("Bearer abc123", loader.token)
def test_user_pass(self):
expected = FakeConfig(host=TEST_HOST, token=TEST_BASIC_TOKEN)
actual = FakeConfig()