changed file naming style consistent with the existing go client code Update example.py Changed file and folder names Rename LeaderElection.py to leaderelection.py Rename threadingWithException.py to threadingwithexception.py Rename ConfigMapLock.py to configmaplock.py LeaderElection to leaderelection Added boiler plate headers, updated variable and function names consistent with the guidelines, removed the ctypes dependency by using traces to kill threads, changed logic for leader now it gives up and doesn't re-join as a follower if it fails to update lease added correct boiler plate year Rename threadingWithTrace.py to threadingwithtrace.py Update leaderelection.py Update example.py Changes based on review - logging, OnStoppedLeading is not killed abruptly, OnStartedLeading is not run in a separate thread, adding README Update example.py updated comments set threads as daemon Update README.md Code made consistent with other clients. Update example.py Update leaderelection.py Error & exception handling for the annotation, reduced indentation Adding serializing functions for serializing & de-serializing locks, leader_election_record as a class Adding a test Adding boilerplate header Rename leaderelectiontest.py to leaderelection_test.py Updated boiler plates handling imports for pytest handling 'HTTP not found' compatibility with python 2 & 3, & handling relative imports Update leaderelection.py to check tests for tox assertEquals -> assertEqual Update leaderelection_test.py making Threading compatible for Python 2 changing datetime.timestamp for backward compatibility with Python 2.7 Adding comments for test_Leader_election_with_renew_deadline & making candidates run in parallel for test_leader_election remove redundant daemon = True reassignment common thread lock for MockResourceLock
271 lines
9.7 KiB
Python
271 lines
9.7 KiB
Python
# Copyright 2021 The Kubernetes Authors.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
|
|
from . import leaderelection
|
|
from .leaderelectionrecord import LeaderElectionRecord
|
|
from kubernetes.client.rest import ApiException
|
|
from . import electionconfig
|
|
import unittest
|
|
import threading
|
|
import json
|
|
import time
|
|
import pytest
|
|
|
|
# Re-entrant lock passed as ``shared_lock`` to every MockResourceLock built in
# this module, so all mock locks serialize on the same lock object.
thread_lock = threading.RLock()
|
|
|
|
class LeaderElectionTest(unittest.TestCase):
    """Run LeaderElection against an in-memory MockResourceLock.

    Every test records which lock callbacks and which leadership callbacks
    fired, in order, and compares the recorded sequences with the expected
    ones via ``assert_history``.
    """

    def test_simple_leader_election(self):
        """A single candidate creates the record, renews it, then stops leading."""
        election_history = []
        leadership_history = []

        def on_create():
            election_history.append("create record")
            leadership_history.append("get leadership")

        def on_update():
            election_history.append("update record")

        def on_change():
            election_history.append("change record")

        mock_lock = MockResourceLock("mock", "mock_namespace", "mock", thread_lock,
                                     on_create, on_update, on_change, None)

        def on_started_leading():
            leadership_history.append("start leading")

        def on_stopped_leading():
            leadership_history.append("stop leading")

        # Create config
        config = electionconfig.Config(lock=mock_lock, lease_duration=2.5,
                                       renew_deadline=2, retry_period=1.5,
                                       onstarted_leading=on_started_leading,
                                       onstopped_leading=on_stopped_leading)

        # Enter leader election; the assertions below run once run() returns.
        leaderelection.LeaderElection(config).run()

        self.assert_history(election_history,
                            ["create record", "update record", "update record", "update record"])
        self.assert_history(leadership_history,
                            ["get leadership", "start leading", "stop leading"])

    def test_leader_election(self):
        """Candidate A leads first; candidate B then takes over the shared record."""
        election_history = []
        leadership_history = []

        def on_create_A():
            election_history.append("A creates record")
            leadership_history.append("A gets leadership")

        def on_update_A():
            election_history.append("A updates record")

        def on_change_A():
            # A change of holder is a leadership event, mirroring on_change_B.
            leadership_history.append("A gets leadership")

        mock_lock_A = MockResourceLock("mock", "mock_namespace", "MockA", thread_lock,
                                       on_create_A, on_update_A, on_change_A, None)
        mock_lock_A.renew_count_max = 3

        def on_started_leading_A():
            leadership_history.append("A starts leading")

        def on_stopped_leading_A():
            leadership_history.append("A stops leading")

        config_A = electionconfig.Config(lock=mock_lock_A, lease_duration=2.5,
                                         renew_deadline=2, retry_period=1.5,
                                         onstarted_leading=on_started_leading_A,
                                         onstopped_leading=on_stopped_leading_A)

        def on_create_B():
            election_history.append("B creates record")
            leadership_history.append("B gets leadership")

        def on_update_B():
            election_history.append("B updates record")

        def on_change_B():
            leadership_history.append("B gets leadership")

        mock_lock_B = MockResourceLock("mock", "mock_namespace", "MockB", thread_lock,
                                       on_create_B, on_update_B, on_change_B, None)
        mock_lock_B.renew_count_max = 4

        def on_started_leading_B():
            leadership_history.append("B starts leading")

        def on_stopped_leading_B():
            leadership_history.append("B stops leading")

        config_B = electionconfig.Config(lock=mock_lock_B, lease_duration=2.5,
                                         renew_deadline=2, retry_period=1.5,
                                         onstarted_leading=on_started_leading_B,
                                         onstopped_leading=on_stopped_leading_B)

        # Both mock locks point at the same list, so they see a single record.
        mock_lock_B.leader_record = mock_lock_A.leader_record

        # The candidates run back to back on the calling thread. The expected
        # histories below assume A's election has finished before B's begins;
        # running them concurrently could interleave the recorded events.

        # Enter leader election for A
        leaderelection.LeaderElection(config_A).run()

        # Enter leader election for B
        leaderelection.LeaderElection(config_B).run()

        self.assert_history(election_history,
                            ["A creates record",
                             "A updates record",
                             "A updates record",
                             "B updates record",
                             "B updates record",
                             "B updates record",
                             "B updates record"])
        self.assert_history(leadership_history,
                            ["A gets leadership",
                             "A starts leading",
                             "A stops leading",
                             "B gets leadership",
                             "B starts leading",
                             "B stops leading"])

    def test_Leader_election_with_renew_deadline(self):
        """Check that the leader gives up when it cannot renew in time.

        Expected behavior: the leader stops leading if it fails to update the
        lock within the renew_deadline, exiting once it finally times out. The
        difference between each try comes out to be approximately the sleep
        time.

        Example:
            create record: 0s
            on try update: 1.5s
            on update: zzz s
            on try update: 3s
            on update: zzz s
            on try update: 4.5s
            on try update: 6s
            Timeout - Leader Exits
        """
        election_history = []
        leadership_history = []

        def on_create():
            election_history.append("create record")
            leadership_history.append("get leadership")

        def on_update():
            election_history.append("update record")

        def on_change():
            election_history.append("change record")

        def on_try_update():
            election_history.append("try update record")

        mock_lock = MockResourceLock("mock", "mock_namespace", "mock", thread_lock,
                                     on_create, on_update, on_change, on_try_update)
        # The create counts as one renewal, so only two updates succeed.
        mock_lock.renew_count_max = 3

        def on_started_leading():
            leadership_history.append("start leading")

        def on_stopped_leading():
            leadership_history.append("stop leading")

        # Create config
        config = electionconfig.Config(lock=mock_lock, lease_duration=2.5,
                                       renew_deadline=2, retry_period=1.5,
                                       onstarted_leading=on_started_leading,
                                       onstopped_leading=on_stopped_leading)

        # Enter leader election
        leaderelection.LeaderElection(config).run()

        self.assert_history(election_history,
                            ["create record",
                             "try update record",
                             "update record",
                             "try update record",
                             "update record",
                             "try update record",
                             "try update record"])

        self.assert_history(leadership_history,
                            ["get leadership", "start leading", "stop leading"])

    def assert_history(self, history, expected):
        """Assert that ``history`` equals ``expected`` element by element.

        Args:
            history: the recorded sequence of events.
            expected: the sequence of events the test expects.

        Raises:
            AssertionError: if either argument is None, the lengths differ,
                or any pair of elements at the same index differs.
        """
        self.assertIsNotNone(expected)
        self.assertIsNotNone(history)
        self.assertEqual(len(expected), len(history))

        for idx, (actual, wanted) in enumerate(zip(history, expected)):
            self.assertEqual(actual, wanted,
                             msg="Not equal at index {}, expected {}, got {}".format(idx, wanted, actual))
|
|
|
|
|
|
class MockResourceLock:
    """In-memory stand-in for a resource lock, used to drive LeaderElection.

    The election record lives in ``leader_record`` (a list holding at most one
    record). Successful ``create``/``update`` calls are counted in
    ``renew_count``; once it reaches ``renew_count_max`` every further
    ``update`` fails, which lets tests simulate a leader losing its lease.

    The ``name`` and ``namespace`` arguments of ``get``/``create``/``update``
    are accepted but not used by this mock.
    """

    def __init__(self, name, namespace, identity, shared_lock, on_create=None, on_update=None, on_change=None, on_try_update=None):
        """Set up an empty lock.

        Args:
            name: stored as ``self.name``.
            namespace: stored as ``self.namespace``.
            identity: candidate identity; stored as a string.
            shared_lock: lock object (context manager) guarding every operation.
            on_create: optional callable invoked after a successful create.
            on_update: optional callable invoked after a successful update.
            on_change: optional callable invoked when an update changes the
                record's ``holder_identity``.
            on_try_update: optional callable invoked on every update attempt.
        """
        # self.leader_record is shared between two MockResourceLock objects
        self.leader_record = []
        self.renew_count = 0
        self.renew_count_max = 4
        self.name = name
        self.namespace = namespace
        self.identity = str(identity)
        self.lock = shared_lock

        self.on_create = on_create
        self.on_update = on_update
        self.on_change = on_change
        self.on_try_update = on_try_update

    @staticmethod
    def _notify(callback):
        """Invoke ``callback`` if one was supplied; all callbacks are optional."""
        if callback:
            callback()

    def get(self, name, namespace):
        """Return ``(True, record)`` if a record exists, else ``(False, error)``.

        When no record exists, ``error`` is an ApiException whose ``body`` is
        the JSON document ``{"code": 404}``.
        """
        with self.lock:
            if self.leader_record:
                return True, self.leader_record[0]

            # Use a fresh instance: assigning to ApiException.body on the class
            # would alter the exception type for every other user of it.
            not_found = ApiException(status=404, reason="Not Found")
            not_found.body = json.dumps({'code': 404})
            return False, not_found

    def create(self, name, namespace, election_record):
        """Store ``election_record`` if no record exists yet.

        Returns:
            True if the record was stored (counts towards ``renew_count``),
            False if a record already exists.
        """
        with self.lock:
            if self.leader_record:
                return False
            self.leader_record.append(election_record)
            self._notify(self.on_create)
            self.renew_count += 1
            return True

    def update(self, name, namespace, updated_record):
        """Replace the stored record with ``updated_record``.

        ``on_try_update`` fires on every attempt. The update is refused once
        ``renew_count`` has reached ``renew_count_max`` or when there is no
        record to replace.

        Returns:
            True if the record was replaced, False otherwise.
        """
        with self.lock:
            self._notify(self.on_try_update)
            if self.renew_count >= self.renew_count_max:
                return False
            if not self.leader_record:
                # Nothing to update; report failure instead of raising IndexError.
                return False

            old_record = self.leader_record[0]
            self.leader_record[0] = updated_record

            self._notify(self.on_update)

            if old_record.holder_identity != updated_record.holder_identity:
                self._notify(self.on_change)

            self.renew_count += 1
            return True
|
|
|
|
|
|
# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
|