Added amphora_id to listener_statistics table to stop data loss

This is a fix for the bug listed below. Adding the amphora id
to the table allows multiple amphora to send statistics via
the heartbeat without each heartbeat overwriting other
heartbeats for the same listener / different amphora.

Change-Id: I9f50a5de2c1b0665e62d45fcc5815f2b4093b2df
Closes-Bug: 1573607
This commit is contained in:
Matt Alline 2016-05-02 10:33:11 -05:00 committed by Brandon Logan
parent 93983ad04c
commit 2f33429db9
10 changed files with 171 additions and 30 deletions

View File

@ -132,10 +132,11 @@ class SessionPersistence(BaseDataModel):
class ListenerStatistics(BaseDataModel):
def __init__(self, listener_id=None, bytes_in=None, bytes_out=None,
active_connections=None, total_connections=None,
listener=None):
def __init__(self, listener_id=None, amphora_id=None, bytes_in=None,
bytes_out=None, active_connections=None,
total_connections=None, listener=None):
self.listener_id = listener_id
self.amphora_id = amphora_id
self.bytes_in = bytes_in
self.bytes_out = bytes_out
self.active_connections = active_connections

View File

@ -249,6 +249,7 @@ class UpdateStatsDb(object):
"""
session = db_api.get_session()
amphora_id = health_message['id']
listeners = health_message['listeners']
for listener_id, listener in six.iteritems(listeners):
@ -257,6 +258,8 @@ class UpdateStatsDb(object):
'active_connections': stats['conns'],
'total_connections': stats['totconns']}
LOG.debug("Updating listener stats in db and sending event.")
LOG.debug("Listener %s stats: %s", listener_id, stats)
self.listener_stats_repo.replace(session, listener_id, **stats)
LOG.debug("Listener %s / Amphora %s stats: %s",
listener_id, amphora_id, stats)
self.listener_stats_repo.replace(
session, listener_id, amphora_id, **stats)
self.emit('listener_stats', listener_id, stats)

View File

@ -0,0 +1,57 @@
# Copyright 2016 Rackspace
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""adding Amphora ID to listener_statistics table
Revision ID: 9bf4d21caaea
Revises: 8c0851bdf6c3
Create Date: 2016-05-02 07:50:12.888263
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9bf4d21caaea'
down_revision = '8c0851bdf6c3'
def upgrade():
# OpenStack has decided that "down" migrations are not supported.
# The downgrade() method has been omitted for this reason.
op.add_column('listener_statistics',
sa.Column('amphora_id',
sa.String(36),
nullable=False)
)
op.drop_constraint('fk_listener_statistics_listener_id',
'listener_statistics',
type_='foreignkey')
op.drop_constraint('PRIMARY',
'listener_statistics',
type_='primary')
op.create_primary_key('pk_listener_statistics', 'listener_statistics',
['listener_id', 'amphora_id'])
op.create_foreign_key('fk_listener_statistics_listener_id',
'listener_statistics',
'listener',
['listener_id'],
['id'])
op.create_foreign_key('fk_listener_statistic_amphora_id',
'listener_statistics',
'amphora',
['amphora_id'],
['id'])

View File

@ -120,6 +120,12 @@ class ListenerStatistics(base_models.BASE):
name="fk_listener_statistics_listener_id"),
primary_key=True,
nullable=False)
amphora_id = sa.Column(
sa.String(36),
sa.ForeignKey("amphora.id",
name="fk_listener_statistics_amphora_id"),
primary_key=True,
nullable=False)
bytes_in = sa.Column(sa.BigInteger, nullable=False)
bytes_out = sa.Column(sa.BigInteger, nullable=False)
active_connections = sa.Column(sa.Integer, nullable=False)
@ -127,6 +133,9 @@ class ListenerStatistics(base_models.BASE):
listener = orm.relationship("Listener", uselist=False,
backref=orm.backref("stats", uselist=False,
cascade="delete"))
amphora = orm.relationship("Amphora", uselist=False,
backref=orm.backref("stats", uselist=False,
cascade="delete"))
@staticmethod
@validates('bytes_in', 'bytes_out',

View File

@ -451,17 +451,20 @@ class ListenerRepository(BaseRepository):
class ListenerStatisticsRepository(BaseRepository):
model_class = models.ListenerStatistics
def replace(self, session, listener_id, **model_kwargs):
def replace(self, session, listener_id, amphora_id, **model_kwargs):
"""replace or insert listener into database."""
with session.begin(subtransactions=True):
count = session.query(self.model_class).filter_by(
listener_id=listener_id).count()
listener_id=listener_id, amphora_id=amphora_id).count()
if count:
session.query(self.model_class).filter_by(
listener_id=listener_id).update(model_kwargs,
synchronize_session=False)
listener_id=listener_id,
amphora_id=amphora_id).update(
model_kwargs,
synchronize_session=False)
else:
model_kwargs['listener_id'] = listener_id
model_kwargs['amphora_id'] = amphora_id
self.create(session, **model_kwargs)
def update(self, session, listener_id, **model_kwargs):

View File

@ -14,6 +14,7 @@
import mock
from oslo_config import cfg
from oslo_utils import uuidutils
import pecan
import pecan.testing
@ -55,6 +56,7 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
self.listener_stats_repo = repositories.ListenerStatisticsRepository()
self.pool_repo = repositories.PoolRepository()
self.member_repo = repositories.MemberRepository()
self.amphora_repo = repositories.AmphoraRepository()
patcher = mock.patch('octavia.api.v1.handlers.controller_simulator.'
'handler.SimulatedControllerHandler')
self.handler_mock = patcher.start()
@ -125,12 +127,24 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
response = self.post(path, req_dict)
return response.json
def create_listener_stats(self, listener_id):
def create_listener_stats(self, listener_id, amphora_id):
db_ls = self.listener_stats_repo.create(
db_api.get_session(), listener_id=listener_id, bytes_in=0,
db_api.get_session(), listener_id=listener_id,
amphora_id=amphora_id, bytes_in=0,
bytes_out=0, active_connections=0, total_connections=0)
return db_ls.to_dict()
def create_amphora(self, amphora_id, loadbalancer_id, **optionals):
# We need to default these values in the request.
opts = {'compute_id': uuidutils.generate_uuid(),
'status': constants.ACTIVE}
opts.update(optionals)
amphora = self.amphora_repo.create(
self.session, id=amphora_id,
load_balancer_id=loadbalancer_id,
**opts)
return amphora
def get_listener(self, lb_id, listener_id):
path = self.LISTENER_PATH.format(lb_id=lb_id, listener_id=listener_id)
response = self.get(path)

View File

@ -15,8 +15,11 @@
from octavia.common import constants
from octavia.tests.functional.api.v1 import base
from oslo_utils import uuidutils
class TestListenerStatistics(base.BaseAPITest):
FAKE_UUID_1 = uuidutils.generate_uuid()
def setUp(self):
super(TestListenerStatistics, self).setUp()
@ -27,11 +30,16 @@ class TestListenerStatistics(base.BaseAPITest):
self.set_lb_status(self.lb.get('id'))
self.ls_path = self.LISTENER_STATS_PATH.format(
lb_id=self.lb.get('id'), listener_id=self.listener.get('id'))
self.amphora = self.create_amphora(uuidutils.generate_uuid(),
self.lb.get('id'))
def test_get(self):
ls = self.create_listener_stats(listener_id=self.listener.get('id'))
ls = self.create_listener_stats(listener_id=self.listener.get('id'),
amphora_id=self.amphora.id)
ls.pop('listener')
ls.pop('listener_id')
ls.pop('amphora')
ls.pop('amphora_id')
response = self.get(self.ls_path)
response_body = response.json
self.assertEqual(ls, response_body)

View File

@ -50,8 +50,10 @@ class ModelTestMixin(object):
kwargs.update(overrides)
return self._insert(session, models.Listener, kwargs)
def create_listener_statistics(self, session, listener_id, **overrides):
def create_listener_statistics(self, session, listener_id, amphora_id,
**overrides):
kwargs = {'listener_id': listener_id,
'amphora_id': amphora_id,
'bytes_in': 0,
'bytes_out': 0,
'active_connections': 0,
@ -331,7 +333,9 @@ class ListenerModelTest(base.OctaviaDBTestBase, ModelTestMixin):
def test_listener_statistics_relationship(self):
listener = self.create_listener(self.session)
self.create_listener_statistics(self.session, listener_id=listener.id)
amphora = self.create_amphora(self.session)
self.create_listener_statistics(self.session, listener_id=listener.id,
amphora_id=amphora.id)
new_listener = self.session.query(models.Listener).filter_by(
id=listener.id).first()
self.assertIsNotNone(new_listener.stats)
@ -373,19 +377,23 @@ class ListenerStatisticsModelTest(base.OctaviaDBTestBase, ModelTestMixin):
def setUp(self):
super(ListenerStatisticsModelTest, self).setUp()
self.listener = self.create_listener(self.session)
self.amphora = self.create_amphora(self.session)
def test_create(self):
self.create_listener_statistics(self.session, self.listener.id)
self.create_listener_statistics(self.session, self.listener.id,
self.amphora.id)
def test_update(self):
stats = self.create_listener_statistics(self.session, self.listener.id)
stats = self.create_listener_statistics(self.session, self.listener.id,
self.amphora.id)
stats.name = 'test1'
new_stats = self.session.query(models.ListenerStatistics).filter_by(
listener_id=self.listener.id).first()
self.assertEqual('test1', new_stats.name)
def test_delete(self):
stats = self.create_listener_statistics(self.session, self.listener.id)
stats = self.create_listener_statistics(self.session, self.listener.id,
self.amphora.id)
with self.session.begin():
self.session.delete(stats)
self.session.flush()
@ -394,7 +402,8 @@ class ListenerStatisticsModelTest(base.OctaviaDBTestBase, ModelTestMixin):
self.assertIsNone(new_stats)
def test_listener_relationship(self):
self.create_listener_statistics(self.session, self.listener.id)
self.create_listener_statistics(self.session, self.listener.id,
self.amphora.id)
new_stats = self.session.query(models.ListenerStatistics).filter_by(
listener_id=self.listener.id).first()
self.assertIsNotNone(new_stats.listener)
@ -800,7 +809,8 @@ class TestDataModelConversionTest(base.OctaviaDBTestBase, ModelTestMixin):
default_pool_id=self.pool.id,
load_balancer_id=self.lb.id)
self.stats = self.create_listener_statistics(self.session,
self.listener.id)
self.listener.id,
self.amphora.id)
self.sni = self.create_sni(self.session, listener_id=self.listener.id)
self.l7policy = self.create_l7policy(
self.session, listener_id=self.listener.id,
@ -1257,7 +1267,8 @@ class TestDataModelManipulations(base.OctaviaDBTestBase, ModelTestMixin):
default_pool_id=self.pool.id,
load_balancer_id=self.lb.id)
self.stats = self.create_listener_statistics(self.session,
self.listener.id)
self.listener.id,
self.amphora.id)
self.sni = self.create_sni(self.session, listener_id=self.listener.id)
self.l7policy = self.create_l7policy(
self.session, listener_id=self.listener.id,

View File

@ -644,6 +644,15 @@ class TestListenerRepositoryTest(BaseRepositoryTest):
provisioning_status=constants.ACTIVE, enabled=True, peer_port=1025)
return listener
def create_amphora(self, amphora_id, loadbalancer_id):
amphora = self.amphora_repo.create(self.session, id=amphora_id,
load_balancer_id=loadbalancer_id,
compute_id=self.FAKE_UUID_3,
status=constants.ACTIVE,
vrrp_ip=self.FAKE_IP,
lb_network_ip=self.FAKE_IP)
return amphora
def create_loadbalancer(self, lb_id):
lb = self.lb_repo.create(self.session, id=lb_id,
project_id=self.FAKE_UUID_2, name="lb_name",
@ -771,8 +780,11 @@ class TestListenerRepositoryTest(BaseRepositoryTest):
def test_delete_with_stats(self):
listener = self.create_listener(self.FAKE_UUID_1, 80)
lb = self.create_loadbalancer(uuidutils.generate_uuid())
amphora = self.create_amphora(uuidutils.generate_uuid(), lb.id)
stats = self.listener_stats_repo.create(
self.session, listener_id=listener.id, bytes_in=1, bytes_out=1,
self.session, listener_id=listener.id, amphora_id=amphora.id,
bytes_in=1, bytes_out=1,
active_connections=1, total_connections=1)
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertIsNotNone(new_listener)
@ -812,8 +824,12 @@ class TestListenerRepositoryTest(BaseRepositoryTest):
default_pool_id=pool.id)
sni = self.sni_repo.create(self.session, listener_id=listener.id,
tls_container_id=self.FAKE_UUID_3)
lb = self.create_loadbalancer(uuidutils.generate_uuid())
amphora = self.create_amphora(uuidutils.generate_uuid(), lb.id)
stats = self.listener_stats_repo.create(
self.session, listener_id=listener.id, bytes_in=1, bytes_out=1,
self.session, listener_id=listener.id,
amphora_id=amphora.id,
bytes_in=1, bytes_out=1,
active_connections=1, total_connections=1)
new_listener = self.listener_repo.get(self.session, id=listener.id)
self.assertIsNotNone(new_listener)
@ -857,22 +873,38 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
protocol=constants.PROTOCOL_HTTP, protocol_port=80,
connection_limit=1, provisioning_status=constants.ACTIVE,
operating_status=constants.ONLINE, enabled=True, peer_port=1025)
self.lb = self.lb_repo.create(self.session,
id=uuidutils.generate_uuid(),
project_id=self.FAKE_UUID_2,
name="lb_name",
description="lb_description",
provisioning_status=constants.ACTIVE,
operating_status=constants.ONLINE,
enabled=True)
self.amphora = self.amphora_repo.create(self.session,
id=uuidutils.generate_uuid(),
load_balancer_id=self.lb.id,
compute_id=self.FAKE_UUID_3,
status=constants.ACTIVE,
vrrp_ip=self.FAKE_IP,
lb_network_ip=self.FAKE_IP)
def create_listener_stats(self, listener_id):
def create_listener_stats(self, listener_id, amphora_id):
stats = self.listener_stats_repo.create(
self.session, listener_id=listener_id, bytes_in=1, bytes_out=1,
self.session, listener_id=listener_id, amphora_id=amphora_id,
bytes_in=1, bytes_out=1,
active_connections=1, total_connections=1)
return stats
def test_get(self):
stats = self.create_listener_stats(self.listener.id)
stats = self.create_listener_stats(self.listener.id, self.amphora.id)
new_stats = self.listener_stats_repo.get(self.session,
listener_id=stats.listener_id)
self.assertIsInstance(new_stats, models.ListenerStatistics)
self.assertEqual(stats.listener_id, new_stats.listener_id)
def test_create(self):
stats = self.create_listener_stats(self.listener.id)
stats = self.create_listener_stats(self.listener.id, self.amphora.id)
new_stats = self.listener_stats_repo.get(self.session,
listener_id=stats.listener_id)
self.assertEqual(self.listener.id, new_stats.listener_id)
@ -883,7 +915,7 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
def test_update(self):
bytes_in_change = 2
stats = self.create_listener_stats(self.listener.id)
stats = self.create_listener_stats(self.listener.id, self.amphora.id)
self.listener_stats_repo.update(self.session, stats.listener_id,
bytes_in=bytes_in_change)
new_stats = self.listener_stats_repo.get(self.session,
@ -892,7 +924,7 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
self.assertEqual(stats.listener_id, new_stats.listener_id)
def test_delete(self):
stats = self.create_listener_stats(self.listener.id)
stats = self.create_listener_stats(self.listener.id, self.amphora.id)
self.listener_stats_repo.delete(self.session,
listener_id=stats.listener_id)
self.assertIsNone(self.listener_stats_repo.get(
@ -911,6 +943,7 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
self.assertIsNone(self.listener_stats_repo.get(
self.session, listener_id=self.listener.id))
self.listener_stats_repo.replace(self.session, self.listener.id,
self.amphora.id,
bytes_in=bytes_in,
bytes_out=bytes_out,
active_connections=active_conns,
@ -930,6 +963,7 @@ class ListenerStatisticsRepositoryTest(BaseRepositoryTest):
active_conns_2 = random.randrange(1000000000)
total_conns_2 = random.randrange(1000000000)
self.listener_stats_repo.replace(self.session, self.listener.id,
self.amphora.id,
bytes_in=bytes_in_2,
bytes_out=bytes_out_2,
active_connections=active_conns_2,

View File

@ -490,8 +490,9 @@ class TestUpdateStatsDb(base.TestCase):
self.sm.update_stats(health)
self.listener_stats_repo.replace.assert_called_once_with(
'blah', self.listener_id, bytes_in=self.bytes_in,
bytes_out=self.bytes_out, active_connections=self.active_conns,
'blah', self.listener_id, self.loadbalancer_id,
bytes_in=self.bytes_in, bytes_out=self.bytes_out,
active_connections=self.active_conns,
total_connections=self.total_conns)
self.event_client.cast.assert_called_once_with(
{}, 'update_info', container={