Merge "Add statistics to V2 API"

This commit is contained in:
Jenkins 2017-07-06 19:14:41 +00:00 committed by Gerrit Code Review
commit 4652085d9a
10 changed files with 369 additions and 36 deletions

View File

@ -28,6 +28,7 @@ from octavia.api.v2.types import listener as listener_types
from octavia.common import constants from octavia.common import constants
from octavia.common import data_models from octavia.common import data_models
from octavia.common import exceptions from octavia.common import exceptions
from octavia.common import stats
from octavia.db import api as db_api from octavia.db import api as db_api
from octavia.db import prepare as db_prepare from octavia.db import prepare as db_prepare
@ -328,3 +329,41 @@ class ListenersController(base.BaseController):
self.repositories.listener.update( self.repositories.listener.update(
lock_session, db_listener.id, lock_session, db_listener.id,
provisioning_status=constants.ERROR) provisioning_status=constants.ERROR)
@pecan.expose()
def _lookup(self, id, *remainder):
"""Overridden pecan _lookup method for custom routing.
Currently it checks if this was a stats request and routes
the request to the StatsController.
"""
if id and len(remainder) and remainder[0] == 'stats':
return StatisticsController(listener_id=id), remainder[1:]
class StatisticsController(base.BaseController, stats.StatsMixin):
RBAC_TYPE = constants.RBAC_LISTENER
def __init__(self, listener_id):
super(StatisticsController, self).__init__()
self.id = listener_id
@wsme_pecan.wsexpose(listener_types.StatisticsRootResponse, wtypes.text,
status_code=200)
def get(self):
context = pecan.request.context.get('octavia_context')
db_listener = self._get_db_listener(context.session, self.id)
if not db_listener:
LOG.info("Listener %s not found.", id)
raise exceptions.NotFound(
resource=data_models.Listener._name(),
id=id)
self._auth_validate_action(context, db_listener.project_id,
constants.RBAC_GET_STATS)
listener_stats = self.get_listener_stats(context.session, self.id)
result = self._convert_db_to_type(
listener_stats, listener_types.ListenerStatisticsResponse)
return listener_types.StatisticsRootResponse(stats=result)

View File

@ -29,6 +29,7 @@ from octavia.api.v2.types import load_balancer as lb_types
from octavia.common import constants from octavia.common import constants
from octavia.common import data_models from octavia.common import data_models
from octavia.common import exceptions from octavia.common import exceptions
from octavia.common import stats
from octavia.common import utils from octavia.common import utils
import octavia.common.validate as validate import octavia.common.validate as validate
from octavia.db import api as db_api from octavia.db import api as db_api
@ -401,18 +402,24 @@ class LoadBalancersController(base.BaseController):
Currently it checks if this was a statuses request and routes Currently it checks if this was a statuses request and routes
the request to the StatusesController. the request to the StatusesController.
""" """
if id and len(remainder) and remainder[0] == 'statuses': if id and len(remainder) and (remainder[0] == 'status' or
return StatusesController(lb_id=id), remainder[1:] remainder[0] == 'stats'):
controller = remainder[0]
remainder = remainder[1:]
if controller == 'status':
return StatusController(lb_id=id), remainder
elif controller == 'stats':
return StatisticsController(lb_id=id), remainder
class StatusesController(base.BaseController): class StatusController(base.BaseController):
RBAC_TYPE = constants.RBAC_LOADBALANCER RBAC_TYPE = constants.RBAC_LOADBALANCER
def __init__(self, lb_id): def __init__(self, lb_id):
super(StatusesController, self).__init__() super(StatusController, self).__init__()
self.id = lb_id self.id = lb_id
@wsme_pecan.wsexpose(lb_types.StatusesRootResponse, wtypes.text, @wsme_pecan.wsexpose(lb_types.StatusRootResponse, wtypes.text,
status_code=200) status_code=200)
def get(self): def get(self):
context = pecan.request.context.get('octavia_context') context = pecan.request.context.get('octavia_context')
@ -427,6 +434,34 @@ class StatusesController(base.BaseController):
constants.RBAC_GET_STATUS) constants.RBAC_GET_STATUS)
result = self._convert_db_to_type( result = self._convert_db_to_type(
load_balancer, lb_types.LoadBalancerStatusesResponse) load_balancer, lb_types.LoadBalancerStatusResponse)
result = lb_types.StatusesResponse(loadbalancer=result) result = lb_types.StatusResponse(loadbalancer=result)
return lb_types.StatusesRootResponse(statuses=result) return lb_types.StatusRootResponse(statuses=result)
class StatisticsController(base.BaseController, stats.StatsMixin):
RBAC_TYPE = constants.RBAC_LOADBALANCER
def __init__(self, lb_id):
super(StatisticsController, self).__init__()
self.id = lb_id
@wsme_pecan.wsexpose(lb_types.StatisticsRootResponse, wtypes.text,
status_code=200)
def get(self):
context = pecan.request.context.get('octavia_context')
load_balancer = self._get_db_lb(context.session, self.id)
if not load_balancer:
LOG.info("Load balancer %s not found.", id)
raise exceptions.NotFound(
resource=data_models.LoadBalancer._name(),
id=id)
self._auth_validate_action(context, load_balancer.project_id,
constants.RBAC_GET_STATS)
lb_stats = self.get_loadbalancer_stats(context.session, self.id)
result = self._convert_db_to_type(
lb_stats, lb_types.LoadBalancerStatisticsResponse)
return lb_types.StatisticsRootResponse(stats=result)

View File

@ -157,8 +157,8 @@ class HealthMonitorSingleCreate(BaseHealthMonitorType):
admin_state_up = wtypes.wsattr(bool, default=True) admin_state_up = wtypes.wsattr(bool, default=True)
class HealthMonitorStatusesResponse(BaseHealthMonitorType): class HealthMonitorStatusResponse(BaseHealthMonitorType):
"""Defines which attributes are to be shown on statuses response.""" """Defines which attributes are to be shown on status response."""
id = wtypes.wsattr(wtypes.UuidType()) id = wtypes.wsattr(wtypes.UuidType())
name = wtypes.wsattr(wtypes.StringType()) name = wtypes.wsattr(wtypes.StringType())
type = wtypes.wsattr(wtypes.text) type = wtypes.wsattr(wtypes.text)

View File

@ -156,20 +156,20 @@ class ListenerSingleCreate(BaseListenerType):
wtypes.DictType(str, wtypes.StringType(max_length=255))) wtypes.DictType(str, wtypes.StringType(max_length=255)))
class ListenerStatusesResponse(BaseListenerType): class ListenerStatusResponse(BaseListenerType):
"""Defines which attributes are to be shown on statuses response.""" """Defines which attributes are to be shown on status response."""
id = wtypes.wsattr(wtypes.UuidType()) id = wtypes.wsattr(wtypes.UuidType())
name = wtypes.wsattr(wtypes.StringType()) name = wtypes.wsattr(wtypes.StringType())
operating_status = wtypes.wsattr(wtypes.StringType()) operating_status = wtypes.wsattr(wtypes.StringType())
provisioning_status = wtypes.wsattr(wtypes.StringType()) provisioning_status = wtypes.wsattr(wtypes.StringType())
pools = wtypes.wsattr([pool.PoolStatusesResponse]) pools = wtypes.wsattr([pool.PoolStatusResponse])
@classmethod @classmethod
def from_data_model(cls, data_model, children=False): def from_data_model(cls, data_model, children=False):
listener = super(ListenerStatusesResponse, cls).from_data_model( listener = super(ListenerStatusResponse, cls).from_data_model(
data_model, children=children) data_model, children=children)
pool_model = pool.PoolStatusesResponse pool_model = pool.PoolStatusResponse
listener.pools = [ listener.pools = [
pool_model.from_data_model(i) for i in data_model.pools] pool_model.from_data_model(i) for i in data_model.pools]
@ -177,3 +177,22 @@ class ListenerStatusesResponse(BaseListenerType):
listener.name = "" listener.name = ""
return listener return listener
class ListenerStatisticsResponse(BaseListenerType):
"""Defines which attributes are to show on stats response."""
bytes_in = wtypes.wsattr(wtypes.IntegerType())
bytes_out = wtypes.wsattr(wtypes.IntegerType())
active_connections = wtypes.wsattr(wtypes.IntegerType())
total_connections = wtypes.wsattr(wtypes.IntegerType())
request_errors = wtypes.wsattr(wtypes.IntegerType())
@classmethod
def from_data_model(cls, data_model, children=False):
result = super(ListenerStatisticsResponse, cls).from_data_model(
data_model, children=children)
return result
class StatisticsRootResponse(types.BaseType):
stats = wtypes.wsattr(ListenerStatisticsResponse)

View File

@ -141,19 +141,19 @@ class LoadBalancerRootPUT(types.BaseType):
loadbalancer = wtypes.wsattr(LoadBalancerPUT) loadbalancer = wtypes.wsattr(LoadBalancerPUT)
class LoadBalancerStatusesResponse(BaseLoadBalancerType): class LoadBalancerStatusResponse(BaseLoadBalancerType):
"""Defines which attributes are to be shown on statuses response.""" """Defines which attributes are to be shown on status response."""
id = wtypes.wsattr(wtypes.UuidType()) id = wtypes.wsattr(wtypes.UuidType())
name = wtypes.wsattr(wtypes.StringType()) name = wtypes.wsattr(wtypes.StringType())
operating_status = wtypes.wsattr(wtypes.StringType()) operating_status = wtypes.wsattr(wtypes.StringType())
provisioning_status = wtypes.wsattr(wtypes.StringType()) provisioning_status = wtypes.wsattr(wtypes.StringType())
listeners = wtypes.wsattr([listener.ListenerStatusesResponse]) listeners = wtypes.wsattr([listener.ListenerStatusResponse])
@classmethod @classmethod
def from_data_model(cls, data_model, children=False): def from_data_model(cls, data_model, children=False):
result = super(LoadBalancerStatusesResponse, cls).from_data_model( result = super(LoadBalancerStatusResponse, cls).from_data_model(
data_model, children=children) data_model, children=children)
listener_model = listener.ListenerStatusesResponse listener_model = listener.ListenerStatusResponse
result.listeners = [ result.listeners = [
listener_model.from_data_model(i) for i in data_model.listeners] listener_model.from_data_model(i) for i in data_model.listeners]
if not result.name: if not result.name:
@ -162,9 +162,28 @@ class LoadBalancerStatusesResponse(BaseLoadBalancerType):
return result return result
class StatusesResponse(wtypes.Base): class StatusResponse(wtypes.Base):
loadbalancer = wtypes.wsattr(LoadBalancerStatusesResponse) loadbalancer = wtypes.wsattr(LoadBalancerStatusResponse)
class StatusesRootResponse(types.BaseType): class StatusRootResponse(types.BaseType):
statuses = wtypes.wsattr(StatusesResponse) statuses = wtypes.wsattr(StatusResponse)
class LoadBalancerStatisticsResponse(BaseLoadBalancerType):
"""Defines which attributes are to show on stats response."""
bytes_in = wtypes.wsattr(wtypes.IntegerType())
bytes_out = wtypes.wsattr(wtypes.IntegerType())
active_connections = wtypes.wsattr(wtypes.IntegerType())
total_connections = wtypes.wsattr(wtypes.IntegerType())
request_errors = wtypes.wsattr(wtypes.IntegerType())
@classmethod
def from_data_model(cls, data_model, children=False):
result = super(LoadBalancerStatisticsResponse, cls).from_data_model(
data_model, children=children)
return result
class StatisticsRootResponse(types.BaseType):
stats = wtypes.wsattr(LoadBalancerStatisticsResponse)

View File

@ -114,8 +114,8 @@ class MemberSingleCreate(BaseMemberType):
subnet_id = wtypes.wsattr(wtypes.UuidType()) subnet_id = wtypes.wsattr(wtypes.UuidType())
class MemberStatusesResponse(BaseMemberType): class MemberStatusResponse(BaseMemberType):
"""Defines which attributes are to be shown on statuses response.""" """Defines which attributes are to be shown on status response."""
id = wtypes.wsattr(wtypes.UuidType()) id = wtypes.wsattr(wtypes.UuidType())
name = wtypes.wsattr(wtypes.StringType()) name = wtypes.wsattr(wtypes.StringType())
operating_status = wtypes.wsattr(wtypes.StringType()) operating_status = wtypes.wsattr(wtypes.StringType())
@ -125,7 +125,7 @@ class MemberStatusesResponse(BaseMemberType):
@classmethod @classmethod
def from_data_model(cls, data_model, children=False): def from_data_model(cls, data_model, children=False):
member = super(MemberStatusesResponse, cls).from_data_model( member = super(MemberStatusResponse, cls).from_data_model(
data_model, children=children) data_model, children=children)
if not member.name: if not member.name:

View File

@ -166,25 +166,25 @@ class PoolSingleCreate(BasePoolType):
members = wtypes.wsattr([member.MemberSingleCreate]) members = wtypes.wsattr([member.MemberSingleCreate])
class PoolStatusesResponse(BasePoolType): class PoolStatusResponse(BasePoolType):
"""Defines which attributes are to be shown on statuses response.""" """Defines which attributes are to be shown on status response."""
id = wtypes.wsattr(wtypes.UuidType()) id = wtypes.wsattr(wtypes.UuidType())
name = wtypes.wsattr(wtypes.StringType()) name = wtypes.wsattr(wtypes.StringType())
provisioning_status = wtypes.wsattr(wtypes.StringType()) provisioning_status = wtypes.wsattr(wtypes.StringType())
operating_status = wtypes.wsattr(wtypes.StringType()) operating_status = wtypes.wsattr(wtypes.StringType())
health_monitor = wtypes.wsattr( health_monitor = wtypes.wsattr(
health_monitor.HealthMonitorStatusesResponse) health_monitor.HealthMonitorStatusResponse)
members = wtypes.wsattr([member.MemberStatusesResponse]) members = wtypes.wsattr([member.MemberStatusResponse])
@classmethod @classmethod
def from_data_model(cls, data_model, children=False): def from_data_model(cls, data_model, children=False):
pool = super(PoolStatusesResponse, cls).from_data_model( pool = super(PoolStatusResponse, cls).from_data_model(
data_model, children=children) data_model, children=children)
member_model = member.MemberStatusesResponse member_model = member.MemberStatusResponse
if data_model.health_monitor: if data_model.health_monitor:
pool.health_monitor = ( pool.health_monitor = (
health_monitor.HealthMonitorStatusesResponse.from_data_model( health_monitor.HealthMonitorStatusResponse.from_data_model(
data_model.health_monitor)) data_model.health_monitor))
pool.members = [ pool.members = [
member_model.from_data_model(i) for i in data_model.members] member_model.from_data_model(i) for i in data_model.members]

View File

@ -189,6 +189,17 @@ class BaseAPITest(base_db_test.OctaviaDBTestBase):
request_errors=0) request_errors=0)
return db_ls.to_dict() return db_ls.to_dict()
def create_listener_stats_dynamic(self, listener_id, amphora_id,
bytes_in=0, bytes_out=0,
active_connections=0,
total_connections=0, request_errors=0):
db_ls = self.listener_stats_repo.create(
db_api.get_session(), listener_id=listener_id,
amphora_id=amphora_id, bytes_in=bytes_in,
bytes_out=bytes_out, active_connections=active_connections,
total_connections=total_connections, request_errors=request_errors)
return db_ls.to_dict()
def create_amphora(self, amphora_id, loadbalancer_id, **optionals): def create_amphora(self, amphora_id, loadbalancer_id, **optionals):
# We need to default these values in the request. # We need to default these values in the request.
opts = {'compute_id': uuidutils.generate_uuid(), opts = {'compute_id': uuidutils.generate_uuid(),

View File

@ -14,6 +14,7 @@
# under the License. # under the License.
import mock import mock
import random
from oslo_config import cfg from oslo_config import cfg
from oslo_config import fixture as oslo_fixture from oslo_config import fixture as oslo_fixture
@ -1128,3 +1129,107 @@ class TestListener(base.BaseAPITest):
'insert_headers': {'X-Forwarded-Four': 'true'}} 'insert_headers': {'X-Forwarded-Four': 'true'}}
body = self._build_body(lb_listener) body = self._build_body(lb_listener)
self.post(self.LISTENERS_PATH, body, status=400) self.post(self.LISTENERS_PATH, body, status=400)
def _getStats(self, listener_id):
res = self.get(self.LISTENER_PATH.format(
listener_id=listener_id + "/stats"))
return res.json.get('stats')
def test_statistics(self):
lb = self.create_load_balancer(
uuidutils.generate_uuid()).get('loadbalancer')
self.set_lb_status(lb['id'])
li = self.create_listener(
constants.PROTOCOL_HTTP, 80, lb.get('id')).get('listener')
amphora = self.create_amphora(uuidutils.generate_uuid(), lb['id'])
ls = self.create_listener_stats_dynamic(
listener_id=li.get('id'),
amphora_id=amphora.id,
bytes_in=random.randint(1, 9),
bytes_out=random.randint(1, 9),
total_connections=random.randint(1, 9),
request_errors=random.randint(1, 9))
response = self._getStats(li['id'])
self.assertEqual(ls['bytes_in'], response['bytes_in'])
self.assertEqual(ls['bytes_out'], response['bytes_out'])
self.assertEqual(ls['total_connections'],
response['total_connections'])
self.assertEqual(ls['active_connections'],
response['active_connections'])
self.assertEqual(ls['request_errors'],
response['request_errors'])
def test_statistics_authorized(self):
project_id = uuidutils.generate_uuid()
lb = self.create_load_balancer(
uuidutils.generate_uuid(),
project_id=project_id).get('loadbalancer')
self.set_lb_status(lb['id'])
li = self.create_listener(
constants.PROTOCOL_HTTP, 80, lb.get('id')).get('listener')
amphora = self.create_amphora(uuidutils.generate_uuid(), lb['id'])
ls = self.create_listener_stats_dynamic(
listener_id=li.get('id'),
amphora_id=amphora.id,
bytes_in=random.randint(1, 9),
bytes_out=random.randint(1, 9),
total_connections=random.randint(1, 9),
request_errors=random.randint(1, 9))
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self._getStats(li['id'])
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(ls['bytes_in'], response['bytes_in'])
self.assertEqual(ls['bytes_out'], response['bytes_out'])
self.assertEqual(ls['total_connections'],
response['total_connections'])
self.assertEqual(ls['active_connections'],
response['active_connections'])
self.assertEqual(ls['request_errors'],
response['request_errors'])
def test_statistics_not_authorized(self):
lb = self.create_load_balancer(
uuidutils.generate_uuid()).get('loadbalancer')
self.set_lb_status(lb['id'])
li = self.create_listener(
constants.PROTOCOL_HTTP, 80, lb.get('id')).get('listener')
amphora = self.create_amphora(uuidutils.generate_uuid(), lb['id'])
self.create_listener_stats_dynamic(
listener_id=li.get('id'),
amphora_id=amphora.id,
bytes_in=random.randint(1, 9),
bytes_out=random.randint(1, 9),
total_connections=random.randint(1, 9),
request_errors=random.randint(1, 9))
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()):
res = self.get(self.LISTENER_PATH.format(
listener_id=li['id'] + "/stats"), status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, res.json)

View File

@ -13,6 +13,7 @@
# under the License. # under the License.
import copy import copy
import random
import mock import mock
from oslo_config import cfg from oslo_config import cfg
@ -1827,7 +1828,7 @@ class TestLoadBalancerGraph(base.BaseAPITest):
self.post(self.LBS_PATH, body) self.post(self.LBS_PATH, body)
def _getStatus(self, lb_id): def _getStatus(self, lb_id):
res = self.get(self.LB_PATH.format(lb_id=lb_id + "/statuses")) res = self.get(self.LB_PATH.format(lb_id=lb_id + "/status"))
return res.json.get('statuses').get('loadbalancer') return res.json.get('statuses').get('loadbalancer')
def test_statuses(self): def test_statuses(self):
@ -2122,7 +2123,111 @@ class TestLoadBalancerGraph(base.BaseAPITest):
with mock.patch.object(octavia.common.context.Context, 'project_id', with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()): uuidutils.generate_uuid()):
res = self.get(self.LB_PATH.format(lb_id=lb['id'] + "/statuses"), res = self.get(self.LB_PATH.format(lb_id=lb['id'] + "/status"),
status=403) status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy) self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, res.json) self.assertEqual(self.NOT_AUTHORIZED_BODY, res.json)
def _getStats(self, lb_id):
res = self.get(self.LB_PATH.format(lb_id=lb_id + "/stats"))
return res.json.get('stats')
def test_statistics(self):
lb = self.create_load_balancer(
uuidutils.generate_uuid()).get('loadbalancer')
self.set_lb_status(lb['id'])
li = self.create_listener(
constants.PROTOCOL_HTTP, 80, lb.get('id')).get('listener')
amphora = self.create_amphora(uuidutils.generate_uuid(), lb['id'])
ls = self.create_listener_stats_dynamic(
listener_id=li.get('id'),
amphora_id=amphora.id,
bytes_in=random.randint(1, 9),
bytes_out=random.randint(1, 9),
total_connections=random.randint(1, 9),
request_errors=random.randint(1, 9))
response = self._getStats(lb['id'])
self.assertEqual(ls['bytes_in'], response['bytes_in'])
self.assertEqual(ls['bytes_out'], response['bytes_out'])
self.assertEqual(ls['total_connections'],
response['total_connections'])
self.assertEqual(ls['active_connections'],
response['active_connections'])
self.assertEqual(ls['request_errors'],
response['request_errors'])
def test_statistics_authorized(self):
project_id = uuidutils.generate_uuid()
lb = self.create_load_balancer(
uuidutils.generate_uuid(),
project_id=project_id).get('loadbalancer')
self.set_lb_status(lb['id'])
li = self.create_listener(
constants.PROTOCOL_HTTP, 80, lb.get('id')).get('listener')
amphora = self.create_amphora(uuidutils.generate_uuid(), lb['id'])
ls = self.create_listener_stats_dynamic(
listener_id=li.get('id'),
amphora_id=amphora.id,
bytes_in=random.randint(1, 9),
bytes_out=random.randint(1, 9),
total_connections=random.randint(1, 9),
request_errors=random.randint(1, 9))
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
project_id):
override_credentials = {
'service_user_id': None,
'user_domain_id': None,
'is_admin_project': True,
'service_project_domain_id': None,
'service_project_id': None,
'roles': ['load-balancer_member'],
'user_id': None,
'is_admin': False,
'service_user_domain_id': None,
'project_domain_id': None,
'service_roles': [],
'project_id': project_id}
with mock.patch(
"oslo_context.context.RequestContext.to_policy_values",
return_value=override_credentials):
response = self._getStats(lb['id'])
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(ls['bytes_in'], response['bytes_in'])
self.assertEqual(ls['bytes_out'], response['bytes_out'])
self.assertEqual(ls['total_connections'],
response['total_connections'])
self.assertEqual(ls['active_connections'],
response['active_connections'])
self.assertEqual(ls['request_errors'],
response['request_errors'])
def test_statistics_not_authorized(self):
lb = self.create_load_balancer(
uuidutils.generate_uuid()).get('loadbalancer')
self.set_lb_status(lb['id'])
li = self.create_listener(
constants.PROTOCOL_HTTP, 80, lb.get('id')).get('listener')
amphora = self.create_amphora(uuidutils.generate_uuid(), lb['id'])
self.create_listener_stats_dynamic(
listener_id=li.get('id'),
amphora_id=amphora.id,
bytes_in=random.randint(1, 9),
bytes_out=random.randint(1, 9),
total_connections=random.randint(1, 9))
self.conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
auth_strategy = self.conf.conf.api_settings.get('auth_strategy')
self.conf.config(group='api_settings', auth_strategy=constants.TESTING)
with mock.patch.object(octavia.common.context.Context, 'project_id',
uuidutils.generate_uuid()):
res = self.get(self.LB_PATH.format(lb_id=lb['id'] + "/stats"),
status=403)
self.conf.config(group='api_settings', auth_strategy=auth_strategy)
self.assertEqual(self.NOT_AUTHORIZED_BODY, res.json)