
Six is Python 2 and 3 compatibility. It is no longer needed now that we're purely py3. Change-Id: I28a6f8539b3b3ce35221728ec631380a18c921b3
901 lines
32 KiB
Python
901 lines
32 KiB
Python
# Copyright (c) 2016 Cisco Systems, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from abc import ABC, abstractmethod
|
|
import atexit
|
|
import etcd
|
|
import eventlet
|
|
import eventlet.semaphore
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
import re
|
|
import time
|
|
import typing # noqa
|
|
from urllib3.exceptions import TimeoutError as UrllibTimeoutError
|
|
import uuid
|
|
|
|
from networking_vpp._i18n import _
|
|
from networking_vpp import exceptions as vpp_exceptions
|
|
from networking_vpp import jwt_agent
|
|
|
|
from oslo_serialization import jsonutils
|
|
|
|
|
|
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Separator between entries when etcd_host holds a list of hosts.
ETC_HOSTS_DELIMITER = ','
# Separator between the host and the port within one etcd_host entry.
ETC_PORT_HOST_DELIMITER = ':'
|
|
|
|
|
|
class EtcdWriter(ABC):
    """Base class for reading and writing processed values in etcd.

    Wraps an etcd client.  Values handed to write() are passed through
    _process_written_value() before being stored; results of read() and
    watch() are wrapped in ParsedEtcdResult, which passes values
    through _process_read_value() when .value is accessed.  Subclasses
    supply both conversions.
    """

    @abstractmethod
    def _process_read_value(self, key, value):
        """Turn a string from etcd into a value in the style we prefer

        May parse, validate and/or otherwise modify the result.
        """

    @abstractmethod
    def _process_written_value(self, key, value):
        """Turn a value into a serialised string to store in etcd

        May serialise, sign and/or otherwise modify the result.
        """

    def __init__(self, etcd_client):
        self.etcd_client = etcd_client

    def write(self, key, data, *args, **kwargs):
        """Serialise and write a single data key in etcd.

        The value is received as a Python data structure and stored in
        a conventional format (whatever _process_written_value()
        produces).  Extra positional and keyword arguments are passed
        to the underlying client's write() unchanged.
        """
        data = self._process_written_value(key, data)

        self.etcd_client.write(key, data, *args, **kwargs)

    def read(self, *args, **kwargs):
        """Read and parse a data key in etcd.

        The value is stored in a conventional format (serialised
        somehow) and we parse it to Python.  This may throw an
        exception if the data cannot be read when .value is called on
        any part of the result.

        Returns: a ParsedEtcdResult wrapping the client's read result.
        """
        return ParsedEtcdResult(self,
                                self.etcd_client.read(*args, **kwargs))

    def watch(self, *args, **kwargs):
        """Watch and parse a data key in etcd.

        The value is stored in a conventional format (serialised
        somehow) and we parse it to Python.  This may throw an
        exception if the data cannot be read when .value is called on
        any part of the result.

        Returns: a ParsedEtcdResult wrapping the client's watch result.
        """
        return ParsedEtcdResult(self,
                                self.etcd_client.watch(*args, **kwargs))

    def delete(self, key):
        """Delete a key, falling back to a directory delete if needed.

        A missing key is not an error.  If the key turns out to be a
        directory, a directory delete is attempted on a best-effort
        basis: a failure there is logged at debug level and otherwise
        ignored.
        """
        # NB there's no way of clearly indicating that a delete is
        # legitimate - the parser has no role in it, here.

        # We do augment the delete to delete more thoroughly.  This
        # should probably be a mixin.
        try:
            self.etcd_client.delete(key)
        except etcd.EtcdNotFile:
            # We are asked to delete a directory as in the
            # case of GPE where the empty mac address directory
            # needs deletion after the key (IP) has been deleted.
            try:
                # TODO(ijw): define semantics: recursive delete?
                self.etcd_client.delete(key, dir=True)
            except Exception:
                # Deliberately best effort, but leave a trace of why
                # the directory is still there rather than hiding it.
                LOG.debug("Could not delete directory %s", key,
                          exc_info=True)
        except etcd.EtcdKeyNotFound:
            # The key may have already been deleted
            # no problem here
            pass
|
|
|
|
|
|
def _unchanged(name):
|
|
def pt_get(self):
|
|
return getattr(self._result, name)
|
|
|
|
return property(pt_get)
|
|
|
|
|
|
class ParsedEtcdResult(etcd.EtcdResult):
    """Parsed version of an EtcdResult

    An equivalent to EtcdResult that processes its values using the
    parser that the reading class implements.

    reader: the object whose _process_read_value(key, value) is used to
    convert raw values.

    result: the underlying result object being wrapped.
    """

    def __init__(self, reader, result):
        # NB: the base class initialiser is deliberately not called;
        # everything is delegated to the wrapped result.
        self._reader = reader
        self._result = result

    # A whole set of result properties are as they are on the original
    # result.

    key = _unchanged('key')
    expiration = _unchanged('expiration')
    ttl = _unchanged('ttl')
    modifiedIndex = _unchanged('modifiedIndex')
    createdIndex = _unchanged('createdIndex')
    newKey = _unchanged('newKey')
    dir = _unchanged('dir')
    etcd_index = _unchanged('etcd_index')
    raft_index = _unchanged('raft_index')
    action = _unchanged('action')

    # support iteration over result's children
    children = _unchanged('children')

    # used internally by etcd.client.Client
    _prev_node = _unchanged('_prev_node')

    # The one special case, where we need to change things
    @property
    def value(self):
        """The wrapped result's value, converted by the reader."""
        return self._reader._process_read_value(self._result.key,
                                                self._result.value)

    def get_subtree(self, *args, **kwargs):
        """Iterate over the wrapped result's subtree as parsed results.

        Arguments are passed to the wrapped result's get_subtree().
        Every node it produces is yielded wrapped in a
        ParsedEtcdResult using the same reader, so that nothing is
        dropped after the first node.
        """
        for f in self._result.get_subtree(*args, **kwargs):
            # This yields a value which may itself have a subtree
            # depending on args passed
            yield ParsedEtcdResult(self._reader, f)

    # We know a bit too much about the internals of EtcdResult, but
    # given that, we know that the internals all work from .value or
    # .get_subtree() and the rest of the calls should use parsed
    # result values.
|
|
|
|
|
|
def json_writer(etcd_client):
    """Return the JSON writer selected by the ml2_vpp.jwt_signing option.

    A SignedEtcdJSONWriter is built when jwt_signing is set, a plain
    EtcdJSONWriter otherwise; either wraps the given etcd client.
    """
    if not cfg.CONF.ml2_vpp.jwt_signing:
        return EtcdJSONWriter(etcd_client)
    return SignedEtcdJSONWriter(etcd_client)
|
|
|
|
|
|
class EtcdJSONWriter(EtcdWriter):
    """Store Python data structures in etcd as JSON.

    Values are serialised with jsonutils.dumps() before being written
    and decoded with jsonutils.loads() when read back.
    """

    def __init__(self, etcd_client):
        super().__init__(etcd_client)

    def _process_read_value(self, key, value):
        """Decode the JSON text read from etcd; the key is not used."""
        return jsonutils.loads(value)

    def _process_written_value(self, key, value):
        """Encode the value as JSON text; the key is not used."""
        serialised = jsonutils.dumps(value)
        return serialised
|
|
|
|
|
|
class SignedEtcdJSONWriter(EtcdJSONWriter):
    """Store Python data structures in etcd as JSON, signing some paths.

    Behaves like the JSON writer, except that for keys where
    jwt_agent.should_path_be_signed() is true the value is passed
    through jwt_agent.sign() before serialisation, and through
    jwt_agent.verify() after deserialisation.
    """

    def __init__(self, etcd_client):
        # The JWT helper is configured entirely from the ml2_vpp group.
        conf = cfg.CONF.ml2_vpp
        self.jwt_agent = jwt_agent.JWTUtils(conf.jwt_node_cert,
                                            conf.jwt_node_private_key,
                                            conf.jwt_ca_cert,
                                            conf.jwt_controller_name_pattern)
        super().__init__(etcd_client)

    def _process_read_value(self, key, value):
        """Decode JSON and, on signed paths, return the verify() result."""
        decoded = jsonutils.loads(value)
        if not self.jwt_agent.should_path_be_signed(key):
            return decoded

        signer = self.jwt_agent.get_signer_name(key)
        return self.jwt_agent.verify(signer, key, decoded)

    def _process_written_value(self, key, value):
        """Sign the value on signed paths, then encode it as JSON."""
        payload = value
        if self.jwt_agent.should_path_be_signed(key):
            payload = self.jwt_agent.sign(key, value)
        return jsonutils.dumps(payload)
|
|
|
|
|
|
# Every elector that should be cleaned up when the process exits;
# EtcdElection instances add themselves to this list.
elector_cleanup = []  # type: typing.List[EtcdElection]


@atexit.register
def cleanup_electors():
    """Call clean() on each registered elector, in registration order."""
    for elector in elector_cleanup:
        elector.clean()
|
|
|
|
|
|
class EtcdElectionLost(Exception):
    """Raised when an attempt to extend our election time fails."""
|
|
|
|
|
|
class EtcdElection(object):
    """Participant in a leader election held through an etcd key."""

    def __init__(self, etcd_client, name, election_path,
                 work_time,
                 recovery_time=5,
                 multi_worker_ok=False):
        """Participant in a leader election via etcd datastore.

        etcd_client: the client handle for dealing with etcd

        name: the category name - we elect one leader of this type

        election_path: the location where we conduct elections in etcd

        work_time: the typical time the leader spends doing work.
        It remains elected for this long without conducting another
        election.

        recovery_time: the time, after we're certain the leader has
        stopped doing work, that is the longest we want to wait before
        someone else takes over if the leader has died (Note that
        this means you can be without a working leader for work_time
        + recovery_time if the leader crashes just after winning the
        election)

        multi_worker_ok: True if you'd prefer to favour having at least
        one elected leader over having no more than one elected leader.
        Typically this will cause a second leader to start working even
        if the original still believes it's elected, and is useful if
        that's more likely to reduce pauses.
        NB: this class accepts the argument but does not currently
        store or consult it.
        """

        self.etcd_client = etcd_client
        self.name = name
        # A unique value that identifies each worker thread
        self.thread_id = str(uuid.uuid4())
        # Sleeping threads wake up after this time and
        # check if a master is alive and one of them will become the master
        # if the current master key has expired
        self.recovery_time = recovery_time

        # Threads hold the lock for this long because this is the most
        # work they will do.
        self.work_time = work_time
        self.master_key = election_path + "/master_%s" % self.name

        # We recommend you configure these log levels
        # etcd_log = logging.getLogger('etcd.client')
        # etcd_log.setLevel(logging.logging.WARNING)
        # LOG.setLevel(logging.logging.INFO)

        # Registered so that the election key is released at exit.
        elector_cleanup.append(self)

    def wait_until_elected(self):
        """Wait indefinitely until we are the only master among a pool of workers.

        Election Algorithm:-
        1) Each worker thread is assigned a unique thread_id at launch time.
        2) All threads start the election process by running this method.
        3) An etcd master key, whose value equals its thread_id, with a TTL
           equal to the work_time, controls the master election process.
        4) The thread that first succeeds in atomically writing its ID
           to the etcd master key, becomes the master. The
           remaining threads go to sleep after a master has been elected.
        5) The master thread then breaks out of the election loop and
           starts doing work. It periodically refreshes the TTL value
           of its key in etcd to let other threads know that it is alive.
           It is rather important that its work takes less than the
           heartbeat time *if* there must be only one thread running.
           It is not so much of a concern if there can be multiple threads
           running and this is just to keep the active thread count down.
        6) The sleeping threads periodically wake up every recovery_time
           to check if the master is alive. If the master key is absent,
           in etcd, they trigger a re-election and elect a new master,
           which begins doing the work.

        """
        attempt = 0
        while True:
            attempt += 1
            try:
                # Attempt to become master
                self.etcd_client.write(self.master_key,
                                       self.thread_id,
                                       prevExist=False,
                                       ttl=self.work_time)
                LOG.debug("new master for %s threads is thread_id %s",
                          self.name, self.thread_id)
                # if successful, the master breaks to start doing work
                break
            # An etcdException means some other thread has already become
            # the master
            except etcd.EtcdException:
                try:
                    # We may already be the master.  Extend the election time.
                    self.extend_election(self.work_time)

                    LOG.debug('Thread %s refreshed master for %s for try %d',
                              self.thread_id, self.name, attempt)
                    break
                # All non-master threads will end up here, watch for
                # recovery_time (in case some etcd connection fault means
                # we don't get a watch notify) and become master if the
                # current master is dead
                except EtcdElectionLost:
                    try:
                        with eventlet.Timeout(self.recovery_time + 1, False):
                            # Most threads will be waiting here.
                            self.etcd_client.watch(
                                self.master_key,
                                timeout=self.recovery_time)
                    except (etcd.EtcdWatchTimedOut, UrllibTimeoutError):
                        pass
                    except etcd.EtcdException:
                        eventlet.sleep(self.recovery_time)

    def extend_election(self, duration):
        """Assuming we are the master, attempt to extend our election time.

        duration: the new TTL, in seconds, for the master key.

        Raises EtcdElectionLost if the conditional write (which requires
        the key to still hold our thread_id) fails with an etcd error.
        """

        try:
            self.etcd_client.write(self.master_key,
                                   self.thread_id,
                                   prevValue=self.thread_id,
                                   ttl=duration)
            LOG.debug("Master thread %s extended election by %d secs",
                      self.thread_id, duration)
        except etcd.EtcdException:
            raise EtcdElectionLost()

    def clean(self):
        """Release the election lock if we're currently elected.

        This happens on process exit to speed up the re-election.
        Any etcd error (including the key not holding our thread_id)
        is ignored.
        """
        try:
            # Conditional delete: only remove the key while it still
            # holds our own thread_id.  No other positional argument is
            # passed, so nothing leaks into the client's delete options.
            self.etcd_client.delete(self.master_key,
                                    prevValue=self.thread_id)
        except etcd.EtcdException:
            pass
|
|
|
|
|
|
class EtcdWatcher(ABC):
    """Keep a local copy of an etcd keyspace and react to its changes.

    The watcher reads everything under watch_path into self.etcd_data,
    then follows changes with watches, calling do_work() for each
    individual update and do_all_work() after a full reread.
    Subclasses must implement do_all_work(); init_resync_end() also
    expects a removed(short_key) method when init_resync_start()
    returns a set of keys.
    """

    # There's a thread election here because we want to keep the number
    # of equivalent watcher threads down as we are generally running
    # with multiple processes.

    # NB: worst case time between ticks is heartbeat + DANGER_PAUSE seconds
    # or the length of a read (which should be fast)

    # We wait this long after an etcd exception in watch_forever, in
    # case it's going wrong repeatedly.  This prevents a runaway CPU
    # eater.
    DANGER_PAUSE = 2

    def __init__(self, etcd_client, name, watch_path, election_path=None,
                 wait_until_elected=False, recovery_time=5,
                 data=None, heartbeat=60):
        """Set up the watcher and perform the initial full read.

        etcd_client: the client handle used for reads and watches

        name: used in log messages and as the election category name

        watch_path: the etcd path watched (recursively)

        election_path: where elections are held; only used when
        wait_until_elected is true

        wait_until_elected: if true, an EtcdElection is created and
        watch_forever() waits to be elected before each watch

        recovery_time: passed to the EtcdElection and added to its
        work time

        data: stored as self.data; not otherwise used by this class

        heartbeat: the timeout, in seconds, of each watch
        """

        # NB: heartbeat + recovery + DANGER_PAUSE + whatever work you do is
        # the loop total time.  This is important if we're going to
        # do elections and we need to allow this quantity of time
        # before the election lapses.

        self.etcd_client = etcd_client
        self.tick = None
        self.name = name
        self.watch_path = watch_path
        self.data = data
        self.heartbeat = heartbeat

        # The wait_until_elected is a switch that controls whether the
        # threads need to wait to do work until elected. Note that the agent
        # watcher threads do not require waiting for an election and as a
        # result, this is set to False
        if wait_until_elected:
            work_time = heartbeat + self.DANGER_PAUSE + recovery_time

            self.etcd_elector = EtcdElection(etcd_client, name, election_path,
                                             work_time=work_time,
                                             recovery_time=recovery_time)
        else:
            self.etcd_elector = None

        # Explicitly a semaphore, because we don't monkey patch 'threading'
        # in the agent and can't use threading.Lock
        self.etcd_data_lock = eventlet.semaphore.Semaphore()
        self.etcd_data = None

        # Get the initial state of etcd.
        self.expected_keys = self.init_resync_start()
        self.refresh_all_data()

        # NB needs the lock to run safely.
        def short_keys():
            for f in self.etcd_data.keys():
                short_key = self.make_short_key(f)
                if short_key:
                    yield short_key

        with self.etcd_data_lock:
            self.init_resync_end(short_keys())

    def make_short_key(self, key):
        """Strip 'watch_path/' from the front of a key.

        Returns: the remainder of the key, or None if the key does not
        match watch_path followed by a slash.
        """
        # TODO(ijw) .startswith would be more sensible
        m = re.match('^' + re.escape(self.watch_path) + '/(.*)$', key)
        if m:
            return m.group(1)
        else:
            return None

    def init_resync_start(self):
        """Overrideable function when the first resync starts

        Whatever is being driven by the etcd data, this is a good time
        to find out what state it's currently in.  It may be
        persisting data over a restart of this process watching etcd,
        so things that are no longer in etcd (for which we won't have
        seen deletes in the downtime) are now gone.  This is only
        needed initially; if a resync is required while we're running,
        we've tracked the etcd content and we know what has gone away.

        Returns: None for no cleanup, or a set of keys expected
        """
        return None

    def init_resync_end(self, short_keys):
        """Clean up stale data

        When we start up, we need to bring controlled elements
        into line with what etcd wants.  We've already gone
        through the keys in etcd that currently exist and made
        sure that they are correctly configured, but we now
        need to remove any structures that correspond to items
        no longer in etcd.

        This may be overridden to add functionality.

        short_keys - an iterator for all the keys found in etcd
        """

        if self.expected_keys is None:
            # Resync has not been implemented for this
            # TODO(ijw): we should make it mandatory for resync
            # which means expected_resync_start will become
            # abstract.
            return

        # Anything we expected to have but etcd no longer lists is stale.
        stale_keys = set(self.expected_keys) - set(short_keys)
        for f in stale_keys:
            self.removed(f)

    def do_work(self, action, key, value):
        """Process an individual update received in a watch

        Override this if you can deal with individual updates given
        their location.  Leave it if all updates involve rereading all
        downloaded data.

        etcd_data is current when called and will not change during
        the call.
        """
        self.do_all_work()

    @abstractmethod
    def do_all_work(self):
        """Process all updates from a refreshing read or a watch

        This may happen on startup, on lost history or if the reader has no
        better way to behave than checking all data.

        etcd_data is current when called and will not change during
        the call.
        """
        pass

    def do_tick(self):
        """Do background tasks that can happen between etcd updates.

        Will be called once per (heartbeat + result processing time)
        """
        pass

    def refresh_all_data(self):
        """Load the entirety of the data we're watching from etcd.

        This is used on initialisation and when history is lost.  It
        also causes work for keys that changed.

        """

        LOG.debug("%s: resyncing in full", self.name)
        rv = self.etcd_client.read(self.watch_path,
                                   recursive=True)

        with self.etcd_data_lock:
            self.etcd_data = {}
            for f in rv.children:
                self.etcd_data[f.key] = f.value

            self.do_all_work()

        # TODO(ijw): there's a better number to use here
        self.tick = rv.etcd_index + 1

        LOG.debug("%s watch index recovered: %s",
                  self.name, rv.modifiedIndex)

    def watch_forever(self):
        """Watch a keyspace forevermore

        This may not exit - if there are errors they're logged (and in case
        they are persistent we pause).
        """

        while True:
            try:
                self.do_tick()
                if self.etcd_elector:
                    self.etcd_elector.wait_until_elected()
                self.do_watch()
            except Exception:
                LOG.exception('%s: etcd threw exception',
                              self.name)
                # In case of a dead etcd causing continuous
                # exceptions, the pause here avoids eating all the
                # CPU
                time.sleep(self.DANGER_PAUSE)

    def do_watch(self):
        """Watch a keyspace

        This will conduct one watch or one read.
        """

        try:
            LOG.debug("%s: pausing", self.name)

            try:
                if self.tick is None:
                    # We have no state, so we have effectively
                    # 'fallen off of the history' and need to
                    # resync in full.
                    raise etcd.EtcdEventIndexCleared()
                # Current versions of python-etcd use the timeout in
                # interesting ways.  Slow URL connections + no data
                # to return can lead to timeouts much longer than you
                # might expect.  So we watch for a timeout for
                # ourselves as well.

                # Here, we use both etcd Client timeout and eventlet timeout
                # As etcd.Client timeout is not reliable, enforce
                # the timeout with eventlet.Timeout
                # Most of the time, we timeout thanks to etcd client,
                # if we timeout due to eventlet, we have an ugly error message
                rv = None
                with eventlet.Timeout(self.heartbeat + 5, False):
                    rv = self.etcd_client.watch(self.watch_path,
                                                recursive=True,
                                                index=self.tick,
                                                timeout=self.heartbeat)

                if rv:
                    with self.etcd_data_lock:

                        # The processing function is entitled to check all etcd
                        # data.  Update it before we call the processor.
                        if rv.action == 'delete':
                            self.etcd_data.pop(rv.key, None)
                        else:
                            self.etcd_data[rv.key] = rv.value

                        # We can, in the case of a watch, hint at where the
                        # update went.

                        try:
                            self.do_work(rv.action, rv.key, rv.value)
                        except Exception:
                            LOG.exception('%s key %s value %s could '
                                          'not be processed',
                                          rv.action, rv.key, rv.value)
                            # TODO(ijw) raise or not raise?  This is probably
                            # fatal and incurable, because we will only repeat
                            # the action on the next round.
                            raise

                    # Update the tick only when all the above completes so that
                    # exceptions don't cause the count to skip before the data
                    # is processed
                    self.tick = rv.modifiedIndex + 1

            except etcd.EtcdEventIndexCleared:
                # We can't follow etcd history in teaspoons, so
                # grab the current state in its entirety.

                self.refresh_all_data()

        except (etcd.EtcdWatchTimedOut, UrllibTimeoutError):
            # This is normal behaviour, indicating either a watch timeout
            # (nothing changed) or a connection timeout (we should retry)
            pass

        # Other exceptions are thrown further, but a sensible background
        # thread probably catches them rather than terminating.
|
|
|
|
|
|
class EtcdChangeWatcher(EtcdWatcher):
    """An etcd watcher framework that reports only real changes

    Whether an update arrives from startup, a resync or a watch, it
    results in a single 'this key has changed' call.  The watcher
    remembers what it has already notified (via key_change) in
    implemented_state and suppresses notifications that would amount
    to 'this key has changed to the same value'.  Note that during a
    full resync the ordering of key updates is not guaranteed.
    """

    def __init__(self, etcd_client, name, watch_path, election_path=None,
                 wait_until_elected=False, recovery_time=5,
                 data=None, heartbeat=60):
        # Both attributes are set before the parent initialiser runs,
        # as in the original ordering.
        self.implemented_state = {}
        self.watch_path = watch_path

        super().__init__(
            etcd_client, name, watch_path, election_path,
            wait_until_elected, recovery_time,
            data, heartbeat)

    def do_all_work(self):
        """Bring implemented_state into line with etcd_data

        Some unknown portion of etcd's data has changed.  Compare it
        with what has been implemented and issue a delete, add or
        change for every key that differs.
        """
        wanted = set(self.etcd_data.keys())
        implemented = set(self.implemented_state.keys())

        # Work out all three groups up front, before any do_work()
        # call alters implemented_state.
        gone = implemented - wanted
        fresh = wanted - implemented
        shared = wanted & implemented

        for key in gone:
            # Note: this will change implemented_state
            self.do_work('delete', key, None)

        for key in fresh:
            self.do_work('add', key, self.etcd_data[key])

        for key in shared:
            if self.implemented_state[key] != self.etcd_data[key]:
                self.do_work('change', key, self.etcd_data[key])

    def do_work(self, action, key, value):
        """Notify a single change and record it as implemented

        key_change() is called first; afterwards the key is dropped
        from implemented_state for a delete, or recorded with its new
        value for anything else.
        """
        self.key_change(action, key, value)
        if action != 'delete':
            self.implemented_state[key] = value
        else:
            # If for any reason we see a double-delete that's fine
            self.implemented_state.pop(key, None)

    def key_change(self, action, key, value):
        """Dispatch a change to removed() or added()

        Called for additions, changes, refreshes and deletions.  Keys
        that do not lie below the watch path (including the root
        itself) produce no notification; only subkeys are assumed to
        be interesting.
        """
        short_key = self.make_short_key(key)
        if short_key is None:
            return

        LOG.debug("Watcher %s got %s on shortkey %s",
                  self.name, action, short_key)

        if action != 'delete':
            self.added(short_key, value)
        else:
            self.removed(short_key)

    def removed(self, key):
        """Hook called when a key is deleted

        The key is given relative to the watch path.
        """
        pass

    def added(self, key, value):
        """Hook called when a key is added, changed, updated...

        The key is given relative to the watch path.
        """
        pass
|
|
|
|
|
|
class EtcdHelper(object):
    """Convenience operations on etcd directories and keyspaces."""

    def __init__(self, client):
        self.etcd_client = client

    def clear_state(self, key_space):
        """Clear the keys in the key_space

        Every child reported by a read of key_space is deleted.
        Children that are directories cannot be deleted this way and
        are skipped; the remaining children are still deleted.
        """
        LOG.debug("Clearing key space: %s", key_space)
        try:
            rv = self.etcd_client.read(key_space)
        except etcd.EtcdNotFile:
            # Kept for compatibility: this error was never propagated
            # from here, whichever call raised it.
            return

        for child in rv.children:
            try:
                self.etcd_client.delete(child.key)
            except etcd.EtcdNotFile:
                # Can't delete directories - they're harmless anyway.
                # Carry on so that one directory doesn't stop the rest
                # of the keys from being cleared.
                continue

    def ensure_dir(self, path):
        """Create a directory at path, tolerating one that exists."""
        try:
            self.etcd_client.write(path, None, dir=True)
        except etcd.EtcdNotFile:
            # Thrown when the directory already exists, which is fine
            pass

    def remove_dir(self, path):
        """Delete the directory at path.

        A missing directory is ignored; a directory that cannot be
        deleted because it is not empty is logged as an error.
        """
        try:
            self.etcd_client.delete(path, dir=True)
        except etcd.EtcdNotFile:
            # Thrown if the directory is not empty, which we error log
            LOG.error("Directory path:%s is not empty and cannot be deleted",
                      path)
        except etcd.EtcdKeyNotFound:
            # Already gone, so not a problem
            pass
|
|
|
|
|
|
# Base connection to etcd, using standard options.
_etcd_conn_opts = [
    cfg.StrOpt('etcd_host', default="127.0.0.1",
               help=_("Etcd host IP address(es) to connect etcd client. "
                      "It takes two formats: single IP/host or a multiple "
                      "hosts list with this format: 'IP:Port,IP:Port'. "
                      "e.g: 192.168.1.1:2379,192.168.1.2:2379.  If port "
                      "is absent, etcd_port is used.")),
    cfg.IntOpt('etcd_port', default=4001,
               help=_("Etcd port to connect the etcd client.  This can "
                      "be overridden on a per-host basis if the multiple "
                      "host form of etcd_host is used.")),
    cfg.StrOpt('etcd_user', default=None,
               help=_("Username for etcd authentication")),
    cfg.StrOpt('etcd_pass', default=None, secret=True,
               help=_("Password for etcd authentication")),
    # TODO(ijw): make false default
    # NB: the sense of this option is inverted - True means plain HTTP.
    cfg.BoolOpt('etcd_insecure_explicit_disable_https', default=True,
                help=_("Disable HTTPS (TLS) when accessing etcd. Set to "
                       "False to use TLS, which requires etcd_ca_cert")),
    cfg.StrOpt('etcd_ca_cert', default=None,
               help=_("etcd CA certificate file path")),
]
|
|
|
|
|
|
def register_etcd_conn_opts(cfg, group):
    """Register the etcd connection options in the given group.

    cfg: the object whose register_opts() is called (note that this
    parameter hides the module-level cfg import inside the function)

    group: the option group to register under
    """
    cfg.register_opts(_etcd_conn_opts, group)
|
|
|
|
|
|
def list_opts():
    """Expose the etcd connection options to the Oslo-config generator.

    Returns: the module's list of etcd connection options (the list
    itself, not a copy).
    """
    return _etcd_conn_opts
|
|
|
|
|
|
class EtcdClientFactory(object):
    """Build etcd clients from a group of etcd connection options."""

    def _parse_host(self, etc_host_elem, default_port):
        """Convert one etcd host entry (host or host:port) to a tuple

        The result is a (host, port) tuple as accepted by the etcd
        client creation call; default_port is used when the entry has
        no port.  NB: the client call is documented to take one host,
        a host/port tuple or a tuple of host/port tuples; in fact, it
        will take a bare host in the tuple form as well.

        Raises InvalidEtcHostConfig for an empty or non-string entry,
        an entry with more than one delimiter, or a non-integer port.
        """
        if not isinstance(etc_host_elem, str) or etc_host_elem == '':
            raise vpp_exceptions.InvalidEtcHostConfig()

        if ETC_PORT_HOST_DELIMITER not in etc_host_elem:
            return (etc_host_elem, default_port)

        try:
            # Both a wrong number of parts and a bad port number
            # surface as ValueError.
            host, port = etc_host_elem.split(ETC_PORT_HOST_DELIMITER)
            return (host, int(port))
        except ValueError:
            raise vpp_exceptions.InvalidEtcHostConfig()

    def _parse_host_config(self, etc_host, default_port):
        """Convert the etcd host option to a tuple of host/port tuples

        Accepts a host, a host:port or a delimited list of them.  The
        list-of-hosts tuple format is always returned, even for a
        single host.

        Raises InvalidEtcHostsConfig if the option is not a string.
        """
        if not isinstance(etc_host, str):
            raise vpp_exceptions.InvalidEtcHostsConfig()

        # Splitting a string without the delimiter gives a one-element
        # list, so the single-host case needs no special handling.
        entries = etc_host.split(ETC_HOSTS_DELIMITER)
        return tuple(self._parse_host(entry, default_port)
                     for entry in entries)

    def __init__(self, conf_group):
        """Work out the client arguments from the option group.

        Raises InvalidEtcdCAConfig if HTTPS is in use but no CA
        certificate is configured.
        """
        self.etcd_args = {
            'host': self._parse_host_config(conf_group.etcd_host,
                                            conf_group.etcd_port),
            'username': conf_group.etcd_user,
            'password': conf_group.etcd_pass,
            'allow_reconnect': True}

        if conf_group.etcd_insecure_explicit_disable_https:
            LOG.warning("etcd is not using HTTPS, insecure setting")
        else:
            if conf_group.etcd_ca_cert is None:
                raise vpp_exceptions.InvalidEtcdCAConfig()

            self.etcd_args.update(protocol='https',
                                  ca_cert=conf_group.etcd_ca_cert)

    def client(self):
        """Create a new etcd client from the stored arguments."""
        return etcd.Client(**self.etcd_args)
|