Initial amphora status daemon

Initial status daemon. What's done:
sends udp to both ipv4 and ipv6
calculates hmac
has config file in json
initial command line arg parsing
a few unit tests

Not done:
config file changes
signal support
communication from API on amphora

Serious work in progress

Change-Id: I1e7759335ac43364d27e3176cf813c4f6ef549cb
This commit is contained in:
Alex Barclay 2014-12-18 16:06:34 -08:00
parent 6b511cc113
commit 968904ae48
13 changed files with 433 additions and 1 deletions

View File

@ -0,0 +1,76 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import json
import singleton
@singleton.singleton
class JSONFileConfig(collections.Mapping):
def __init__(self):
self.filename = None
self.conf = {}
self.observers = set()
""" Set the config filename and perform the first read
:param filename: a JSON file that contains the config
"""
def set_filename(self, filename):
self.filename = filename
self.read_config()
def __iter__(self):
return iter(self.conf)
def __getitem__(self, k):
return self.conf[k]
def __len__(self):
return len(self.conf)
""" Add a callable to be notified of config changes
:param obs: a callable to receive change events
"""
def add_observer(self, obs):
self.observers.add(obs)
""" Remove a callable to be notified of config changes
By design if the callable passed doesn't exist then just return
:param obs: a callable to attempt to remove
"""
def remove_observer(self, obs):
self.observers.discard(obs)
""" Force a reread of the config file and inform all observers
"""
def check_update(self):
self.read_config()
self.confirm_update()
def confirm_update(self):
for observer in self.observers:
observer()
def read_config(self):
if self.filename is None:
return
self.cfile = open(self.filename, 'r')
self.conf = json.load(self.cfile)

View File

@ -0,0 +1,65 @@
#! /usr/bin/env python
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import argparse
import sys
import time
import config
import health_sender
def run_sender():
sender = health_sender.UDPStatusSender()
cfg = config.JSONFileConfig()
sighup_received = False
seq = 0
while True:
if sighup_received:
print('re-reading config file')
sighup_received = False
cfg.check_update()
message = {'not the answer': 43,
'id': cfg['id'],
'seq': seq}
seq = seq + 1
sender.dosend(message)
time.sleep(cfg['delay'])
def parse_args():
parser = argparse.ArgumentParser(description='Health Sender Daemon')
parser.add_argument('-c', '--config', type=str, required=False,
help='config file path',
default='/etc/amphora/status_sender.json')
args = parser.parse_args()
return vars(args)
if __name__ == '__main__':
args = parse_args()
cfg = config.JSONFileConfig()
try:
cfg.set_filename(args['config'])
except IOError as exception:
print(exception)
sys.exit(1)
# Now start up the sender loop
run_sender()
sys.exit(0)

View File

@ -0,0 +1,54 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(barclaac) Need to decide how this hooks into rest of system,
# e.g. daemon, subprocess, thread etc.
import socket
import config
import status_message
class UDPStatusSender:
def __init__(self):
self.cfg = config.JSONFileConfig()
self.dests = {}
self.update(self.cfg['destination'], self.cfg['port'])
self.v4sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.v6sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
self.key = str(self.cfg['key'])
self.cfg.add_observer(self.config_change)
# TODO(barclaac) Still need to reread the address list if it gets changed
def config_change(self):
pass
def update(self, dest_list, port):
for dest in dest_list:
addrlist = socket.getaddrinfo(dest, port, 0, socket.SOCK_DGRAM)
# addrlist = [(family, socktype, proto, canonname, sockaddr) ...]
# e.g. 4 = sockaddr - what we actually need
for addr in addrlist:
self.dests[addr[4]] = addr
def dosend(self, envelope):
envelope_str = status_message.encode(envelope, self.key)
for dest in self.dests.itervalues():
# addrlist = [(family, socktype, proto, canonname, sockaddr) ...]
# e.g. 0 = sock family, 4 = sockaddr - what we actually need
if dest[0] == socket.AF_INET:
self.v4sock.sendto(envelope_str, dest[4])
elif dest[0] == socket.AF_INET6:
self.v6sock.sendto(envelope_str, dest[4])

View File

@ -0,0 +1,7 @@
{
"key": "asamplekey",
"delay": 2.5,
"destination": [ "::1", "127.1" ],
"port": 12345,
"id": "0dc47eda-872b-11e4-920b-000c294b76ae"
}

View File

@ -0,0 +1,26 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(barclaac) Someone needs to move this to be a library function for
# all of Octavia (Oslo even?)
def singleton(cls):
instances = {}
def getinstance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return getinstance

View File

@ -0,0 +1,33 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import hmac
import json
def encode(msg, key):
result = {}
src = json.dumps(msg)
hmc = hmac.new(key, src, hashlib.sha1)
result['msg'] = msg
result['hmac'] = hmc.hexdigest()
return json.dumps(result)
def checkhmac(envelope_str, key):
envelope = json.loads(envelope_str)
src = json.dumps(envelope['msg'])
hmc = hmac.new(key, src, hashlib.sha1)
return hmc.hexdigest() == envelope['hmac']

View File

@ -0,0 +1,95 @@
# Copyright 2014 Hewlett-Packard Development-Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import tempfile
from testtools import matchers
from octavia.amphorae.backends.health_daemon import config
from octavia.tests.unit import base
class TestConfig(base.TestCase):
def setUp(self):
super(TestConfig, self).setUp()
self.setup_config_file()
self.addCleanup(self.remove_config_file)
def test_noconfig(self):
cfg = config.JSONFileConfig()
self.assertThat(lambda: cfg.set_filename('/doesnotexist'),
matchers.raises(IOError))
def test_config(self):
cfg = config.JSONFileConfig()
cfg.set_filename(self.sampleconfig[1])
# Check the singleton decorator
self.assertIs(cfg, config.JSONFileConfig())
cfg.add_observer(self.check_update)
self.update_called = False
cfg.check_update()
self.assertTrue(self.update_called)
self.assertIs(cfg['delay'], 10)
# First test - change the existing file - same file, no change
with open(self.sampleconfig[1], 'w+') as f:
cdata = {'delay': 5}
json.dump(cdata, f)
self.update_called = False
cfg.check_update()
self.assertTrue(self.update_called)
self.assertIs(cfg['delay'], 5)
# Check for removing an observer - Thanks Stephen
cfg.remove_observer(self.check_update)
self.update_called = False
cfg.check_update()
self.assertFalse(self.update_called)
# Better add it back for the next test
cfg.add_observer(self.check_update)
# Next, replace the file (new inode)
self.remove_config_file()
with open(self.sampleconfig[1], 'w+') as f:
cdata = {'delay': 3}
json.dump(cdata, f)
self.update_called = False
cfg.check_update()
self.assertTrue(self.update_called)
self.assertIs(cfg['delay'], 3)
def check_update(self):
self.assertFalse(self.update_called)
self.update_called = True
def setup_config_file(self):
self.sampleconfig = tempfile.mkstemp()
conffile = os.fdopen(self.sampleconfig[0], 'w+')
cdata = {'delay': 10}
json.dump(cdata, conffile)
conffile.close()
def remove_config_file(self):
os.unlink(self.sampleconfig[1])

View File

@ -0,0 +1,32 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from octavia.amphorae.backends.health_daemon import status_message
from octavia.tests.unit import base
class TestEnvelope(base.TestCase):
def setUp(self):
super(TestEnvelope, self).setUp()
def test_message_hmac(self):
statusMsg = {'seq': 42,
'status': 'OK',
'id': str(uuid.uuid4())}
sme = status_message.encode(statusMsg, 'samplekey1')
self.assertTrue(status_message.checkhmac(sme, 'samplekey1'))
self.assertFalse(status_message.checkhmac(sme, 'samplekey2'))

View File

@ -0,0 +1,42 @@
# Copyright 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import tempfile
from octavia.tests.unit import base
class TestSender(base.TestCase):
def setUp(self):
super(TestSender, self).setUp()
self.setupConfigFile()
self.addCleanup(self.removeConfigFile)
def setupConfigFile(self):
self.sampleconfig = tempfile.mkstemp()
conffile = os.fdopen(self.sampleconfig[0], 'w+')
cdata = {'delay': 10,
'target': ['127.0.0.1', '::1'],
'psk': 'fubar',
'dport': 12345}
json.dump(cdata, conffile)
conffile.close()
def removeConfigFile(self):
os.unlink(self.sampleconfig[1])
def test_message_output(self):
pass

View File

@ -20,7 +20,9 @@ commands = flake8
commands = python setup.py build_sphinx
[flake8]
ignore = None
# Ignoring O321 because it's unnecessarily restricting use of json package.
# jsonutils version doesn't add additional value
ignore = O321
show-source = true
builtins = _
exclude = .venv,.git,.tox,dist,doc,*openstack/common*,*lib/python*,*egg,build,tools,.ropeproject,rally-scenarios