Ian Wells c8d15c9bcc Remove six
Six is Python 2 and 3 compatibility.  It is no longer needed now that
we're purely py3.

Change-Id: I28a6f8539b3b3ce35221728ec631380a18c921b3
2020-07-27 23:15:31 +00:00

901 lines
32 KiB
Python

# Copyright (c) 2016 Cisco Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from abc import ABC, abstractmethod
import atexit
import etcd
import eventlet
import eventlet.semaphore
from oslo_config import cfg
from oslo_log import log as logging
import re
import time
import typing # noqa
from urllib3.exceptions import TimeoutError as UrllibTimeoutError
import uuid
from networking_vpp._i18n import _
from networking_vpp import exceptions as vpp_exceptions
from networking_vpp import jwt_agent
from oslo_serialization import jsonutils
LOG = logging.getLogger(__name__)
ETC_HOSTS_DELIMITER = ','
ETC_PORT_HOST_DELIMITER = ':'
class EtcdWriter(ABC):
@abstractmethod
def _process_read_value(self, key, value):
"""Turn a string from etcd into a value in the style we prefer
May parse, validate and/or otherwise modify the result.
"""
pass
@abstractmethod
def _process_written_value(self, key, value):
"""Turn a value into a serialised string to store in etcd
May serialise, sign and/or otherwise modify the result.
"""
pass
def __init__(self, etcd_client):
self.etcd_client = etcd_client
def write(self, key, data, *args, **kwargs):
"""Serialise and write a single data key in etcd.
The value is received as a Python data structure and stored in
a conventional format (serialised to JSON, in this class
instance).
"""
data = self._process_written_value(key, data)
self.etcd_client.write(key, data, *args, **kwargs)
def read(self, *args, **kwargs):
"""Watch and parse a data key in etcd.
The value is stored in a conventional format (serialised
somehow) and we parse it to Python. This may throw an
exception if the data cannot be read when .value is called on
any part of the result.
"""
return ParsedEtcdResult(self,
self.etcd_client.read(*args, **kwargs))
def watch(self, *args, **kwargs):
"""Watch and parse a data key in etcd.
The value is stored in a conventional format (serialised
somehow) and we parse it to Python. This may throw an
exception if the data cannot be read when .value is called on
any part of the result.
"""
return ParsedEtcdResult(self,
self.etcd_client.watch(*args, **kwargs))
def delete(self, key):
# NB there's no way of clearly indicating that a delete is
# legitimate - the parser has no role in it, here.
# We do augment the delete to delete more throroughly. This
# should probably be a mixin.
try:
self.etcd_client.delete(key)
except etcd.EtcdNotFile:
# We are asked to delete a directory as in the
# case of GPE where the empty mac address directory
# needs deletion after the key (IP) has been deleted.
try:
# TODO(ijw): define semantics: recursive delete?
self.etcd_client.delete(key, dir=True)
except Exception: # pass any exceptions
pass
except etcd.EtcdKeyNotFound:
# The key may have already been deleted
# no problem here
pass
def _unchanged(name):
def pt_get(self):
return getattr(self._result, name)
return property(pt_get)
class ParsedEtcdResult(etcd.EtcdResult):
"""Parsed version of an EtcdResult
An equivalent to EtcdResult that processes its values using the
parser that the reading class implements.
"""
def __init__(self, reader, result):
self._reader = reader
self._result = result
# A whole set of result properties are as they are on the original
# result.
key = _unchanged('key')
expiration = _unchanged('expiration')
ttl = _unchanged('ttl')
modifiedIndex = _unchanged('modifiedIndex')
createdIndex = _unchanged('createdIndex')
newKey = _unchanged('newKey')
dir = _unchanged('dir')
etcd_index = _unchanged('etcd_index')
raft_index = _unchanged('raft_index')
action = _unchanged('action')
# support iteration over result's children
children = _unchanged('children')
# used internally by etcd.client.Client
_prev_node = _unchanged('_prev_node')
# The one special case, where we need to change things
@property
def value(self):
return self._reader._process_read_value(self._result.key,
self._result.value)
def get_subtree(self, *args, **kwargs):
for f in self._result.get_subtree(*args, **kwargs):
# This returns a value which may itself have a subtree
# depending on args passed
return ParsedEtcdResult(self._reader, f)
# We know a bit too much about the internals of EtcdResult, but
# given that, we know that the internals all work from .value or
# .get_subtree() and the rest of the calls should use parsed
# result values.
def json_writer(etcd_client):
if cfg.CONF.ml2_vpp.jwt_signing:
return SignedEtcdJSONWriter(etcd_client)
else:
return EtcdJSONWriter(etcd_client)
class EtcdJSONWriter(EtcdWriter):
"""Write Python datastructures to etcd in a consistent form.
This takes values (typically as Python datastructures) and
converts them using JSON serialisation to a form that can be
stored in etcd, and vice versa.
"""
def __init__(self, etcd_client):
super(EtcdJSONWriter, self).__init__(etcd_client)
def _process_read_value(self, key, value):
value = jsonutils.loads(value)
return value
def _process_written_value(self, key, value):
return jsonutils.dumps(value)
class SignedEtcdJSONWriter(EtcdJSONWriter):
"""Write Python datastructures to etcd in a consistent form.
This takes values (typically as Python datastructures) and
converts them using JSON serialisation to a form that can be
stored in etcd, and vice versa.
"""
def __init__(self, etcd_client):
self.jwt_agent = jwt_agent.JWTUtils(
cfg.CONF.ml2_vpp.jwt_node_cert,
cfg.CONF.ml2_vpp.jwt_node_private_key,
cfg.CONF.ml2_vpp.jwt_ca_cert,
cfg.CONF.ml2_vpp.jwt_controller_name_pattern)
super(SignedEtcdJSONWriter, self).__init__(etcd_client)
def _process_read_value(self, key, value):
value = jsonutils.loads(value)
if (self.jwt_agent.should_path_be_signed(key)):
signerNodeName = self.jwt_agent.get_signer_name(key)
value = self.jwt_agent.verify(signerNodeName,
key,
value)
return value
def _process_written_value(self, key, value):
if (self.jwt_agent.should_path_be_signed(key)):
value = self.jwt_agent.sign(key, value)
return jsonutils.dumps(value)
elector_cleanup = [] # type: typing.List[EtcdElection]
@atexit.register
def cleanup_electors():
for f in elector_cleanup:
f.clean()
class EtcdElectionLost(Exception):
pass
class EtcdElection(object):
def __init__(self, etcd_client, name, election_path,
work_time,
recovery_time=5,
multi_worker_ok=False):
"""Participant in a leader election via etcd datastore.
etcd_client: the client handle for dealing with etcd
name: the category name - we elect one leader of this type
election_path: the location where we conduct elections in etcd
work_time: the typical time the leader spends doing work.
It remains elected for this long without conducting another
election.
recovery_time: the time, after we're certain the leader has
stopped doing work, that is the longest we want to wait before
someone else takes over if the leader has died (Note that
this means you can be without a working leader for work_time
+ recovery_time if the leader crashes just after winning the
election)
multi_worker_ok: True if you'd prefer to favour having at least
one elected leader over having no more than one elected leader.
Typically this will cause a second leader to start working even
if the original still believes it's elected, and is useful if
that's more likely to reduce pauses.
"""
self.etcd_client = etcd_client
self.name = name
# A unique value that identifies each worker thread
self.thread_id = str(uuid.uuid4())
# Sleeping threads wake up after this time and
# check if a master is alive and one of them will become the master
# if the current master key has expired
self.recovery_time = recovery_time
# Threads hold the lock for this lng because this is the most
# work they will do.
self.work_time = work_time
self.master_key = election_path + "/master_%s" % self.name
# We recommend you configure these log levels
# etcd_log = logging.getLogger('etcd.client')
# etcd_log.setLevel(logging.logging.WARNING)
# LOG.setLevel(logging.logging.INFO)
global elector_cleanup
elector_cleanup.append(self)
def wait_until_elected(self):
"""Wait indefinitely until we are the only master among a pool of workers.
Election Algorithm:-
1) Each worker thread is assigned a unique thread_id at launch time.
2) All threads start the election process by running this method.
3) An etcd master key, whose value equals its thread_id, with a TTL
equal to the work_time, controls the master election process.
3) The thread that first succeeds in atomically writing its ID
to the etcd master key, becomes the master. The
remaining threads and go to sleep after a master has been elected.
4) The master thread then breaks out of the election loop and
starts doing work. It periodically refreshes the TTL value
of its key in etcd to let other threads know that it is alive.
It is rather important that it's work takes less than the
heartbeat time *if* there must be only one thread running.
It is not so much of a concern if there can be multiple threads
running and this is just to keep the active thread count down.
5) The sleeping threads periodically wake up every recovery_time
to check if the master is alive. If the master key is absent,
in etcd, they trigger a re-election and elect a new master,
which begins doing the work.
"""
attempt = 0
while True:
attempt += 1
try:
# Attempt to become master
self.etcd_client.write(self.master_key,
self.thread_id,
prevExist=False,
ttl=self.work_time)
LOG.debug("new master for %s threads is thread_id %s",
self.name, self.thread_id)
# if successful, the master breaks to start doing work
break
# An etcdException means some other thread has already become
# the master
except etcd.EtcdException:
try:
# We may already be the master. Extend the election time.
self.extend_election(self.work_time)
LOG.debug('Thread %s refreshed master for %s for try %d',
self.thread_id, self.name, attempt)
break
# All non-master threads will end up here, watch for
# recovery_time (in case some etcd connection fault means
# we don't get a watch notify) and become master if the
# current master is dead
except EtcdElectionLost:
try:
with eventlet.Timeout(self.recovery_time + 1, False):
# Most threads will be waiting here.
self.etcd_client.watch(
self.master_key,
timeout=self.recovery_time)
except (etcd.EtcdWatchTimedOut, UrllibTimeoutError):
pass
except etcd.EtcdException:
eventlet.sleep(self.recovery_time)
def extend_election(self, duration):
"""Assuming we are the master, attempt to extend our election time."""
try:
self.etcd_client.write(self.master_key,
self.thread_id,
prevValue=self.thread_id,
ttl=duration)
LOG.debug("Master thread %s extended election by %d secs",
self.thread_id, duration)
except etcd.EtcdException:
raise EtcdElectionLost()
def clean(self):
"""Release the election lock if we're currently elected.
This happens on process exit to speed up the re-election.
"""
try:
self.etcd_client.delete(self.master_key,
self.thread_id,
prevValue=self.thread_id)
except etcd.EtcdException:
pass
class EtcdWatcher(ABC):
# There's a thread election here because we want to keep the number
# of equivalent watcher threads down as we are generally running
# with multiple processes.
# NB: worst case time between ticks is heartbeat + DANGER_PAUSE seconds
# or the length of a read (which should be fast)
# We wait this long after an etcd exception in watch_forever, in
# case it's going wrong repeatedly. This prevents a runaway CPU
# eater.
DANGER_PAUSE = 2
def __init__(self, etcd_client, name, watch_path, election_path=None,
wait_until_elected=False, recovery_time=5,
data=None, heartbeat=60):
# NB: heartbeat + recovery + DANGER_PAUSE + whatever work you do is
# the loop total time. This is important if we're going to
# do elections and we need to allow this quantity of time
# before the election lapses.
self.etcd_client = etcd_client
self.tick = None
self.name = name
self.watch_path = watch_path
self.data = data
self.heartbeat = heartbeat
# The wait_until_elected is a switch that controls whether the
# threads need to wait to do work until elected. Note that the agent
# watcher threads do not require waiting for an election and as a
# result, the this is set to False
if wait_until_elected:
work_time = heartbeat + self.DANGER_PAUSE + recovery_time
self.etcd_elector = EtcdElection(etcd_client, name, election_path,
work_time=work_time,
recovery_time=recovery_time)
else:
self.etcd_elector = None
# Explicitly a semaphore, because we don't monkey patch 'threading'
# in the agent and can't use threading.Lock
self.etcd_data_lock = eventlet.semaphore.Semaphore()
self.etcd_data = None
# Get the initial state of etcd.
self.expected_keys = self.init_resync_start()
self.refresh_all_data()
# NB needs the lock to run safely.
def short_keys():
for f in self.etcd_data.keys():
short_key = self.make_short_key(f)
if short_key:
yield short_key
with self.etcd_data_lock:
self.init_resync_end(short_keys())
def make_short_key(self, key):
# TODO(ijw) .startswith would be more sensible
m = re.match('^' + re.escape(self.watch_path) + '/(.*)$', key)
if m:
return m.group(1)
else:
return None
def init_resync_start(self):
"""Overrideable function when the first resync starts
Whatever is being driven by the etcd data, this is a good time
to find out what state it's currently in. It may be
persisting data over a restart of this process watching etcd,
so things that are no longer in etcd (for which we won't have
seen deletes in the downtime) are now gone. This is only
needed initially; if a resync is required while we're running,
we've tracked the etcd content and we know what has gone away.
Returns: None for no cleanup, or a set of keys expected
"""
return None
def init_resync_end(self, short_keys):
"""Clean up stale data
When we start up, we need to bring controlled elements
into line with what etcd wants. We've already gone
through the keys in etcd that currently exist and made
sure that the they are correctly configured, but we now
need to remove any structures that correspond to items
no longer in etcd.
This may be overridden to add functionality.
short_keys - an iterator for all the keys found in etcd
"""
if self.expected_keys is None:
# Resync has not been implemented for this
# TODO(ijw): we should make it mandatory for resync
# which means expected_resync_start will become
# abstract.
return
stale_keys = set(self.expected_keys) - set(short_keys)
for f in stale_keys:
self.removed(f)
def do_work(self, action, key, value):
"""Process an indiviudal update received in a watch
Override this if you can deal with individual updates given
their location. Leave it if all updates involve rereading all
downloaded data.
etcd_data is current when called and will not change during
the call.
"""
self.do_all_work()
@abstractmethod
def do_all_work(self):
"""Process all updates from a refreshing read or a watch
This may happen on startup, on lost history or if the reader has no
better way to behave than checking all data.
etcd_data is current when called and will not change during
the call.
"""
pass
def do_tick(self):
"""Do background tasks that can happen between etcd updates.
Will be called once per (heartbeat + result processing time)
"""
pass
def refresh_all_data(self):
"""Load the entirety of the data we're watching from etcd.
This is used on initialisation and when history is lost. It
also causes work for keys that changed.
"""
LOG.debug("%s: resyncing in full", self.name)
rv = self.etcd_client.read(self.watch_path,
recursive=True)
with self.etcd_data_lock:
self.etcd_data = {}
for f in rv.children:
self.etcd_data[f.key] = f.value
self.do_all_work()
# TODO(ijw): there's a better number to use here
self.tick = rv.etcd_index + 1
LOG.debug("%s watch index recovered: %s",
self.name, rv.modifiedIndex)
def watch_forever(self):
"""Watch a keyspace forevermore
This may not exit - if there are errors they're logged (and in case
they are persistent we pause).
"""
while True:
try:
self.do_tick()
if self.etcd_elector:
self.etcd_elector.wait_until_elected()
self.do_watch()
except Exception:
LOG.exception('%s: etcd threw exception',
self.name)
# In case of a dead etcd causing continuous
# exceptions, the pause here avoids eating all the
# CPU
time.sleep(self.DANGER_PAUSE)
def do_watch(self):
"""Watch a keyspace
This will conduct one watch or one read.
"""
try:
LOG.debug("%s: pausing", self.name)
try:
if self.tick is None:
# We have no state, so we have effectively
# 'fallen off of the history' and need to
# resync in full.
raise etcd.EtcdEventIndexCleared()
# Current versions of python-etcd use the timeout in
# interesting ways. Slow URL connections + no data
# to return can lead to timeouts much longer than you
# might expect. So we watch for a timeout for
# ourselves as well.
# Here, we use both etcd Client timeout and eventlet timeout
# As etcd.Client timeout is not reliable, enforce
# the timeout with eventlet.Timeout
# Most of the time, we timeout thanks to etcd client,
# if we timeout due to eventlet, we have an ugly error message
rv = None
with eventlet.Timeout(self.heartbeat + 5, False):
rv = self.etcd_client.watch(self.watch_path,
recursive=True,
index=self.tick,
timeout=self.heartbeat)
if rv:
with self.etcd_data_lock:
# The processing function is entitled to check all etcd
# data. Update it before we call the processor.
if rv.action == 'delete':
self.etcd_data.pop(rv.key, None)
else:
self.etcd_data[rv.key] = rv.value
# We can, in the case of a watch, hint at where the
# update went.
try:
self.do_work(rv.action, rv.key, rv.value)
except Exception:
LOG.exception(('%s key %s value %s could'
'not be processed')
% (rv.action, rv.key, rv.value))
# TODO(ijw) raise or not raise? This is probably
# fatal and incurable, because we will only repeat
# the action on the next round.
raise
# Update the tick only when all the above completes so that
# exceptions don't cause the count to skip before the data
# is processed
self.tick = rv.modifiedIndex + 1
except etcd.EtcdEventIndexCleared:
# We can't follow etcd history in teaspoons, so
# grab the current state in its entirety.
self.refresh_all_data()
except (etcd.EtcdWatchTimedOut, UrllibTimeoutError):
# This is normal behaviour, indicating either a watch timeout
# (nothing changed) or a connection timeout (we should retry)
pass
# Other exceptions are thrown further, but a sensible background
# thread probably catches them rather than terminating.
class EtcdChangeWatcher(EtcdWatcher):
"""An etcd watcher framework that notifies only discrete changes
This deals with the start/resync/watch dilemmas and makes a single
'this key has changed' call regardless of what prompts it.
It does this by remembering what it's notified (via key_change) in
the past, and avoiding any notification that amounts to 'this key has
changed to the same value'. However, when we have to do a full resync
the ordering of key updates is not guaranteed.
"""
def __init__(self, etcd_client, name, watch_path, election_path=None,
wait_until_elected=False, recovery_time=5,
data=None, heartbeat=60):
self.implemented_state = {}
self.watch_path = watch_path
super(EtcdChangeWatcher, self).__init__(
etcd_client, name, watch_path, election_path,
wait_until_elected, recovery_time,
data, heartbeat)
def do_all_work(self):
"""Reimplement etcd state pending a change of some of it
Some undefined quantity of etcd's data has changed.
Work out what we've implemented, and re-implement the
remainder.
"""
# First, spot keys that went away
in_keys = set(self.etcd_data.keys())
impl_keys = set(self.implemented_state.keys())
deleted_keys = impl_keys - in_keys
new_keys = in_keys - impl_keys
existing_keys = in_keys & impl_keys
for k in deleted_keys:
# Note: this will change implemented_state
self.do_work('delete', k, None)
for k in new_keys:
self.do_work('add', k, self.etcd_data[k])
for k in existing_keys:
if self.implemented_state[k] != \
self.etcd_data[k]:
self.do_work('change', k, self.etcd_data[k])
def do_work(self, action, key, value):
"""Implement etcd state when it changes
This implements the state in VPP and notes the key/value
that VPP has implemented in self.implemented_state.
"""
self.key_change(action, key, value)
if action == 'delete':
try:
del self.implemented_state[key]
except KeyError:
# If for any reason we see a double-delete that's fine
pass
else:
self.implemented_state[key] = value
def key_change(self, action, key, value):
"""Called when a key changes from the known value
This can be because it's added, changed, refreshed or deleted.
This default implementation does not notify of a change at
the root. We assume only subkeys are interesting.
"""
short_key = self.make_short_key(key)
if short_key is not None:
LOG.debug("Watcher %s got %s on shortkey %s",
self.name, action, short_key)
if action == 'delete':
self.removed(short_key)
else:
self.added(short_key, value)
def removed(self, key):
"""Called when a key is deleted
The watch path is removed, leaving only the subpath.
"""
pass
def added(self, key, value):
"""Called when a key is added, changed, updated...
The watch path is removed, leaving only the subpath.
"""
pass
class EtcdHelper(object):
def __init__(self, client):
self.etcd_client = client
def clear_state(self, key_space):
"""Clear the keys in the key_space"""
LOG.debug("Clearing key space: %s", key_space)
try:
rv = self.etcd_client.read(key_space)
for child in rv.children:
self.etcd_client.delete(child.key)
except etcd.EtcdNotFile:
# Can't delete directories - they're harmless anyway
pass
def ensure_dir(self, path):
try:
self.etcd_client.write(path, None, dir=True)
except etcd.EtcdNotFile:
# Thrown when the directory already exists, which is fine
pass
def remove_dir(self, path):
try:
self.etcd_client.delete(path, dir=True)
except etcd.EtcdNotFile:
# Thrown if the directory is not empty, which we error log
LOG.error("Directory path:%s is not empty and cannot be deleted",
path)
except etcd.EtcdKeyNotFound:
# Already gone, so not a problem
pass
# Base connection to etcd, using standard options.
_etcd_conn_opts = [
cfg.StrOpt('etcd_host', default="127.0.0.1",
help=_("Etcd host IP address(es) to connect etcd client."
"It takes two formats: single IP/host or a multiple "
"hosts list with this format: 'IP:Port,IP:Port'. "
"e.g: 192.168.1.1:2379,192.168.1.2:2379. If port "
"is absent, etcd_port is used.")),
cfg.IntOpt('etcd_port', default=4001,
help=_("Etcd port to connect the etcd client. This can "
"be overridden on a per-host basis if the multiple "
"host form of etcd_host is used.")),
cfg.StrOpt('etcd_user', default=None,
help=_("Username for etcd authentication")),
cfg.StrOpt('etcd_pass', default=None, secret=True,
help=_("Password for etcd authentication")),
# TODO(ijw): make false default
cfg.BoolOpt('etcd_insecure_explicit_disable_https', default=True,
help=_("Use TLS to access etcd")),
cfg.StrOpt('etcd_ca_cert', default=None,
help=_("etcd CA certificate file path")),
]
def register_etcd_conn_opts(cfg, group):
global _etcd_conn_opts
cfg.register_opts(_etcd_conn_opts, group)
def list_opts():
"""Return config details for use with Oslo-config generator"""
return _etcd_conn_opts
class EtcdClientFactory(object):
def _parse_host(self, etc_host_elem, default_port):
"""Parse a single etcd host entry (which can be host or host/port)
Returns a format suitable for the etcd client creation call.
NB: the client call is documented to take one host, host/port
tuple or a tuple of host/port tuples; in fact, it will take
a bare host in the tuple form as well.
"""
if not isinstance(etc_host_elem, str) or etc_host_elem == '':
raise vpp_exceptions.InvalidEtcHostConfig()
if ETC_PORT_HOST_DELIMITER in etc_host_elem:
try:
host, port = etc_host_elem.split(ETC_PORT_HOST_DELIMITER)
port = int(port)
etc_host = (host, port,)
except ValueError:
raise vpp_exceptions.InvalidEtcHostConfig()
else:
etc_host = (etc_host_elem, default_port)
return etc_host
def _parse_host_config(self, etc_host, default_port):
"""Parse etcd host config (host, host/port, or list of host/port)
Returns a format suitable for the etcd client creation call.
This always uses the list-of-hosts tuple format, even with a single
host.
"""
if not isinstance(etc_host, str):
raise vpp_exceptions.InvalidEtcHostsConfig()
if ETC_HOSTS_DELIMITER in etc_host:
hosts = etc_host.split(ETC_HOSTS_DELIMITER)
else:
hosts = [etc_host]
etc_hosts = ()
for host in hosts:
etc_hosts = etc_hosts + (self._parse_host(host, default_port),)
return etc_hosts
def __init__(self, conf_group):
hostconf = self._parse_host_config(conf_group.etcd_host,
conf_group.etcd_port)
self.etcd_args = {
'host': hostconf,
'username': conf_group.etcd_user,
'password': conf_group.etcd_pass,
'allow_reconnect': True}
if not conf_group.etcd_insecure_explicit_disable_https:
if conf_group.etcd_ca_cert is None:
raise vpp_exceptions.InvalidEtcdCAConfig()
self.etcd_args['protocol'] = 'https'
self.etcd_args['ca_cert'] = conf_group.etcd_ca_cert
else:
LOG.warning("etcd is not using HTTPS, insecure setting")
def client(self):
etcd_client = etcd.Client(**self.etcd_args)
return etcd_client