Merge pull request #55 from flylo/flipped_sign

fixing flipped sign in expiry time padding
This commit is contained in:
Yu Liao 2018-04-05 09:52:46 -07:00 committed by GitHub
commit 789de6a60d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 38 additions and 13 deletions

View File

@ -63,7 +63,7 @@ def _create_temp_file_with_content(content):
def _is_expired(expiry):
return ((parse_rfc3339(expiry) + EXPIRY_SKEW_PREVENTION_DELAY) <=
return ((parse_rfc3339(expiry) - EXPIRY_SKEW_PREVENTION_DELAY) <=
datetime.datetime.utcnow().replace(tzinfo=UTC))

View File

@ -22,10 +22,9 @@ import unittest
import mock
import yaml
from six import PY3
from six import PY3, next
from .config_exception import ConfigException
from .dateutil import parse_rfc3339
from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader,
_cleanup_temp_files, _create_temp_file_with_content,
list_kube_config_contexts, load_kube_config,
@ -33,6 +32,10 @@ from .kube_config import (ConfigNode, FileOrData, KubeConfigLoader,
BEARER_TOKEN_FORMAT = "Bearer %s"
EXPIRY_DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
# should be less than kube_config.EXPIRY_SKEW_PREVENTION_DELAY
EXPIRY_TIMEDELTA = 2
NON_EXISTING_FILE = "zz_non_existing_file_472398324"
@ -40,6 +43,17 @@ def _base64(string):
return base64.encodestring(string.encode()).decode()
def _format_expiry_datetime(dt):
return dt.strftime(EXPIRY_DATETIME_FORMAT)
def _get_expiry(loader):
expired_gcp_conf = (item for item in loader._config.value.get("users")
if item.get("name") == "expired_gcp")
return next(expired_gcp_conf).get("user").get("auth-provider") \
.get("config").get("expiry")
def _raise_exception(st):
raise Exception(st)
@ -59,6 +73,8 @@ TEST_USERNAME = "me"
TEST_PASSWORD = "pass"
# token for me:pass
TEST_BASIC_TOKEN = "Basic bWU6cGFzcw=="
TEST_TOKEN_EXPIRY = _format_expiry_datetime(
datetime.datetime.utcnow() - datetime.timedelta(minutes=EXPIRY_TIMEDELTA))
TEST_SSL_HOST = "https://test-host"
TEST_CERTIFICATE_AUTH = "cert-auth"
@ -194,10 +210,12 @@ class TestConfigNode(BaseTestCase):
{"name": "test_name2",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}],
"with_names_dup": [{"name": "test_name", "value": "test_value"},
{"name": "test_name",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}]}
"with_names_dup": [
{"name": "test_name", "value": "test_value"},
{"name": "test_name",
"value": {"key1", "test"}},
{"name": "test_name3", "value": [1, 2, 3]}
]}
def setUp(self):
super(TestConfigNode, self).setUp()
@ -213,7 +231,8 @@ class TestConfigNode(BaseTestCase):
self.assertEqual(3, len(self.node['key2']))
self.assertEqual("test_obj/key3", self.node['key3'].name)
self.assertEqual({"inner_key": "inner_value"}, self.node['key3'].value)
self.assertEqual({"inner_key": "inner_value"},
self.node['key3'].value)
self.assertEqual("inner_value", self.node['key3']["inner_key"])
self.assertEqual(1, len(self.node['key3']))
@ -255,7 +274,8 @@ class TestConfigNode(BaseTestCase):
def test_get_with_name_on_duplicate_name(self):
self.expect_exception(
lambda: self.node['with_names_dup'].get_with_name('test_name'),
"Expected only one object with name test_name in test_obj/with_names_dup list")
"Expected only one object with name test_name in "
"test_obj/with_names_dup list")
class FakeConfig:
@ -421,7 +441,8 @@ class TestKubeConfigLoader(BaseTestCase):
"name": "ssl",
"cluster": {
"server": TEST_SSL_HOST,
"certificate-authority-data": TEST_CERTIFICATE_AUTH_BASE64,
"certificate-authority-data":
TEST_CERTIFICATE_AUTH_BASE64,
}
},
{
@ -462,7 +483,7 @@ class TestKubeConfigLoader(BaseTestCase):
"name": "gcp",
"config": {
"access-token": TEST_DATA_BASE64,
"expiry": "2000-01-01T12:00:00Z", # always in past
"expiry": TEST_TOKEN_EXPIRY, # always in past
}
},
"token": TEST_DATA_BASE64, # should be ignored
@ -492,7 +513,8 @@ class TestKubeConfigLoader(BaseTestCase):
"id-token": TEST_OIDC_EXPIRED_LOGIN,
"idp-certificate-authority-data": TEST_OIDC_CA,
"idp-issuer-url": "https://example.org/identity",
"refresh-token": "lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
"refresh-token":
"lucWJjEhlxZW01cXI3YmVlcYnpxNGhzk"
}
}
}
@ -578,7 +600,6 @@ class TestKubeConfigLoader(BaseTestCase):
loader.token)
def test_load_gcp_token_with_refresh(self):
def cred(): return None
cred.token = TEST_ANOTHER_DATA_BASE64
cred.expiry = datetime.datetime.now()
@ -587,7 +608,11 @@ class TestKubeConfigLoader(BaseTestCase):
config_dict=self.TEST_KUBE_CONFIG,
active_context="expired_gcp",
get_google_credentials=lambda: cred)
original_expiry = _get_expiry(loader)
self.assertTrue(loader._load_gcp_token())
new_expiry = _get_expiry(loader)
# assert that the configs expiry actually updates
self.assertTrue(new_expiry > original_expiry)
self.assertEqual(BEARER_TOKEN_FORMAT % TEST_ANOTHER_DATA_BASE64,
loader.token)