Add Logging commands for OSC plugin

This commit supports Logging CLI as OSC plugin[1].
[1] https://bugs.launchpad.net/neutron/+bug/1468366

Depends-On: Ib8668dd25ee7c5000a6dafcc7db3dbc33ad190be
Partially-implements: blueprint security-group-logging
Change-Id: If9a5e0e3958ea32f4c0d3573efcb3ac4101c63a6
This commit is contained in:
Yushiro FURUKAWA 2016-12-12 23:14:56 +09:00
parent 05218d9500
commit c569d985c4
9 changed files with 1085 additions and 1 deletions

View File

@ -0,0 +1,15 @@
===========
network log
===========
A **network log** is a container to group security groups or ports for logging.
Specified resources can be logged via these event (``ALL``, ``ACCEPT`` or
``DROP``).
Network v2
.. autoprogram-cliff:: openstack.neutronclient.v2
:command: network loggable resources list
.. autoprogram-cliff:: openstack.neutronclient.v2
:command: network log *

View File

View File

@ -0,0 +1,289 @@
# Copyright 2017 FUJTISU LIMITED.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
from oslo_log import log as logging
from neutronclient._i18n import _
from neutronclient.common import utils as nc_utils
from neutronclient.osc import utils as osc_utils
LOG = logging.getLogger(__name__)
_attr_map = (
('id', 'ID', osc_utils.LIST_BOTH),
('description', 'Description', osc_utils.LIST_LONG_ONLY),
('enabled', 'Enabled', osc_utils.LIST_BOTH),
('name', 'Name', osc_utils.LIST_BOTH),
('target_id', 'Target', osc_utils.LIST_LONG_ONLY),
('project_id', 'Project', osc_utils.LIST_LONG_ONLY),
('resource_id', 'Resource', osc_utils.LIST_LONG_ONLY),
('resource_type', 'Type', osc_utils.LIST_BOTH),
('event', 'Event', osc_utils.LIST_LONG_ONLY),
('summary', 'Summary', osc_utils.LIST_SHORT_ONLY),
)
_attr_map_for_loggable = (
('type', 'Supported types', osc_utils.LIST_BOTH),
)
NET_LOG = 'network_log'
def _get_common_parser(parser):
parser.add_argument(
'--description',
metavar='<description>',
help=_('Description of the network log'))
enable_group = parser.add_mutually_exclusive_group()
enable_group.add_argument(
'--enable',
action='store_true',
help=_('Enable this log (default is disabled)'))
enable_group.add_argument(
'--disable',
action='store_true',
help=_('Disable this log'))
return parser
def _get_common_attrs(client_manager, parsed_args, is_create=True):
attrs = {}
client = client_manager.neutronclient
if is_create:
if 'project' in parsed_args and parsed_args.project is not None:
attrs['project_id'] = osc_utils.find_project(
client_manager.identity,
parsed_args.project,
parsed_args.project_domain,
).id
if parsed_args.resource:
attrs['resource_id'] = client.find_resource(
'security_group', parsed_args.resource)['id']
if parsed_args.target:
# NOTE(yushiro) Currently, we're supporting only port
attrs['target_id'] = client.find_resource(
'port', parsed_args.target)['id']
if parsed_args.event:
attrs['event'] = parsed_args.event
if parsed_args.resource_type:
attrs['resource_type'] = parsed_args.resource_type
if parsed_args.enable:
attrs['enabled'] = True
if parsed_args.disable:
attrs['enabled'] = False
if parsed_args.name:
attrs['name'] = parsed_args.name
if parsed_args.description:
attrs['description'] = parsed_args.description
return attrs
class CreateNetworkLog(command.ShowOne):
_description = _("Create a new network log")
def get_parser(self, prog_name):
parser = super(CreateNetworkLog, self).get_parser(prog_name)
_get_common_parser(parser)
osc_utils.add_project_owner_option_to_parser(parser)
parser.add_argument(
'name',
metavar='<name>',
help=_('Name for the network log'))
parser.add_argument(
'--event',
metavar='<event>',
choices=['ALL', 'ACCEPT', 'DROP'],
type=nc_utils.convert_to_uppercase,
help=_('An event to store with log'))
# NOTE(yushiro) '--resource-type' is managed by following command:
# "openstack network loggable resource list". Therefore, this option
# shouldn't have "choices" like ['security_group']
parser.add_argument(
'--resource-type',
metavar='<resource-type>',
required=True,
type=nc_utils.convert_to_lowercase,
help=_('Network log type(s). '
'You can see supported type(s) with following command:\n'
'$ openstack network loggable resource list'))
parser.add_argument(
'--resource',
metavar='<resource>',
help=_('Security group (name or ID) for logging. You can control '
'for logging target combination with --target option.'))
parser.add_argument(
'--target',
metavar='<target>',
help=_('Port (name or ID) for logging. You can control '
'for logging target combination with --resource option.'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
obj = client.create_network_log({'log': attrs})['log']
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns)
return (display_columns, data)
class DeleteNetworkLog(command.Command):
_description = _("Delete network log(s)")
def get_parser(self, prog_name):
parser = super(DeleteNetworkLog, self).get_parser(prog_name)
parser.add_argument(
'network_log',
metavar='<network-log>',
nargs='+',
help=_('Network log(s) to delete (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
result = 0
for log_res in parsed_args.network_log:
try:
log_id = client.find_resource(
'log', log_res, cmd_resource=NET_LOG)['id']
client.delete_network_log(log_id)
except Exception as e:
result += 1
LOG.error(_("Failed to delete network log with "
"name or ID '%(network_log)s': %(e)s"),
{'network_log': log_res, 'e': e})
if result > 0:
total = len(parsed_args.network_log)
msg = (_("%(result)s of %(total)s network log(s) "
"failed to delete") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ListLoggableResource(command.Lister):
_description = _("List supported loggable resources")
def get_parser(self, prog_name):
parser = super(ListLoggableResource, self).get_parser(prog_name)
parser.add_argument(
'--long',
action='store_true',
help=_("List additional fields in output")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
obj = client.list_network_loggable_resources()['loggable_resources']
headers, columns = osc_utils.get_column_definitions(
_attr_map_for_loggable, long_listing=parsed_args.long)
return (headers, (utils.get_dict_properties(s, columns) for s in obj))
class ListNetworkLog(command.Lister):
_description = _("List network logs")
def get_parser(self, prog_name):
parser = super(ListNetworkLog, self).get_parser(prog_name)
parser.add_argument(
'--long',
action='store_true',
help=_("List additional fields in output")
)
# TODO(yushiro): We'll support filtering in the future.
return parser
def _extend_list(self, data, parsed_args):
ext_data = copy.deepcopy(data)
for d in ext_data:
e_prefix = 'Event: '
if d['event']:
event = e_prefix + d['event'].upper()
port = '(port) ' + d['target_id'] if d['target_id'] else ''
sg = ('(security_group) ' + d['resource_id']
if d['resource_id'] else '')
t_prefix = 'Logged: '
t = sg + ' on ' + port if port and sg else sg + port
target = t_prefix + t if t else t_prefix + '(None specified)'
d['summary'] = ',\n'.join([event, target])
return ext_data
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
obj = client.list_network_logs()['logs']
obj_extend = self._extend_list(obj, parsed_args)
headers, columns = osc_utils.get_column_definitions(
_attr_map, long_listing=parsed_args.long)
return (headers, (
utils.get_dict_properties(s, columns) for s in obj_extend))
class SetNetworkLog(command.Command):
_description = _("Set network log properties")
def get_parser(self, prog_name):
parser = super(SetNetworkLog, self).get_parser(prog_name)
_get_common_parser(parser)
parser.add_argument(
'network_log',
metavar='<network-log>',
help=_('Network log to set (name or ID)'))
parser.add_argument(
'--name',
metavar='<name>',
help=_('Name of the network log'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
log_id = client.find_resource(
'log', parsed_args.network_log, cmd_resource=NET_LOG)['id']
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
is_create=False)
try:
client.update_network_log(log_id, {'log': attrs})
except Exception as e:
msg = (_("Failed to set network log '%(logging)s': %(e)s")
% {'logging': parsed_args.network_log, 'e': e})
raise exceptions.CommandError(msg)
class ShowNetworkLog(command.ShowOne):
_description = _("Display network log details")
def get_parser(self, prog_name):
parser = super(ShowNetworkLog, self).get_parser(prog_name)
parser.add_argument(
'network_log',
metavar='<network-log>',
help=_('Network log to show (name or ID)'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
log_id = client.find_resource(
'log', parsed_args.network_log, cmd_resource=NET_LOG)['id']
obj = client.show_network_log(log_id)['log']
columns, display_columns = osc_utils.get_columns(obj, _attr_map)
data = utils.get_dict_properties(obj, columns)
return (display_columns, data)

View File

@ -0,0 +1,79 @@
# Copyright 2017 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import collections
import copy
import uuid
import mock
class FakeLogging(object):
def create(self, attrs={}):
"""Create a fake network logs
:param Dictionary attrs:
A dictionary with all attributes
:return:
A OrderedDict faking the network log
"""
self.ordered.update(attrs)
return copy.deepcopy(self.ordered)
def bulk_create(self, attrs=None, count=2):
"""Create multiple fake network logs
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of network logs to fake
:return:
A list of dictionaries faking the network logs
"""
return [self.create(attrs=attrs) for i in range(0, count)]
def get(self, attrs=None, count=2):
"""Create multiple fake network logs
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of network logs to fake
:return:
A list of dictionaries faking the network log
"""
if attrs is None:
self.attrs = self.bulk_create(count=count)
return mock.Mock(side_effect=attrs)
class NetworkLog(FakeLogging):
"""Fake one or more network log"""
def __init__(self):
super(NetworkLog, self).__init__()
self.ordered = collections.OrderedDict((
('id', 'log-id-' + uuid.uuid4().hex),
('description', 'my-desc-' + uuid.uuid4().hex),
('enabled', False),
('name', 'my-log-' + uuid.uuid4().hex),
('target_id', None),
('project_id', 'project-id-' + uuid.uuid4().hex),
('resource_id', None),
('resource_type', 'security_group'),
('event', 'all'),
))

View File

@ -0,0 +1,658 @@
# Copyright 2017 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import copy
import mock
from osc_lib import exceptions
from osc_lib.tests import utils
import testtools
from neutronclient.osc import utils as osc_utils
from neutronclient.osc.v2.logging import network_log
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.logging import fakes
_log = fakes.NetworkLog().create()
RES_TYPE_SG = 'security_group'
CONVERT_MAP = {
'project': 'project_id',
'enable': 'enabled',
'disable': 'enabled',
'target': 'target_id',
'resource': 'resource_id',
'event': 'event',
}
def _generate_data(ordered_dict=None, data=None):
source = ordered_dict if ordered_dict else _log
if data:
source.update(data)
return tuple(source[key] for key in source)
def _generate_req_and_res(verifylist):
request = dict(verifylist)
response = copy.deepcopy(_log)
for key, val in verifylist:
converted = CONVERT_MAP.get(key, key)
del request[key]
if key == 'enable' and val:
new_value = True
elif key == 'disable' and val:
new_value = False
else:
new_value = val
request[converted] = new_value
response[converted] = new_value
return request, response
class TestNetworkLog(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False):
if is_list:
req_body = {'logs': [exp_req]}
else:
req_body = {'log': exp_req}
self.mocked.assert_called_once_with(req_body)
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
def setUp(self):
super(TestNetworkLog, self).setUp()
self.neutronclient.find_resource = mock.Mock()
self.neutronclient.find_resource.side_effect = \
lambda x, y, **k: {'id': y}
osc_utils.find_project = mock.Mock()
osc_utils.find_project.id = _log['project_id']
self.res = _log
self.headers = (
'ID',
'Description',
'Enabled',
'Name',
'Target',
'Project',
'Resource',
'Type',
'Event',
)
self.data = _generate_data()
self.ordered_headers = (
'Description',
'Enabled',
'Event',
'ID',
'Name',
'Project',
'Resource',
'Target',
'Type',
)
self.ordered_data = (
_log['description'],
_log['enabled'],
_log['event'],
_log['id'],
_log['name'],
_log['project_id'],
_log['resource_id'],
_log['target_id'],
_log['resource_type'],
)
self.ordered_columns = (
'description',
'enabled',
'event',
'id',
'name',
'project_id',
'resource_id',
'target_id',
'resource_type',
)
class TestCreateNetworkLog(TestNetworkLog):
def setUp(self):
super(TestCreateNetworkLog, self).setUp()
self.neutronclient.create_network_log = mock.Mock(
return_value={'log': _log})
self.mocked = self.neutronclient.create_network_log
self.cmd = network_log.CreateNetworkLog(self.app, self.namespace)
def _update_expect_response(self, request, response):
"""Set expected request and response
:param request
A dictionary of request body(dict of verifylist)
:param response
A OrderedDict of request body
"""
# Update response body
self.neutronclient.create_network_log.return_value = \
{'log': dict(response)}
osc_utils.find_project.return_value.id = response['project_id']
# Update response(finally returns 'data')
self.data = _generate_data(ordered_dict=response)
self.ordered_data = tuple(
response[column] for column in self.ordered_columns
)
def _set_all_params(self, args={}):
name = args.get('name', 'my-log')
desc = args.get('description', 'my-description-for-log')
event = args.get('event', 'ACCEPT')
resource = args.get('resource', 'id-target-log')
target = args.get('target', 'id-target-log')
resource_type = args.get('resource_type', 'security_group')
project = args.get('project_id', 'id-my-project')
arglist = [
name,
'--description', desc,
'--enable',
'--target', target,
'--resource', resource,
'--event', event,
'--project', project,
'--resource-type', resource_type,
]
verifylist = [
('description', desc),
('enable', True),
('event', event),
('name', name),
('target', target),
('project', project),
('resource', target),
('resource_type', resource_type),
]
return arglist, verifylist
def _test_create_with_all_params(self, args={}):
arglist, verifylist = self._set_all_params(args)
request, response = _generate_req_and_res(verifylist)
self._update_expect_response(request, response)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.check_results(headers, data, request)
def test_create_with_no_options_and_raise(self):
arglist = []
verifylist = []
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_mandatory_params(self):
name = self.res['name']
arglist = [
name,
'--resource-type', RES_TYPE_SG,
]
verifylist = [
('name', name),
('resource_type', RES_TYPE_SG),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
expect = {
'name': self.res['name'],
'resource_type': self.res['resource_type'],
}
self.mocked.assert_called_once_with({'log': expect})
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
def test_create_with_disable(self):
name = self.res['name']
arglist = [
name,
'--resource-type', RES_TYPE_SG,
'--disable',
]
verifylist = [
('name', name),
('resource_type', RES_TYPE_SG),
('disable', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
expect = {
'name': self.res['name'],
'resource_type': self.res['resource_type'],
'enabled': False,
}
self.mocked.assert_called_once_with({'log': expect})
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
def test_create_with_all_params(self):
self._test_create_with_all_params()
def test_create_with_all_params_event_drop(self):
self._test_create_with_all_params({'event': 'DROP'})
def test_create_with_all_params_event_all(self):
self._test_create_with_all_params({'event': 'ALL'})
def test_create_with_all_params_except_event(self):
arglist, verifylist = self._set_all_params({'event': ''})
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_all_params_event_upper_capitalized(self):
for event in ('all', 'All', 'dROP', 'accePt', 'accept', 'drop'):
arglist, verifylist = self._set_all_params({'event': event})
self.assertRaises(
testtools.matchers._impl.MismatchError,
self.check_parser, self.cmd, arglist, verifylist)
def test_create_with_all_params_resource_type_upper_capitalized(self):
for res_type in ('SECURITY_GROUP', 'Security_group', 'security_Group'):
arglist, verifylist = self._set_all_params(
{'resource_type': res_type})
self.assertRaises(
testtools.matchers._impl.MismatchError,
self.check_parser, self.cmd, arglist, verifylist)
class TestListNetworkLog(TestNetworkLog):
def _setup_summary(self, expect=None):
event = 'Event: ' + self.res['event'].upper()
target = 'Logged: (None specified)'
if expect:
if expect.get('event'):
event = expect['event']
if expect.get('resource'):
target = expect['resource']
summary = ',\n'.join([event, target])
self.short_data = (
expect['id'] if expect else self.res['id'],
expect['enabled'] if expect else self.res['enabled'],
expect['name'] if expect else self.res['name'],
expect['resource_type'] if expect else self.res['resource_type'],
summary
)
def setUp(self):
super(TestListNetworkLog, self).setUp()
self.cmd = network_log.ListNetworkLog(self.app, self.namespace)
self.short_header = (
'ID',
'Enabled',
'Name',
'Type',
'Summary',
)
self._setup_summary()
self.neutronclient.list_network_logs = mock.Mock(
return_value={'logs': [self.res]})
self.mocked = self.neutronclient.list_network_logs
def test_list_with_long_option(self):
arglist = ['--long']
verifylist = [('long', True)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))
def test_list_with_no_option(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.short_header), headers)
self.assertEqual([self.short_data], list(data))
def test_list_with_target_and_resource(self):
arglist = []
verifylist = []
target_id = 'aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaaaaaaa'
resource_id = 'bbbbbbbb-bbbb-bbbb-bbbbbbbbbbbbbbbbb'
log = fakes.NetworkLog().create({
'target_id': target_id,
'resource_id': resource_id})
self.mocked.return_value = {'logs': [log]}
logged = 'Logged: (security_group) %(res_id)s on (port) %(t_id)s' % {
'res_id': resource_id, 't_id': target_id}
expect_log = copy.deepcopy(log)
expect_log.update({
'resource': logged,
'event': 'Event: ALL'})
self._setup_summary(expect=expect_log)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.short_header), headers)
self.assertEqual([self.short_data], list(data))
def test_list_with_resource(self):
arglist = []
verifylist = []
resource_id = 'bbbbbbbb-bbbb-bbbb-bbbbbbbbbbbbbbbbb'
log = fakes.NetworkLog().create({'resource_id': resource_id})
self.mocked.return_value = {'logs': [log]}
logged = 'Logged: (security_group) %s' % resource_id
expect_log = copy.deepcopy(log)
expect_log.update({
'resource': logged,
'event': 'Event: ALL'})
self._setup_summary(expect=expect_log)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.short_header), headers)
self.assertEqual([self.short_data], list(data))
def test_list_with_target(self):
arglist = []
verifylist = []
target_id = 'aaaaaaaa-aaaa-aaaa-aaaaaaaaaaaaaaaaa'
log = fakes.NetworkLog().create({'target_id': target_id})
self.mocked.return_value = {'logs': [log]}
logged = 'Logged: (port) %s' % target_id
expect_log = copy.deepcopy(log)
expect_log.update({
'resource': logged,
'event': 'Event: ALL'})
self._setup_summary(expect=expect_log)
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.short_header), headers)
self.assertEqual([self.short_data], list(data))
class TestShowNetworkLog(TestNetworkLog):
def setUp(self):
super(TestShowNetworkLog, self).setUp()
self.neutronclient.show_network_log = mock.Mock(
return_value={'log': self.res})
self.mocked = self.neutronclient.show_network_log
self.cmd = network_log.ShowNetworkLog(self.app, self.namespace)
def test_show_filtered_by_id_or_name(self):
target = self.res['id']
arglist = [target]
verifylist = [('network_log', target)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(target)
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
class TestSetNetworkLog(TestNetworkLog):
def setUp(self):
super(TestSetNetworkLog, self).setUp()
self.neutronclient.update_network_log = mock.Mock(
return_value={'log': self.res})
self.mocked = self.neutronclient.update_network_log
self.cmd = network_log.SetNetworkLog(self.app, self.namespace)
def test_set_name(self):
target = self.res['id']
update = 'change'
arglist = [target, '--name', update]
verifylist = [
('network_log', target),
('name', update),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {'log': {'name': update}})
self.assertIsNone(result)
def test_set_description(self):
target = self.res['id']
update = 'change-desc'
arglist = [target, '--description', update]
verifylist = [
('network_log', target),
('description', update),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {'log': {'description': update}})
self.assertIsNone(result)
def test_set_enable(self):
target = self.res['id']
arglist = [target, '--enable']
verifylist = [
('network_log', target),
('enable', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {'log': {'enabled': True}})
self.assertIsNone(result)
def test_set_disable(self):
target = self.res['id']
arglist = [target, '--disable']
verifylist = [
('network_log', target),
('disable', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(
target, {'log': {'enabled': False}})
self.assertIsNone(result)
# Illegal tests
def test_illegal_set_resource_type(self):
target = self.res['id']
resource_type = 'security_group'
arglist = [target, '--resource-type', resource_type]
verifylist = [
('network_log', target),
('resource_type', resource_type),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_illegal_set_event(self):
target = self.res['id']
for event in ['all', 'accept', 'drop']:
arglist = [target, '--event', event]
verifylist = [
('network_log', target),
('event', event),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_illegal_set_resource_id(self):
target = self.res['id']
resource_id = 'resource-id-for-logged-target'
arglist = [target, '--resource', resource_id]
verifylist = [
('network_log', target),
('resource', resource_id),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_illegal_set_project(self):
target = self.res['id']
arglist = [
target,
'--project',
]
verifylist = [
('network_log', target),
('project', 'other-project'),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_illegal_set_project_domain(self):
target = self.res['id']
arglist = [
target,
'--project-domain',
]
verifylist = [
('network_log', target),
('project_domain', 'other-project-domain'),
]
self.assertRaises(
utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)
def test_illegal_set_and_raises(self):
self.neutronclient.update_network_log = mock.Mock(
side_effect=Exception)
target = self.res['id']
arglist = [target, '--name', 'my-name']
verifylist = [('network_log', target), ('name', 'my-name')]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
class TestDeleteNetworkLog(TestNetworkLog):
def setUp(self):
super(TestDeleteNetworkLog, self).setUp()
self.neutronclient.delete_network_log = mock.Mock(
return_value={'log': self.res})
self.mocked = self.neutronclient.delete_network_log
self.cmd = network_log.DeleteNetworkLog(self.app, self.namespace)
def test_delete_with_one_resource(self):
target = self.res['id']
arglist = [target]
verifylist = [('network_log', [target])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with(target)
self.assertIsNone(result)
def test_delete_with_multiple_resources(self):
target1 = 'target1'
target2 = 'target2'
arglist = [target1, target2]
verifylist = [('network_log', [target1, target2])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.assertIsNone(result)
self.assertEqual(2, self.mocked.call_count)
for idx, reference in enumerate([target1, target2]):
actual = ''.join(self.mocked.call_args_list[idx][0])
self.assertEqual(reference, actual)
def test_delete_with_no_exist_id(self):
self.neutronclient.find_resource.side_effect = Exception
target = 'not_exist'
arglist = [target]
verifylist = [('network_log', [target])]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
self.assertRaises(
exceptions.CommandError, self.cmd.take_action, parsed_args)
class TestLoggableResource(test_fakes.TestNeutronClientOSCV2):
def check_results(self, headers, data, exp_req, is_list=False):
if is_list:
req_body = {'logs': [exp_req]}
else:
req_body = {'log': exp_req}
self.mocked.assert_called_once_with(req_body)
self.assertEqual(self.ordered_headers, headers)
self.assertEqual(self.ordered_data, data)
def setUp(self):
super(TestLoggableResource, self).setUp()
self.headers = ('Supported types',)
self.data = ('security_group', )
class TestListLoggableResource(TestLoggableResource):
def setUp(self):
super(TestListLoggableResource, self).setUp()
self.cmd = network_log.ListLoggableResource(self.app, self.namespace)
loggables = {
"loggable_resources": [{"type": "security_group"}]
}
self.neutronclient.list_network_loggable_resources = mock.Mock(
return_value=loggables)
self.mocked = self.neutronclient.list_network_loggable_resources
def test_list_with_long_option(self):
arglist = ['--long']
verifylist = [('long', True)]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))
def test_list_with_no_option(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
headers, data = self.cmd.take_action(parsed_args)
self.mocked.assert_called_once_with()
self.assertEqual(list(self.headers), headers)
self.assertEqual([self.data], list(data))

View File

@ -1,5 +1,6 @@
# Copyright 2012 OpenStack Foundation.
# Copyright 2015 Hewlett-Packard Development Company, L.P.
# Copyright 2017 FUJITSU LIMITED
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -649,6 +650,9 @@ class Client(ClientBase):
bgpvpn_router_associations_path = "/bgpvpn/bgpvpns/%s/router_associations"
bgpvpn_router_association_path =\
"/bgpvpn/bgpvpns/%s/router_associations/%s"
network_logs_path = "/log/logs"
network_log_path = "/log/logs/%s"
network_loggables_path = "/log/loggable-resources"
# API has no way to report plurals, so we have to hard code them
EXTED_PLURALS = {'routers': 'router',
@ -707,6 +711,8 @@ class Client(ClientBase):
'port_pair_groups': 'port_pair_group',
'port_chains': 'port_chain',
'service_graphs': 'service_graph',
'logs': 'log',
'loggable_resources': 'loggable_resource',
}
def list_ext(self, collection, path, retrieve_all, **_params):
@ -2286,6 +2292,32 @@ class Client(ClientBase):
return self.get(self.sfc_service_graph_path % service_graph,
params=_params)
def create_network_log(self, body=None):
"""Create a network log."""
return self.post(self.network_logs_path, body=body)
def delete_network_log(self, net_log):
"""Delete a network log."""
return self.delete(self.network_log_path % net_log)
def list_network_logs(self, retrieve_all=True, **_params):
"""Fetch a list of all network logs."""
return self.list(
'logs', self.network_logs_path, retrieve_all, **_params)
def show_network_log(self, net_log, **_params):
"""Fetch information for a certain network log."""
return self.get(self.network_log_path % net_log, params=_params)
def update_network_log(self, net_log, body=None):
"""Update a network log."""
return self.put(self.network_log_path % net_log, body=body)
def list_network_loggable_resources(self, retrieve_all=True, **_params):
"""Fetch a list of supported resource types for network log."""
return self.list('loggable_resources', self.network_loggables_path,
retrieve_all, **_params)
def __init__(self, **kwargs):
"""Initialize a new client for the Neutron v2.0 API."""
super(Client, self).__init__(**kwargs)

View File

@ -0,0 +1,5 @@
---
features:
- |
CLI support for 'Logging' feature, which enable to collect packet logs
for specified resource. Currently, only security-group can be logged.

View File

@ -125,6 +125,13 @@ openstack.neutronclient.v2 =
bgpvpn_router_association_list = neutronclient.osc.v2.networking_bgpvpn.router_association:ListBgpvpnRouterAssoc
bgpvpn_router_association_show = neutronclient.osc.v2.networking_bgpvpn.router_association:ShowBgpvpnRouterAssoc
network_loggable_resources_list = neutronclient.osc.v2.logging.network_log:ListLoggableResource
network_log_create = neutronclient.osc.v2.logging.network_log:CreateNetworkLog
network_log_delete = neutronclient.osc.v2.logging.network_log:DeleteNetworkLog
network_log_list = neutronclient.osc.v2.logging.network_log:ListNetworkLog
network_log_set = neutronclient.osc.v2.logging.network_log:SetNetworkLog
network_log_show = neutronclient.osc.v2.logging.network_log:ShowNetworkLog
neutron.cli.v2 =
bash-completion = neutronclient.shell:BashCompletionCommand
@ -409,7 +416,6 @@ neutron.cli.v2 =
vpn-ikepolicy-update = neutronclient.neutron.v2_0.vpn.ikepolicy:UpdateIKEPolicy
vpn-ikepolicy-delete = neutronclient.neutron.v2_0.vpn.ikepolicy:DeleteIKEPolicy
[build_sphinx]
all_files = 1
build-dir = doc/build