Merge "Implementing EventStreamer"
This commit is contained in:
commit
a73aae6117
|
@ -46,6 +46,13 @@
|
|||
# health_check_interval = 3
|
||||
# sock_rlimit = 0
|
||||
|
||||
# EventStreamer options are
|
||||
# queue_event_streamer,
|
||||
# noop_event_streamer
|
||||
# event_streamer_driver = noop_event_streamer
|
||||
|
||||
|
||||
|
||||
[keystone_authtoken]
|
||||
# auth_uri = https://localhost:5000/v3
|
||||
# admin_user = octavia
|
||||
|
@ -175,6 +182,9 @@
|
|||
# Topic (i.e. Queue) Name
|
||||
# topic = octavia_prov
|
||||
|
||||
# Topic for octavia's events sent to a queue
|
||||
# event_stream_topic = neutron_lbaas_event
|
||||
|
||||
[house_keeping]
|
||||
# Interval in seconds to initiate spare amphora checks
|
||||
# spare_check_interval = 30
|
||||
|
|
|
@ -19,7 +19,7 @@ from octavia.amphorae.drivers import driver_base as driver_base
|
|||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LoggingMixIn(driver_base.HealthMixin, driver_base.StatsMixin):
|
||||
class LoggingUpdate(object):
|
||||
def update_stats(self, stats):
|
||||
LOG.debug("Amphora %s no-op, update stats %s",
|
||||
self.__class__.__name__, stats)
|
||||
|
|
|
@ -22,8 +22,7 @@ from oslo_reports import guru_meditation_report as gmr
|
|||
from octavia.amphorae.drivers.health import heartbeat_udp
|
||||
from octavia.common import service
|
||||
from octavia.controller.healthmanager import health_manager
|
||||
from octavia.controller.healthmanager import update_health_mixin
|
||||
from octavia.controller.healthmanager import update_stats_mixin
|
||||
from octavia.controller.healthmanager import update_db
|
||||
from octavia.i18n import _LI
|
||||
from octavia import version
|
||||
|
||||
|
@ -36,8 +35,8 @@ CONF.import_group('health_manager', 'octavia.common.config')
|
|||
def hm_listener():
|
||||
# TODO(german): steved'or load those drivers
|
||||
udp_getter = heartbeat_udp.UDPStatusGetter(
|
||||
update_health_mixin.UpdateHealthMixin(),
|
||||
update_stats_mixin.UpdateStatsMixin())
|
||||
update_db.UpdateHealthDb(),
|
||||
update_db.UpdateStatsDb())
|
||||
while True:
|
||||
udp_getter.check()
|
||||
|
||||
|
|
|
@ -143,11 +143,20 @@ healthmanager_opts = [
|
|||
default=[]),
|
||||
cfg.IntOpt('heartbeat_interval',
|
||||
default=10,
|
||||
help=_('Sleep time between sending hearthbeats.'))
|
||||
]
|
||||
help=_('Sleep time between sending hearthbeats.')),
|
||||
cfg.StrOpt('event_streamer_driver',
|
||||
help=_('Specifies which driver to use for the event_streamer '
|
||||
'for syncing the octavia and neutron_lbaas dbs. If you '
|
||||
'don\'t need to sync the database or are running '
|
||||
'octavia in stand alone mode use the '
|
||||
'noop_event_streamer'),
|
||||
default='noop_event_streamer')]
|
||||
|
||||
oslo_messaging_opts = [
|
||||
cfg.StrOpt('topic'),
|
||||
cfg.StrOpt('event_stream_topic',
|
||||
default='neutron_lbaas_event',
|
||||
help=_('topic name for communicating events through a queue')),
|
||||
]
|
||||
|
||||
keystone_authtoken_v3_opts = [
|
||||
|
|
|
@ -33,6 +33,9 @@ HEALTH_MONITOR_HTTPS = 'HTTPS'
|
|||
SUPPORTED_HEALTH_MONITOR_TYPES = (HEALTH_MONITOR_HTTP, HEALTH_MONITOR_HTTPS,
|
||||
HEALTH_MONITOR_PING, HEALTH_MONITOR_TCP)
|
||||
|
||||
UPDATE_STATS = 'UPDATE_STATS'
|
||||
UPDATE_HEALTH = 'UPDATE_HEALTH'
|
||||
|
||||
PROTOCOL_TCP = 'TCP'
|
||||
PROTOCOL_HTTP = 'HTTP'
|
||||
PROTOCOL_HTTPS = 'HTTPS'
|
||||
|
@ -69,6 +72,7 @@ OFFLINE = 'OFFLINE'
|
|||
DEGRADED = 'DEGRADED'
|
||||
ERROR = 'ERROR'
|
||||
NO_MONITOR = 'NO_MONITOR'
|
||||
OPERATING_STATUS = 'operating_status'
|
||||
SUPPORTED_OPERATING_STATUSES = (ONLINE, OFFLINE, DEGRADED, ERROR, NO_MONITOR)
|
||||
|
||||
AMPHORA_VM = 'VM'
|
||||
|
|
|
@ -14,25 +14,31 @@
|
|||
|
||||
import datetime
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import six
|
||||
import sqlalchemy
|
||||
from stevedore import driver as stevedore_driver
|
||||
|
||||
from octavia.amphorae.drivers import driver_base as driver_base
|
||||
from octavia.common import constants
|
||||
from octavia.controller.healthmanager import update_serializer
|
||||
from octavia.controller.queue import event_queue
|
||||
from octavia.db import api as db_api
|
||||
from octavia.db import repositories as repo
|
||||
from octavia.i18n import _LE, _LW
|
||||
|
||||
import six
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UpdateHealthMixin(driver_base.HealthMixin):
|
||||
|
||||
class UpdateHealthDb(object):
|
||||
def __init__(self):
|
||||
super(UpdateHealthMixin, self).__init__()
|
||||
super(UpdateHealthDb, self).__init__()
|
||||
# first setup repo for amphora, listener,member(nodes),pool repo
|
||||
self.event_streamer = stevedore_driver.DriverManager(
|
||||
namespace='octavia.controller.queues',
|
||||
name=cfg.CONF.health_manager.event_streamer_driver,
|
||||
invoke_on_load=True).driver
|
||||
self.amphora_repo = repo.AmphoraRepository()
|
||||
self.amphora_health_repo = repo.AmphoraHealthRepository()
|
||||
self.listener_repo = repo.ListenerRepository()
|
||||
|
@ -40,6 +46,23 @@ class UpdateHealthMixin(driver_base.HealthMixin):
|
|||
self.member_repo = repo.MemberRepository()
|
||||
self.pool_repo = repo.PoolRepository()
|
||||
|
||||
def emit(self, info_type, info_id, info_obj):
|
||||
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
||||
self.event_streamer.emit(cnt)
|
||||
|
||||
def _update_status_and_emit_event(self, session, repo, entity_type,
|
||||
entity_id, new_op_status):
|
||||
entity = repo.get(session, id=entity_id)
|
||||
if entity.operating_status.lower() != new_op_status.lower():
|
||||
LOG.debug("%s %s status has changed from %s to "
|
||||
"%s. Updating db and sending event.",
|
||||
entity_type, entity_id, entity.operating_status,
|
||||
new_op_status)
|
||||
repo.update(session, entity_id, operating_status=new_op_status)
|
||||
self.emit(
|
||||
entity_type, entity_id,
|
||||
{constants.OPERATING_STATUS: new_op_status})
|
||||
|
||||
def update_health(self, health):
|
||||
"""This function is to update db info based on amphora status
|
||||
|
||||
|
@ -115,9 +138,10 @@ class UpdateHealthMixin(driver_base.HealthMixin):
|
|||
|
||||
try:
|
||||
if listener_status is not None:
|
||||
self.listener_repo.update(
|
||||
session, listener_id,
|
||||
operating_status=listener_status)
|
||||
self._update_status_and_emit_event(
|
||||
session, self.listener_repo, constants.LISTENER,
|
||||
listener_id, listener_status
|
||||
)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error(_LE("Listener %s is not in DB"), listener_id)
|
||||
|
||||
|
@ -158,22 +182,78 @@ class UpdateHealthMixin(driver_base.HealthMixin):
|
|||
|
||||
try:
|
||||
if member_status is not None:
|
||||
self.member_repo.update(session, id=member_id,
|
||||
operating_status=(
|
||||
member_status))
|
||||
self._update_status_and_emit_event(
|
||||
session, self.member_repo, constants.MEMBER,
|
||||
member_id, member_status
|
||||
)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error(_LE("Member %s is not able to update "
|
||||
"in DB"), member_id)
|
||||
|
||||
try:
|
||||
if pool_status is not None:
|
||||
self.pool_repo.update(session, pool_id,
|
||||
operating_status=pool_status)
|
||||
self._update_status_and_emit_event(
|
||||
session, self.pool_repo, constants.POOL,
|
||||
pool_id, pool_status
|
||||
)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error(_LE("Pool %s is not in DB"), pool_id)
|
||||
|
||||
try:
|
||||
self.loadbalancer_repo.update(session, lb_id,
|
||||
operating_status=lb_status)
|
||||
self._update_status_and_emit_event(
|
||||
session, self.loadbalancer_repo,
|
||||
constants.LOADBALANCER, lb_id, lb_status
|
||||
)
|
||||
except sqlalchemy.orm.exc.NoResultFound:
|
||||
LOG.error(_LE("Load balancer %s is not in DB"), lb_id)
|
||||
|
||||
|
||||
class UpdateStatsDb(object):
|
||||
|
||||
def __init__(self):
|
||||
super(UpdateStatsDb, self).__init__()
|
||||
self.listener_stats_repo = repo.ListenerStatisticsRepository()
|
||||
self.event_streamer = event_queue.EventStreamerNeutron()
|
||||
|
||||
def emit(self, info_type, info_id, info_obj):
|
||||
cnt = update_serializer.InfoContainer(info_type, info_id, info_obj)
|
||||
self.event_streamer.emit(cnt)
|
||||
|
||||
def update_stats(self, health_message):
|
||||
"""This function is to update the db with listener stats
|
||||
|
||||
:param health_message: The health message containing the listener stats
|
||||
:type map: string
|
||||
:returns: null
|
||||
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {"status": constants.OPEN,
|
||||
'stats': {'conns': 0,
|
||||
'totconns': 0,
|
||||
'rx': 0,
|
||||
'tx': 0},
|
||||
"pools": {
|
||||
"pool-id-1": {"status": constants.UP,
|
||||
"members": {"member-id-1": constants.ONLINE}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"""
|
||||
session = db_api.get_session()
|
||||
|
||||
listeners = health_message['listeners']
|
||||
for listener_id, listener in six.iteritems(listeners):
|
||||
|
||||
stats = listener.get('stats')
|
||||
stats = {'bytes_in': stats['rx'], 'bytes_out': stats['tx'],
|
||||
'active_connections': stats['conns'],
|
||||
'total_connections': stats['totconns']}
|
||||
LOG.debug("Updating listener stats in db and sending event.")
|
||||
LOG.debug("Listener %s stats: %s", listener_id, stats)
|
||||
self.listener_stats_repo.replace(session, listener_id, **stats)
|
||||
self.emit('listener_stats', listener_id, stats)
|
|
@ -0,0 +1,47 @@
|
|||
# Copyright 2014 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
|
||||
class InfoContainer(object):
|
||||
@staticmethod
|
||||
def from_dict(dict_obj):
|
||||
return InfoContainer(dict_obj['info_type'],
|
||||
dict_obj['info_id'],
|
||||
dict_obj['info_payload'])
|
||||
|
||||
def __init__(self, info_type, info_id, info_payload):
|
||||
self.info_type = copy.copy(info_type)
|
||||
self.info_id = copy.copy(info_id)
|
||||
self.info_payload = copy.deepcopy(info_payload)
|
||||
|
||||
def to_dict(self):
|
||||
return {'info_type': self.info_type,
|
||||
'info_id': self.info_id,
|
||||
'info_payload': self.info_payload}
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, InfoContainer):
|
||||
return False
|
||||
if self.info_type != other.info_type:
|
||||
return False
|
||||
if self.info_id != other.info_id:
|
||||
return False
|
||||
if self.info_payload != other.info_payload:
|
||||
return False
|
||||
return True
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self == other
|
|
@ -1,65 +0,0 @@
|
|||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from octavia.amphorae.drivers import driver_base as driver_base
|
||||
from octavia.db import api as db_api
|
||||
from octavia.db import repositories as repo
|
||||
|
||||
import six
|
||||
|
||||
|
||||
class UpdateStatsMixin(driver_base.StatsMixin):
|
||||
|
||||
def __init__(self):
|
||||
super(UpdateStatsMixin, self).__init__()
|
||||
self.listener_stats_repo = repo.ListenerStatisticsRepository()
|
||||
|
||||
def update_stats(self, health_message):
|
||||
"""This function is to update the db with listener stats
|
||||
|
||||
:param health_message: The health message containing the listener stats
|
||||
:type map: string
|
||||
:returns: null
|
||||
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {"status": constants.OPEN,
|
||||
'stats': {'conns': 0,
|
||||
'totconns': 0,
|
||||
'rx': 0,
|
||||
'tx': 0},
|
||||
"pools": {
|
||||
"pool-id-1": {"status": constants.UP,
|
||||
"members": {"member-id-1": constants.ONLINE}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
"""
|
||||
session = db_api.get_session()
|
||||
|
||||
listeners = health_message['listeners']
|
||||
for listener_id, listener in six.iteritems(listeners):
|
||||
|
||||
stats = listener.get('stats')
|
||||
|
||||
self.listener_stats_repo.replace(session, listener_id,
|
||||
bytes_in=stats['rx'],
|
||||
bytes_out=stats['tx'],
|
||||
active_connections=stats['conns'],
|
||||
total_connections=(
|
||||
stats['totconns']))
|
|
@ -0,0 +1,71 @@
|
|||
# Copyright 2015 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging
|
||||
import six
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class EventStreamerBase(object):
|
||||
"""Base class for EventStreamer
|
||||
|
||||
A stand in abstract class that defines what methods are stevedore loaded
|
||||
implementations of event streamer is expected to provide.
|
||||
"""
|
||||
@abc.abstractmethod
|
||||
def emit(self, cnt):
|
||||
"""method to send a DB event to neutron-lbaas if it is needed.
|
||||
|
||||
:param cnt: an InfoContainer container object
|
||||
:return: None
|
||||
"""
|
||||
|
||||
|
||||
class EventStreamerNoop(EventStreamerBase):
|
||||
"""Nop class implementation of EventStreamer
|
||||
|
||||
Usefull if your running in standalone mode and don't need to send
|
||||
updates to Neutron Lbaas
|
||||
"""
|
||||
|
||||
def emit(self, cnt):
|
||||
pass
|
||||
|
||||
|
||||
class EventStreamerNeutron(EventStreamerBase):
|
||||
"""Neutron LBaaS
|
||||
|
||||
When you're using Octavia alongside neutron LBaaS this class provides
|
||||
a mechanism to send updates to neutron LBaaS database via
|
||||
oslo_messaging queues.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
topic = cfg.CONF.oslo_messaging.event_stream_topic
|
||||
self.transport = oslo_messaging.get_transport(cfg.CONF)
|
||||
self.target = oslo_messaging.Target(topic=topic, exchange="common",
|
||||
namespace='control', fanout=False,
|
||||
version='1.0')
|
||||
self.client = oslo_messaging.RPCClient(self.transport, self.target)
|
||||
|
||||
def emit(self, cnt):
|
||||
LOG.debug("Emitting data to event streamer %s", cnt.to_dict())
|
||||
self.client.cast({}, 'update_info', container=cnt.to_dict())
|
|
@ -14,19 +14,19 @@
|
|||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from octavia.amphorae.drivers.noop_driver import driver as driver
|
||||
from octavia.amphorae.drivers.noop_driver import driver
|
||||
from octavia.common import data_models
|
||||
from octavia.network import data_models as network_models
|
||||
from octavia.tests.unit import base as base
|
||||
from octavia.tests.unit import base
|
||||
|
||||
|
||||
FAKE_UUID_1 = uuidutils.generate_uuid()
|
||||
|
||||
|
||||
class LoggingMixIn(base.TestCase):
|
||||
class TestLoggingUpdate(base.TestCase):
|
||||
def setUp(self):
|
||||
super(LoggingMixIn, self).setUp()
|
||||
self.mixin = driver.LoggingMixIn()
|
||||
super(TestLoggingUpdate, self).setUp()
|
||||
self.mixin = driver.LoggingUpdate()
|
||||
|
||||
def test_update_stats(self):
|
||||
self.mixin.update_stats('test update stats')
|
||||
|
|
|
@ -24,9 +24,9 @@ class TestHealthManagerCMD(base.TestCase):
|
|||
super(TestHealthManagerCMD, self).setUp()
|
||||
|
||||
@mock.patch('octavia.controller.healthmanager.'
|
||||
'update_stats_mixin.UpdateStatsMixin')
|
||||
'update_db.UpdateStatsDb')
|
||||
@mock.patch('octavia.controller.healthmanager.'
|
||||
'update_health_mixin.UpdateHealthMixin')
|
||||
'update_db.UpdateHealthDb')
|
||||
@mock.patch('octavia.amphorae.drivers.health.'
|
||||
'heartbeat_udp.UDPStatusGetter')
|
||||
def test_hm_listener(self, mock_getter, mock_health, mock_stats):
|
||||
|
|
|
@ -12,23 +12,31 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import uuidutils
|
||||
import six
|
||||
import sqlalchemy
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.healthmanager import update_health_mixin as healthmixin
|
||||
import octavia.tests.unit.base as base
|
||||
from octavia.common import data_models
|
||||
from octavia.controller.healthmanager import update_db
|
||||
from octavia.tests.unit import base
|
||||
|
||||
|
||||
class TestUpdateHealthMixin(base.TestCase):
|
||||
class TestUpdateHealthDb(base.TestCase):
|
||||
FAKE_UUID_1 = uuidutils.generate_uuid()
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateHealthMixin, self).setUp()
|
||||
self.hm = healthmixin.UpdateHealthMixin()
|
||||
|
||||
super(TestUpdateHealthDb, self).setUp()
|
||||
cfg.CONF.set_override(group='health_manager',
|
||||
name='event_streamer_driver',
|
||||
override='queue_event_streamer')
|
||||
self.hm = update_db.UpdateHealthDb()
|
||||
self.event_client = mock.MagicMock()
|
||||
self.hm.event_streamer.client = self.event_client
|
||||
self.amphora_repo = mock.MagicMock()
|
||||
self.amphora_health_repo = mock.MagicMock()
|
||||
self.listener_repo = mock.MagicMock()
|
||||
|
@ -46,6 +54,33 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
self.hm.member_repo = self.member_repo
|
||||
self.hm.pool_repo = self.pool_repo
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_health_event_stream(self, session):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {"status": constants.OPEN, "pools": {
|
||||
"pool-id-1": {"status": constants.UP,
|
||||
"members": {"member-id-1": constants.UP}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
self.hm.update_health(health)
|
||||
self.event_client.cast.assert_any_call(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'listener', 'info_id': 'listener-id-1',
|
||||
'info_payload': {'operating_status': 'ONLINE'}})
|
||||
self.event_client.cast.assert_any_call(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'member', 'info_id': 'member-id-1',
|
||||
'info_payload': {'operating_status': 'ONLINE'}})
|
||||
self.event_client.cast.assert_any_call(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'pool', 'info_id': 'pool-id-1',
|
||||
'info_payload': {'operating_status': 'ONLINE'}})
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_health_Online(self, session):
|
||||
|
||||
|
@ -62,7 +97,6 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
}
|
||||
|
||||
session.return_value = 'blah'
|
||||
|
||||
self.hm.update_health(health)
|
||||
self.assertTrue(self.amphora_health_repo.replace.called)
|
||||
|
||||
|
@ -81,7 +115,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
for member_id, member in six.iteritems(
|
||||
pool.get('members', {})):
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.ONLINE)
|
||||
|
||||
self.hm.listener_repo.count.return_value = 2
|
||||
|
@ -124,7 +158,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.ERROR)
|
||||
|
||||
self.hm.listener_repo.count.return_value = 2
|
||||
|
@ -168,7 +202,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.NO_MONITOR)
|
||||
|
||||
self.hm.listener_repo.count.return_value = 2
|
||||
|
@ -211,7 +245,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.ERROR)
|
||||
|
||||
self.hm.listener_repo.count.return_value = 2
|
||||
|
@ -254,7 +288,7 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id, operating_status=constants.ERROR)
|
||||
'blah', member_id, operating_status=constants.ERROR)
|
||||
|
||||
# Test the logic code paths
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
|
@ -353,5 +387,100 @@ class TestUpdateHealthMixin(base.TestCase):
|
|||
pool.get('members', {})):
|
||||
|
||||
self.member_repo.update.assert_any_call(
|
||||
'blah', id=member_id,
|
||||
'blah', member_id,
|
||||
operating_status=constants.ONLINE)
|
||||
|
||||
def test_update_health_no_status_change(self):
|
||||
health = {
|
||||
"id": self.FAKE_UUID_1,
|
||||
"listeners": {
|
||||
"listener-id-1": {
|
||||
"status": constants.OPEN, "pools": {
|
||||
"pool-id-1": {
|
||||
"status": constants.UP, "members": {
|
||||
"member-id-1": constants.UP
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
db_lb = data_models.LoadBalancer(
|
||||
id=self.FAKE_UUID_1, operating_status=constants.ONLINE
|
||||
)
|
||||
db_listener = data_models.Listener(
|
||||
id='listener-id-', operating_status=constants.ONLINE,
|
||||
load_balancer_id=self.FAKE_UUID_1
|
||||
)
|
||||
db_pool = data_models.Pool(
|
||||
id='pool-id-1', operating_status=constants.ONLINE
|
||||
)
|
||||
db_member = data_models.Member(
|
||||
id='member-id-1', operating_status=constants.ONLINE
|
||||
)
|
||||
self.listener_repo.get.return_value = db_listener
|
||||
self.pool_repo.get.return_value = db_pool
|
||||
self.member_repo.get.return_value = db_member
|
||||
self.loadbalancer_repo.get.return_value = db_lb
|
||||
self.hm.update_health(health)
|
||||
self.event_client.cast.assert_not_called()
|
||||
self.loadbalancer_repo.update.assert_not_called()
|
||||
self.listener_repo.update.assert_not_called()
|
||||
self.pool_repo.update.assert_not_called()
|
||||
self.member_repo.update.assert_not_called()
|
||||
|
||||
|
||||
class TestUpdateStatsDb(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateStatsDb, self).setUp()
|
||||
self.sm = update_db.UpdateStatsDb()
|
||||
self.event_client = mock.MagicMock()
|
||||
self.sm.event_streamer.client = self.event_client
|
||||
|
||||
self.listener_stats_repo = mock.MagicMock()
|
||||
self.sm.listener_stats_repo = self.listener_stats_repo
|
||||
|
||||
self.bytes_in = random.randrange(1000000000)
|
||||
self.bytes_out = random.randrange(1000000000)
|
||||
self.active_conns = random.randrange(1000000000)
|
||||
self.total_conns = random.randrange(1000000000)
|
||||
self.loadbalancer_id = uuidutils.generate_uuid()
|
||||
self.listener_id = uuidutils.generate_uuid()
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_stats(self, session):
|
||||
|
||||
health = {
|
||||
"id": self.loadbalancer_id,
|
||||
"listeners": {
|
||||
self.listener_id: {"status": constants.OPEN,
|
||||
"stats": {"conns": self.active_conns,
|
||||
"totconns": self.total_conns,
|
||||
"rx": self.bytes_in,
|
||||
"tx": self.bytes_out},
|
||||
"pools": {"pool-id-1":
|
||||
{"status": constants.UP,
|
||||
"members":
|
||||
{"member-id-1": constants.ONLINE}
|
||||
}
|
||||
}
|
||||
}}}
|
||||
|
||||
session.return_value = 'blah'
|
||||
|
||||
self.sm.update_stats(health)
|
||||
|
||||
self.listener_stats_repo.replace.assert_called_once_with(
|
||||
'blah', self.listener_id, bytes_in=self.bytes_in,
|
||||
bytes_out=self.bytes_out, active_connections=self.active_conns,
|
||||
total_connections=self.total_conns)
|
||||
self.event_client.cast.assert_called_once_with(
|
||||
{}, 'update_info', container={
|
||||
'info_type': 'listener_stats',
|
||||
'info_id': self.listener_id,
|
||||
'info_payload': {
|
||||
'bytes_in': self.bytes_in,
|
||||
'total_connections': self.total_conns,
|
||||
'active_connections': self.active_conns,
|
||||
'bytes_out': self.bytes_out}})
|
|
@ -0,0 +1,39 @@
|
|||
# Copyright 2014 Rackspace
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.healthmanager import update_serializer
|
||||
import octavia.tests.unit.base as base
|
||||
|
||||
|
||||
class TestUpdateSerializer(base.TestCase):
|
||||
def setUp(self):
|
||||
super(TestUpdateSerializer, self).setUp()
|
||||
|
||||
def test_serializer_from_dict_to_dict(self):
|
||||
obj_id = str(uuid.uuid4())
|
||||
obj_type = constants.UPDATE_HEALTH
|
||||
obj_payload = {'test': [1, 2, 3, 4], id: obj_id}
|
||||
obj = update_serializer.InfoContainer(obj_id, obj_type, obj_payload)
|
||||
cloned_obj = update_serializer.InfoContainer.from_dict(obj.to_dict())
|
||||
self.assertEqual(obj, cloned_obj)
|
||||
|
||||
obj_id = str(uuid.uuid4())
|
||||
obj_type = constants.UPDATE_HEALTH
|
||||
obj_payload = {'test': [3, 2, 1, 6], id: obj_id, 'x': {'y': 1}}
|
||||
obj = update_serializer.InfoContainer(obj_id, obj_type, obj_payload)
|
||||
cloned_obj = update_serializer.InfoContainer.from_dict(obj.to_dict())
|
||||
self.assertEqual(obj, cloned_obj)
|
|
@ -1,67 +0,0 @@
|
|||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import random
|
||||
|
||||
import mock
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from octavia.common import constants
|
||||
from octavia.controller.healthmanager import update_stats_mixin as statsmixin
|
||||
import octavia.tests.unit.base as base
|
||||
|
||||
|
||||
class TestUpdateStatsMixin(base.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateStatsMixin, self).setUp()
|
||||
|
||||
self.sm = statsmixin.UpdateStatsMixin()
|
||||
self.listener_stats_repo = mock.MagicMock()
|
||||
self.sm.listener_stats_repo = self.listener_stats_repo
|
||||
|
||||
self.bytes_in = random.randrange(1000000000)
|
||||
self.bytes_out = random.randrange(1000000000)
|
||||
self.active_conns = random.randrange(1000000000)
|
||||
self.total_conns = random.randrange(1000000000)
|
||||
self.loadbalancer_id = uuidutils.generate_uuid()
|
||||
self.listener_id = uuidutils.generate_uuid()
|
||||
|
||||
@mock.patch('octavia.db.api.get_session')
|
||||
def test_update_stats(self, session):
|
||||
|
||||
health = {
|
||||
"id": self.loadbalancer_id,
|
||||
"listeners": {
|
||||
self.listener_id: {"status": constants.OPEN,
|
||||
"stats": {"conns": self.active_conns,
|
||||
"totconns": self.total_conns,
|
||||
"rx": self.bytes_in,
|
||||
"tx": self.bytes_out},
|
||||
"pools": {"pool-id-1":
|
||||
{"status": constants.UP,
|
||||
"members":
|
||||
{"member-id-1": constants.ONLINE}
|
||||
}
|
||||
}
|
||||
}}}
|
||||
|
||||
session.return_value = 'blah'
|
||||
|
||||
self.sm.update_stats(health)
|
||||
|
||||
self.listener_stats_repo.replace.assert_called_once_with(
|
||||
'blah', self.listener_id, bytes_in=self.bytes_in,
|
||||
bytes_out=self.bytes_out, active_connections=self.active_conns,
|
||||
total_connections=self.total_conns)
|
|
@ -50,6 +50,9 @@ octavia.amphora.drivers =
|
|||
amphora_noop_driver = octavia.amphorae.drivers.noop_driver.driver:NoopAmphoraLoadBalancerDriver
|
||||
amphora_haproxy_rest_driver = octavia.amphorae.drivers.haproxy.rest_api_driver:HaproxyAmphoraLoadBalancerDriver
|
||||
amphora_haproxy_ssh_driver = octavia.amphorae.drivers.haproxy.ssh_driver:HaproxyManager
|
||||
octavia.controller.queues =
|
||||
noop_event_streamer = octavia.controller.queue.neutron_queue:EventStreamerNoop
|
||||
queue_event_streamer = octavia.controller.queue.event_queue:EventStreamerNeutron
|
||||
octavia.compute.drivers =
|
||||
compute_noop_driver = octavia.compute.drivers.noop_driver.driver:NoopComputeDriver
|
||||
compute_nova_driver = octavia.compute.drivers.nova_driver:VirtualMachineManager
|
||||
|
|
Loading…
Reference in New Issue