python/leaderelection/leaderelection.py
Sumant 4d29af161b Leader Election issue #434
changed file naming style consistent with the existing go client code

Update example.py

Changed file and folder names

Rename LeaderElection.py to leaderelection.py

Rename threadingWithException.py to threadingwithexception.py

Rename ConfigMapLock.py to configmaplock.py

LeaderElection to leaderelection

Added boiler plate headers, updated variable and function names consistent with the guidelines, removed the ctypes dependency by using traces to kill threads, changed logic for leader now it gives up and doesn't re-join as a follower if it fails to update lease

added correct boiler plate year

Rename threadingWithTrace.py to threadingwithtrace.py

Update leaderelection.py

Update example.py

Changes based on review - logging, OnStoppedLeading is not killed abruptly, OnStartedLeading is not run in a separate thread, adding README

Update example.py

updated comments

set threads as daemon

Update README.md

Code made consistent with other clients.

Update example.py

Update leaderelection.py

Error & exception handling for the annotation, reduced indentation

Adding serializing functions for serializing & de-serializing locks, leader_election_record as a class

Adding a test

Adding boilerplate header

Rename leaderelectiontest.py to leaderelection_test.py

Updated boiler plates

handling imports for pytest

handling 'HTTP not found' compatibility with python 2 & 3, & handling relative imports

Update leaderelection.py

to check tests for tox

assertEquals -> assertEqual

Update leaderelection_test.py

making Threading compatible for Python 2

changing datetime.timestamp for backward compatibility with Python 2.7

Adding comments for test_Leader_election_with_renew_deadline & making
candidates run in parallel for test_leader_election

remove redundant daemon = True reassignment

common thread lock for MockResourceLock
2021-01-13 19:47:43 -05:00

192 lines
8.2 KiB
Python

# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import sys
import time
import json
import threading
from .leaderelectionrecord import LeaderElectionRecord
import logging
# if condition to be removed when support for python2 will be removed
if sys.version_info > (3, 0):
from http import HTTPStatus
else:
import httplib
# Configure the root logger at INFO level as soon as this module is imported.
# NOTE(review): this is an import-time side effect on the process-wide root
# logger; presumably kept so the example scripts print election progress
# without extra setup - confirm before moving it into application code.
logging.basicConfig(level=logging.INFO)
"""
This package implements leader election using an annotation in a Kubernetes object.
The onstarted_leading function is run in a thread; once it returns, it might not
be safe to run it again in the same process.
At first all candidates are considered followers. The one to create a lock or update
an existing lock first becomes the leader and remains so for as long as it keeps
renewing its lease.
"""
class LeaderElection:
    """Drive leader election for a single candidate.

    The candidate repeatedly tries to create or update the lock object held in
    ``election_config.lock``. While another candidate holds an unexpired lease
    this one stays a follower; once it owns the lock it keeps renewing the
    lease until a renewal cannot be completed within ``renew_deadline``.

    ``election_config`` is read for: ``lock`` (with ``identity``, ``name``,
    ``namespace``, ``get``, ``create`` and ``update``), ``lease_duration``,
    ``renew_deadline``, ``retry_period`` (all used as seconds),
    ``onstarted_leading`` and ``onstopped_leading`` (callables).
    """

    def __init__(self, election_config):
        """Store the candidate configuration and reset the observed state.

        NOTE(review): a missing config terminates via ``sys.exit`` (raises
        ``SystemExit``) instead of a ``ValueError``; kept as-is so existing
        callers see the same behaviour.
        """
        if election_config is None:
            sys.exit("argument config not passed")

        # Latest record observed in the created lock object
        self.observed_record = None

        # The configuration set for this candidate
        self.election_config = election_config

        # Latest update time of the lock
        self.observed_time_milliseconds = 0

    # Point of entry to Leader election
    def run(self):
        """Acquire the lock, lead until the lease is lost, then stop leading.

        Blocks the calling thread: first in ``acquire`` until the lock is
        obtained, then in ``renew_loop`` until a renewal fails. The
        ``onstarted_leading`` callback runs in a separate daemon thread;
        ``onstopped_leading`` runs in the calling thread afterwards.
        """
        # Try to create/ acquire a lock
        if self.acquire():
            logging.info("{} successfully acquired lease".format(self.election_config.lock.identity))

            # Start leading and call OnStartedLeading().
            # The daemon flag has to be set on the Thread object itself;
            # assigning ``threading.daemon`` only creates an unused module
            # attribute and leaves the thread non-daemon.
            leading_thread = threading.Thread(target=self.election_config.onstarted_leading)
            leading_thread.daemon = True
            leading_thread.start()

            self.renew_loop()

            # Failed to update lease, run OnStoppedLeading callback
            self.election_config.onstopped_leading()

    def acquire(self):
        """Retry as a follower until the lock is acquired; return True.

        Sleeps ``retry_period`` seconds between attempts and never returns
        a falsy value: it only returns once an attempt succeeds.
        """
        # Follower
        logging.info("{} is a follower".format(self.election_config.lock.identity))
        retry_period = self.election_config.retry_period

        while True:
            succeeded = self.try_acquire_or_renew()

            if succeeded:
                return True

            time.sleep(retry_period)

    def renew_loop(self):
        """Keep renewing the lease; return when a renewal misses its deadline.

        Each round allows ``renew_deadline`` seconds of retries (spaced by
        ``retry_period`` seconds). A successful round is followed by one more
        ``retry_period`` sleep before the next renewal.
        """
        # Leader
        logging.info("Leader has entered renew loop and will try to update lease continuously")

        retry_period = self.election_config.retry_period
        # Deadlines are compared against millisecond clocks below
        renew_deadline = self.election_config.renew_deadline * 1000

        while True:
            timeout = int(time.time() * 1000) + renew_deadline
            succeeded = False

            while int(time.time() * 1000) < timeout:
                succeeded = self.try_acquire_or_renew()

                if succeeded:
                    break
                time.sleep(retry_period)

            if succeeded:
                time.sleep(retry_period)
                continue

            # failed to renew, return
            return

    def _lock_not_found(self, error):
        """Return True when ``error`` reports that the lock does not exist.

        ``error`` is the second value returned by ``lock.get`` on failure; its
        ``body`` is expected to be a JSON document with a ``code`` field.
        A body that is missing, is not valid JSON, or has no ``code`` is
        treated as "not a not-found error" so the caller retries instead of
        crashing the election loop.
        """
        # To be removed when support for python2 will be removed
        if sys.version_info > (3, 0):
            not_found = HTTPStatus.NOT_FOUND
        else:
            not_found = httplib.NOT_FOUND

        try:
            return json.loads(error.body)['code'] == not_found
        except (TypeError, ValueError, KeyError):
            return False

    def try_acquire_or_renew(self):
        """Make one attempt to create, take over, or renew the lock.

        Returns:
            True if this candidate created or updated the lock, False if the
            lock could not be read, created or updated, or if another
            candidate still holds an unexpired lease.
        """
        now_timestamp = time.time()
        now = datetime.datetime.fromtimestamp(now_timestamp)
        lock = self.election_config.lock

        # Check if lock is created
        lock_status, old_election_record = lock.get(lock.name, lock.namespace)

        # On failure the second value is the error object, not a record.
        # Anything other than "not found" is a retrieval problem: retry later.
        if not lock_status and not self._lock_not_found(old_election_record):
            logging.info("Error retrieving resource lock {} as {}".format(lock.name,
                                                                          old_election_record.reason))
            return False

        # create a default Election record for this candidate
        leader_election_record = LeaderElectionRecord(lock.identity,
                                                      str(self.election_config.lease_duration),
                                                      str(now), str(now))

        # A lock is not created with that name, try to create one
        if not lock_status:
            logging.info("{} is trying to create a lock".format(leader_election_record.holder_identity))
            create_status = lock.create(name=lock.name,
                                        namespace=lock.namespace,
                                        election_record=leader_election_record)

            if create_status is False:
                logging.info("{} Failed to create lock".format(leader_election_record.holder_identity))
                return False

            self.observed_record = leader_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)
            return True

        # A lock exists with that name
        # Validate old_election_record
        if old_election_record is None:
            # try to update lock with proper annotation and election record
            return self.update_lock(leader_election_record)

        if (old_election_record.holder_identity is None or old_election_record.lease_duration is None
                or old_election_record.acquire_time is None or old_election_record.renew_time is None):
            # try to update lock with proper annotation and election record
            return self.update_lock(leader_election_record)

        # Report transitions
        if self.observed_record and self.observed_record.holder_identity != old_election_record.holder_identity:
            logging.info("Leader has switched to {}".format(old_election_record.holder_identity))

        # Restart the local lease clock whenever the stored record changes
        if self.observed_record is None or old_election_record.__dict__ != self.observed_record.__dict__:
            self.observed_record = old_election_record
            self.observed_time_milliseconds = int(time.time() * 1000)

        # If This candidate is not the leader and lease duration is yet to finish
        if (lock.identity != self.observed_record.holder_identity
                and self.observed_time_milliseconds + self.election_config.lease_duration * 1000 > int(now_timestamp * 1000)):
            logging.info("yet to finish lease_duration, lease held by {} and has not expired".format(old_election_record.holder_identity))
            return False

        # If this candidate is the Leader
        if lock.identity == self.observed_record.holder_identity:
            # Leader updates renewTime, but keeps acquire_time unchanged
            leader_election_record.acquire_time = self.observed_record.acquire_time

        # Either renewing our own lease or taking over an expired one
        return self.update_lock(leader_election_record)

    def update_lock(self, leader_election_record):
        """Write ``leader_election_record`` to the lock and record the result.

        Returns:
            True if the lock was updated (the record and the current time are
            then stored as the latest observation), False if the update failed.
        """
        # Update object with latest election record
        update_status = self.election_config.lock.update(self.election_config.lock.name,
                                                         self.election_config.lock.namespace,
                                                         leader_election_record)

        if update_status is False:
            logging.info("{} failed to acquire lease".format(leader_election_record.holder_identity))
            return False

        self.observed_record = leader_election_record
        self.observed_time_milliseconds = int(time.time() * 1000)
        logging.info("leader {} has successfully acquired lease".format(leader_election_record.holder_identity))
        return True