Accept client certificates from an authn/authz plugin

(Plugin interface reference: https://kubernetes.io/docs/reference/access-authn-authz/authentication/#input-and-output-formats)

When handling the response from the authn/authz plugin, `token` will be used if provided, which maintains current behaviour. Newly added is handling `clientCertificateData`: if it is present, that certificate (and its key) will be used as provided by the plugin. (And any certificate/key pair provided via the `users` section of the configuration file will be ignored.)
This commit is contained in:
Graham Reed 2020-05-29 17:09:38 +01:00
parent fb86b8acb6
commit b85aff2b3e
2 changed files with 70 additions and 11 deletions

View File

@ -472,11 +472,31 @@ class KubeConfigLoader(object):
return
try:
status = ExecProvider(self._user['exec']).run()
if 'token' not in status:
logging.error('exec: missing token field in plugin output')
return None
self.token = "Bearer %s" % status['token']
return True
if 'token' in status:
self.token = "Bearer %s" % status['token']
return True
if 'clientCertificateData' in status:
# https://kubernetes.io/docs/reference/access-authn-authz/authentication/#input-and-output-formats
# Plugin has provided certificates instead of a token.
if 'clientKeyData' not in status:
logging.error('exec: missing clientKeyData field in '
'plugin output')
return None
base_path = self._get_base_path(self._cluster.path)
self.cert_file = FileOrData(
status, None,
data_key_name='clientCertificateData',
file_base_path=base_path,
base64_file_content=False).as_file()
self.key_file = FileOrData(
status, None,
data_key_name='clientKeyData',
file_base_path=base_path,
base64_file_content=False).as_file()
return True
logging.error('exec: missing token or clientCertificateData field '
'in plugin output')
return None
except Exception as e:
logging.error(str(e))
@ -512,12 +532,16 @@ class KubeConfigLoader(object):
self.ssl_ca_cert = FileOrData(
self._cluster, 'certificate-authority',
file_base_path=base_path).as_file()
self.cert_file = FileOrData(
self._user, 'client-certificate',
file_base_path=base_path).as_file()
self.key_file = FileOrData(
self._user, 'client-key',
file_base_path=base_path).as_file()
if 'cert_file' not in self.__dict__:
# cert_file could have been provided by
# _load_from_exec_plugin; only load from the _user
# section if we need it.
self.cert_file = FileOrData(
self._user, 'client-certificate',
file_base_path=base_path).as_file()
self.key_file = FileOrData(
self._user, 'client-key',
file_base_path=base_path).as_file()
if 'insecure-skip-tls-verify' in self._cluster:
self.verify_ssl = not self._cluster['insecure-skip-tls-verify']

View File

@ -541,6 +541,13 @@ class TestKubeConfigLoader(BaseTestCase):
"user": "exec_cred_user"
}
},
{
"name": "exec_cred_user_certificate",
"context": {
"cluster": "ssl",
"user": "exec_cred_user_certificate"
}
},
{
"name": "contexttestcmdpath",
"context": {
@ -865,6 +872,16 @@ class TestKubeConfigLoader(BaseTestCase):
}
}
},
{
"name": "exec_cred_user_certificate",
"user": {
"exec": {
"apiVersion": "client.authentication.k8s.io/v1beta1",
"command": "custom-certificate-authenticator",
"args": []
}
}
},
{
"name": "usertestcmdpath",
"user": {
@ -1295,6 +1312,24 @@ class TestKubeConfigLoader(BaseTestCase):
active_context="exec_cred_user").load_and_set(actual)
self.assertEqual(expected, actual)
@mock.patch('kubernetes.config.kube_config.ExecProvider.run')
def test_user_exec_auth_certificates(self, mock):
mock.return_value = {
"clientCertificateData": TEST_CLIENT_CERT,
"clientKeyData": TEST_CLIENT_KEY,
}
expected = FakeConfig(
host=TEST_SSL_HOST,
cert_file=self._create_temp_file(TEST_CLIENT_CERT),
key_file=self._create_temp_file(TEST_CLIENT_KEY),
ssl_ca_cert=self._create_temp_file(TEST_CERTIFICATE_AUTH),
verify_ssl=True)
actual = FakeConfig()
KubeConfigLoader(
config_dict=self.TEST_KUBE_CONFIG,
active_context="exec_cred_user_certificate").load_and_set(actual)
self.assertEqual(expected, actual)
def test_user_cmd_path(self):
A = namedtuple('A', ['token', 'expiry'])
token = "dummy"