Merge "Implement MidoNet Quantum Plugin"

This commit is contained in:
Jenkins 2013-02-20 07:55:20 +00:00 committed by Gerrit Code Review
commit f64c40b47d
8 changed files with 2524 additions and 0 deletions

View File

@ -0,0 +1,41 @@
[DATABASE]
# This line MUST be changed to actually run the plugin.
# Example:
# sql_connection = mysql://root:pass@127.0.0.1:3306/midonet_quantum
# Replace 127.0.0.1 above with the IP address of the database used by the
# main quantum server. (Leave it as is if the database runs on this host.)
sql_connection = sqlite://
# Database reconnection retry times - in event connectivity is lost
# set to -1 implies an infinite retry count
# sql_max_retries = 10
# Database reconnection interval in seconds - if the initial connection to the
# database fails
reconnect_interval = 2
# Enable the use of eventlet's db_pool for MySQL. The flags sql_min_pool_size,
# sql_max_pool_size and sql_idle_timeout are relevant only if this is enabled.
# sql_dbpool_enable = False
# Minimum number of SQL connections to keep open in a pool
# sql_min_pool_size = 1
# Maximum number of SQL connections to keep open in a pool
# sql_max_pool_size = 5
# Timeout in seconds before idle sql connections are reaped
# sql_idle_timeout = 3600
[MIDONET]
# MidoNet API server URI
# midonet_uri = http://localhost:8080/midonet-api
# MidoNet admin username
#username = admin
# MidoNet admin password
#password = passw0rd
# ID of the project that MidoNet admin user belongs to
#project_id = 77777777-7777-7777-7777-777777777777
# Virtual provider router ID
#provider_router_id = 00112233-0011-0011-0011-001122334455
# Virtual metadata router ID
#metadata_router_id = ffeeddcc-ffee-ffee-ffee-ffeeddccbbaa

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Midokura Japan K.K.
# Copyright (C) 2013 Midokura PTE LTD
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,47 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Midokura Japan K.K.
# Copyright (C) 2013 Midokura PTE LTD
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Tomoe Sugihara, Midokura Japan KK
from oslo.config import cfg
midonet_opts = [
cfg.StrOpt('midonet_uri', default='http://localhost:8080/midonet-api',
help=_('MidoNet API server URI.')),
cfg.StrOpt('username', default='admin',
help=_('MidoNet admin username.')),
cfg.StrOpt('password', default='passw0rd',
secret=True,
help=_('MidoNet admin password.')),
cfg.StrOpt('project_id',
default='77777777-7777-7777-7777-777777777777',
help=_('ID of the project that MidoNet admin user'
'belongs to.')),
cfg.StrOpt('provider_router_id',
default=None,
help=_('Virtual provider router ID.')),
cfg.StrOpt('metadata_router_id',
default=None,
help=_('Virtual metadata router ID.')),
cfg.StrOpt('mode',
default='dev',
help=_('Operational mode. Internal dev use only.'))
]
cfg.CONF.register_opts(midonet_opts, "MIDONET")

View File

@ -0,0 +1,305 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Midokura Japan K.K.
# Copyright (C) 2013 Midokura PTE LTD
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Tomoe Sugihara, Midokura Japan KK
# @author: Ryu Ishimoto, Midokura Japan KK
from quantum.openstack.common import log as logging
LOG = logging.getLogger(__name__)
PREFIX = 'OS_SG_'
SUFFIX_IN = '_IN'
SUFFIX_OUT = '_OUT'
OS_ROUTER_IN_CHAIN_NAME_FORMAT = 'OS_ROUTER_IN_%s'
OS_ROUTER_OUT_CHAIN_NAME_FORMAT = 'OS_ROUTER_OUT_%s'
NAME_IDENTIFIABLE_PREFIX_LEN = len(PREFIX) + 36 # 36 = length of uuid
def sg_label(sg_id, sg_name):
"""Construct the security group ID used as chain identifier in MidoNet."""
return PREFIX + str(sg_id) + '_' + sg_name
port_group_name = sg_label
def chain_names(sg_id, sg_name):
"""Get inbound and outbound chain names."""
prefix = sg_label(sg_id, sg_name)
in_chain_name = prefix + SUFFIX_IN
out_chain_name = prefix + SUFFIX_OUT
return {'in': in_chain_name, 'out': out_chain_name}
class ChainManager:
def __init__(self, mido_api):
self.mido_api = mido_api
def create_for_sg(self, tenant_id, sg_id, sg_name):
"""Create a new chain for security group.
Creating a security group creates a pair of chains in MidoNet, one for
inbound and the other for outbound.
"""
LOG.debug(_("ChainManager.create_for_sg called: "
"tenant_id=%(tenant_id)s sg_id=%(sg_id)s "
"sg_name=%(sg_name)s "),
{'tenant_id': tenant_id, 'sg_id': sg_id, 'sg_name': sg_name})
cnames = chain_names(sg_id, sg_name)
self.mido_api.add_chain().tenant_id(tenant_id).name(
cnames['in']).create()
self.mido_api.add_chain().tenant_id(tenant_id).name(
cnames['out']).create()
def delete_for_sg(self, tenant_id, sg_id, sg_name):
"""Delete a chain mapped to a security group.
Delete a SG means deleting all the chains (inbound and outbound)
associated with the SG in MidoNet.
"""
LOG.debug(_("ChainManager.delete_for_sg called: "
"tenant_id=%(tenant_id)s sg_id=%(sg_id)s "
"sg_name=%(sg_name)s "),
{'tenant_id': tenant_id, 'sg_id': sg_id, 'sg_name': sg_name})
cnames = chain_names(sg_id, sg_name)
chains = self.mido_api.get_chains({'tenant_id': tenant_id})
for c in chains:
if c.get_name() == cnames['in'] or c.get_name() == cnames['out']:
LOG.debug(_('ChainManager.delete_for_sg: deleting chain=%r'),
c)
c.delete()
def get_router_chains(self, tenant_id, router_id):
"""Get router chains.
Returns a dictionary that has in/out chain resources key'ed with 'in'
and 'out' respectively, given the tenant_id and the router_id passed
in in the arguments.
"""
LOG.debug(_("ChainManager.get_router_chains called: "
"tenant_id=%(tenant_id)s router_id=%(router_id)s"),
{'tenant_id': tenant_id, 'router_id': router_id})
router_chain_names = self._get_router_chain_names(router_id)
chains = {}
for c in self.mido_api.get_chains({'tenant_id': tenant_id}):
if c.get_name() == router_chain_names['in']:
chains['in'] = c
elif c.get_name() == router_chain_names['out']:
chains['out'] = c
return chains
def create_router_chains(self, tenant_id, router_id):
"""Create a new chain on a router.
Creates chains for the router and returns the same dictionary as
get_router_chains() returns.
"""
LOG.debug(_("ChainManager.create_router_chains called: "
"tenant_id=%(tenant_id)s router_id=%(router_id)s"),
{'tenant_id': tenant_id, 'router_id': router_id})
chains = {}
router_chain_names = self._get_router_chain_names(router_id)
chains['in'] = self.mido_api.add_chain().tenant_id(tenant_id).name(
router_chain_names['in']).create()
chains['out'] = self.mido_api.add_chain().tenant_id(tenant_id).name(
router_chain_names['out']).create()
return chains
def get_sg_chains(self, tenant_id, sg_id):
"""Get a list of chains mapped to a security group."""
LOG.debug(_("ChainManager.get_sg_chains called: "
"tenant_id=%(tenant_id)s sg_id=%(sg_id)s"),
{'tenant_id': tenant_id, 'sg_id': sg_id})
cnames = chain_names(sg_id, sg_name='')
chain_name_prefix_for_id = cnames['in'][:NAME_IDENTIFIABLE_PREFIX_LEN]
chains = {}
for c in self.mido_api.get_chains({'tenant_id': tenant_id}):
if c.get_name().startswith(chain_name_prefix_for_id):
if c.get_name().endswith(SUFFIX_IN):
chains['in'] = c
if c.get_name().endswith(SUFFIX_OUT):
chains['out'] = c
assert 'in' in chains
assert 'out' in chains
return chains
def _get_router_chain_names(self, router_id):
LOG.debug(_("ChainManager.get_router_chain_names called: "
"router_id=%(router_id)s"), {'router_id': router_id})
in_name = OS_ROUTER_IN_CHAIN_NAME_FORMAT % router_id
out_name = OS_ROUTER_OUT_CHAIN_NAME_FORMAT % router_id
router_chain_names = {'in': in_name, 'out': out_name}
return router_chain_names
class PortGroupManager:
def __init__(self, mido_api):
self.mido_api = mido_api
def create(self, tenant_id, sg_id, sg_name):
LOG.debug(_("PortGroupManager.create called: "
"tenant_id=%(tenant_id)s sg_id=%(sg_id)s "
"sg_name=%(sg_name)s"),
{'tenant_id': tenant_id, 'sg_id': sg_id, 'sg_name': sg_name})
pg_name = port_group_name(sg_id, sg_name)
self.mido_api.add_port_group().tenant_id(tenant_id).name(
pg_name).create()
def delete(self, tenant_id, sg_id, sg_name):
LOG.debug(_("PortGroupManager.delete called: "
"tenant_id=%(tenant_id)s sg_id=%(sg_id)s "
"sg_name=%(sg_name)s"),
{'tenant_id': tenant_id, 'sg_id': sg_id, 'sg_name': sg_name})
pg_name = port_group_name(sg_id, sg_name)
pgs = self.mido_api.get_port_groups({'tenant_id': tenant_id})
for pg in pgs:
if pg.get_name() == pg_name:
LOG.debug(_("PortGroupManager.delete: deleting pg=%r"), pg)
pg.delete()
def get_for_sg(self, tenant_id, sg_id):
LOG.debug(_("PortGroupManager.get_for_sg called: "
"tenant_id=%(tenant_id)s sg_id=%(sg_id)s"),
{'tenant_id': tenant_id, 'sg_id': sg_id})
pg_name_prefix = port_group_name(
sg_id, sg_name='')[:NAME_IDENTIFIABLE_PREFIX_LEN]
port_groups = self.mido_api.get_port_groups({'tenant_id': tenant_id})
for pg in port_groups:
if pg.get_name().startswith(pg_name_prefix):
LOG.debug(_("PortGroupManager.get_for_sg exiting: pg=%r"), pg)
return pg
return None
class RuleManager:
OS_SG_KEY = 'os_sg_rule_id'
def __init__(self, mido_api):
self.mido_api = mido_api
self.chain_manager = ChainManager(mido_api)
self.pg_manager = PortGroupManager(mido_api)
def _properties(self, os_sg_rule_id):
return {self.OS_SG_KEY: str(os_sg_rule_id)}
def create_for_sg_rule(self, rule):
LOG.debug(_("RuleManager.create_for_sg_rule called: rule=%r"), rule)
direction = rule['direction']
protocol = rule['protocol']
port_range_max = rule['port_range_max']
rule_id = rule['id']
ethertype = rule['ethertype']
security_group_id = rule['security_group_id']
source_group_id = rule['source_group_id']
source_ip_prefix = rule['source_ip_prefix'] # watch out. not validated
tenant_id = rule['tenant_id']
port_range_min = rule['port_range_min']
external_id = rule['external_id']
# construct a corresponding rule
tp_src_start = tp_src_end = None
tp_dst_start = tp_dst_end = None
nw_src_address = None
nw_src_length = None
port_group_id = None
# handle source
if not source_ip_prefix is None:
nw_src_address, nw_src_length = source_ip_prefix.split('/')
elif not source_group_id is None: # security group as a srouce
source_pg = self.pg_manager.get_for_sg(tenant_id, source_group_id)
port_group_id = source_pg.get_id()
else:
raise Exception(_("Don't know what to do with rule=%r"), rule)
# dst ports
tp_dst_start, tp_dst_end = port_range_min, port_range_max
# protocol
if rule['protocol'] == 'tcp':
nw_proto = 6
elif rule['protocol'] == 'udp':
nw_proto = 17
elif rule['protocol'] == 'icmp':
nw_proto = 1
# extract type and code from reporposed fields
icmp_type = rule['from_port']
icmp_code = rule['to_port']
# translate -1(wildcard in OS) to midonet wildcard
if icmp_type == -1:
icmp_type = None
if icmp_code == -1:
icmp_code = None
# set data for midonet rule
tp_src_start = tp_src_end = icmp_type
tp_dst_start = tp_dst_end = icmp_code
chains = self.chain_manager.get_sg_chains(tenant_id, security_group_id)
chain = None
if direction == 'egress':
chain = chains['in']
elif direction == 'ingress':
chain = chains['out']
else:
raise Exception(_("Don't know what to do with rule=%r"), rule)
# create an accept rule
properties = self._properties(rule_id)
LOG.debug(_("RuleManager.create_for_sg_rule: adding accept rule "
"%(rule_id) in portgroup %(port_group_id)s"),
{'rule_id': rule_id, 'port_group_id': port_group_id})
chain.add_rule().port_group(port_group_id).type('accept').nw_proto(
nw_proto).nw_src_address(nw_src_address).nw_src_length(
nw_src_length).tp_src_start(tp_src_start).tp_src_end(
tp_src_end).tp_dst_start(tp_dst_start).tp_dst_end(
tp_dst_end).properties(properties).create()
def delete_for_sg_rule(self, rule):
LOG.debug(_("RuleManager.delete_for_sg_rule called: rule=%r"), rule)
tenant_id = rule['tenant_id']
security_group_id = rule['security_group_id']
rule_id = rule['id']
properties = self._properties(rule_id)
# search for the chains to find the rule to delete
chains = self.chain_manager.get_sg_chains(tenant_id, security_group_id)
for c in chains['in'], chains['out']:
rules = c.get_rules()
for r in rules:
if r.get_properties() == properties:
LOG.debug(_("RuleManager.delete_for_sg_rule: deleting "
"rule %r"), r)
r.delete()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,17 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Midokura Japan K.K.
# Copyright (C) 2013 Midokura PTE LTD
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,252 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Midokura Japan K.K.
# Copyright (C) 2013 Midokura PTE LTD
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Ryu Ishimoto, Midokura Japan KK
# @author: Tomoe Sugihara, Midokura Japan KK
import unittest2 as unittest
import uuid
import mock
from quantum.plugins.midonet import midonet_lib
class MidonetLibTestCase(unittest.TestCase):
def setUp(self):
self.mock_api = mock.Mock()
def tearDown(self):
self.mock_api = None
def _create_mock_chains(self, sg_id, sg_name):
mock_in_chain = mock.Mock()
mock_in_chain.get_name.return_value = "OS_SG_%s_%s_IN" % (sg_id,
sg_name)
mock_out_chain = mock.Mock()
mock_out_chain.get_name.return_value = "OS_SG_%s_%s_OUT" % (sg_id,
sg_name)
return (mock_in_chain, mock_out_chain)
def _create_mock_router_chains(self, router_id):
mock_in_chain = mock.Mock()
mock_in_chain.get_name.return_value = "OS_ROUTER_IN_%s" % (router_id)
mock_out_chain = mock.Mock()
mock_out_chain.get_name.return_value = "OS_ROUTER_OUT_%s" % (router_id)
return (mock_in_chain, mock_out_chain)
def _create_mock_port_group(self, sg_id, sg_name):
mock_pg = mock.Mock()
mock_pg.get_name.return_value = "OS_SG_%s_%s" % (sg_id, sg_name)
return mock_pg
def _create_mock_rule(self, rule_id):
mock_rule = mock.Mock()
mock_rule.get_properties.return_value = {"os_sg_rule_id": rule_id}
return mock_rule
class MidonetChainManagerTestCase(MidonetLibTestCase):
def setUp(self):
super(MidonetChainManagerTestCase, self).setUp()
self.mgr = midonet_lib.ChainManager(self.mock_api)
def tearDown(self):
self.mgr = None
super(MidonetChainManagerTestCase, self).tearDown()
def test_create_for_sg(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
sg_name = 'test_sg_name'
calls = [mock.call.add_chain().tenant_id(tenant_id)]
self.mgr.create_for_sg(tenant_id, sg_id, sg_name)
self.mock_api.assert_has_calls(calls)
def test_delete_for_sg(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
sg_name = 'test_sg_name'
in_chain, out_chain = self._create_mock_chains(sg_id, sg_name)
# Mock get_chains returned values
self.mock_api.get_chains.return_value = [in_chain, out_chain]
self.mgr.delete_for_sg(tenant_id, sg_id, sg_name)
self.mock_api.assert_has_calls(mock.call.get_chains(
{"tenant_id": tenant_id}))
in_chain.delete.assert_called_once_with()
out_chain.delete.assert_called_once_with()
def test_get_router_chains(self):
tenant_id = 'test_tenant'
router_id = str(uuid.uuid4())
in_chain, out_chain = self._create_mock_router_chains(router_id)
# Mock get_chains returned values
self.mock_api.get_chains.return_value = [in_chain, out_chain]
chains = self.mgr.get_router_chains(tenant_id, router_id)
self.mock_api.assert_has_calls(mock.call.get_chains(
{"tenant_id": tenant_id}))
self.assertEquals(len(chains), 2)
self.assertEquals(chains['in'], in_chain)
self.assertEquals(chains['out'], out_chain)
def test_create_router_chains(self):
tenant_id = 'test_tenant'
router_id = str(uuid.uuid4())
calls = [mock.call.add_chain().tenant_id(tenant_id)]
self.mgr.create_router_chains(tenant_id, router_id)
self.mock_api.assert_has_calls(calls)
def test_get_sg_chains(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
in_chain, out_chain = self._create_mock_chains(sg_id, 'foo')
# Mock get_chains returned values
self.mock_api.get_chains.return_value = [in_chain, out_chain]
chains = self.mgr.get_sg_chains(tenant_id, sg_id)
self.mock_api.assert_has_calls(mock.call.get_chains(
{"tenant_id": tenant_id}))
self.assertEquals(len(chains), 2)
self.assertEquals(chains['in'], in_chain)
self.assertEquals(chains['out'], out_chain)
class MidonetPortGroupManagerTestCase(MidonetLibTestCase):
def setUp(self):
super(MidonetPortGroupManagerTestCase, self).setUp()
self.mgr = midonet_lib.PortGroupManager(self.mock_api)
def tearDown(self):
self.mgr = None
super(MidonetPortGroupManagerTestCase, self).tearDown()
def test_create(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
sg_name = 'test_sg'
pg_mock = self._create_mock_port_group(sg_id, sg_name)
rv = self.mock_api.add_port_group.return_value.tenant_id.return_value
rv.name.return_value = pg_mock
self.mgr.create(tenant_id, sg_id, sg_name)
pg_mock.create.assert_called_once_with()
def test_delete(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
sg_name = 'test_sg'
pg_mock1 = self._create_mock_port_group(sg_id, sg_name)
pg_mock2 = self._create_mock_port_group(sg_id, sg_name)
self.mock_api.get_port_groups.return_value = [pg_mock1, pg_mock2]
self.mgr.delete(tenant_id, sg_id, sg_name)
self.mock_api.assert_has_calls(mock.call.get_port_groups(
{"tenant_id": tenant_id}))
pg_mock1.delete.assert_called_once_with()
pg_mock2.delete.assert_called_once_with()
def test_get_for_sg(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
pg_mock = self._create_mock_port_group(sg_id, 'foo')
self.mock_api.get_port_groups.return_value = [pg_mock]
pg = self.mgr.get_for_sg(tenant_id, sg_id)
self.assertEquals(pg, pg_mock)
class MidonetRuleManagerTestCase(MidonetLibTestCase):
def setUp(self):
super(MidonetRuleManagerTestCase, self).setUp()
self.mgr = midonet_lib.RuleManager(self.mock_api)
self.mgr.chain_manager = mock.Mock()
self.mgr.pg_manager = mock.Mock()
def tearDown(self):
self.mgr = None
super(MidonetRuleManagerTestCase, self).tearDown()
def _create_test_rule(self, tenant_id, sg_id, rule_id, direction="egress",
protocol="tcp", port_min=1, port_max=65535,
src_ip='192.168.1.0/24', src_group_id=None,
ethertype=0x0800):
return {"tenant_id": tenant_id, "security_group_id": sg_id,
"rule_id": rule_id, "direction": direction,
"protocol": protocol,
"source_ip_prefix": src_ip, "source_group_id": src_group_id,
"port_range_min": port_min, "port_range_max": port_max,
"ethertype": ethertype, "id": rule_id, "external_id": None}
def test_create_for_sg_rule(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
rule_id = str(uuid.uuid4())
in_chain, out_chain = self._create_mock_chains(sg_id, 'foo')
self.mgr.chain_manager.get_sg_chains.return_value = {"in": in_chain,
"out": out_chain}
props = {"os_sg_rule_id": rule_id}
rule = self._create_test_rule(tenant_id, sg_id, rule_id)
calls = [mock.call.add_rule().port_group(None).type(
'accept').nw_proto(6).nw_src_address(
'192.168.1.0').nw_src_length(24).tp_src_start(
None).tp_src_end(None).tp_dst_start(1).tp_dst_end(
65535).properties(props).create()]
self.mgr.create_for_sg_rule(rule)
in_chain.assert_has_calls(calls)
def test_delete_for_sg_rule(self):
tenant_id = 'test_tenant'
sg_id = str(uuid.uuid4())
rule_id = str(uuid.uuid4())
in_chain, out_chain = self._create_mock_chains(sg_id, 'foo')
self.mgr.chain_manager.get_sg_chains.return_value = {"in": in_chain,
"out": out_chain}
# Mock the rules returned for each chain
mock_rule_in = self._create_mock_rule(rule_id)
mock_rule_out = self._create_mock_rule(rule_id)
in_chain.get_rules.return_value = [mock_rule_in]
out_chain.get_rules.return_value = [mock_rule_out]
rule = self._create_test_rule(tenant_id, sg_id, rule_id)
self.mgr.delete_for_sg_rule(rule)
mock_rule_in.delete.assert_called_once_with()
mock_rule_out.delete.assert_called_once_with()

View File

@ -0,0 +1,752 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Midokura Japan K.K.
# Copyright (C) 2013 Midokura PTE LTD
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Rossella Sblendido, Midokura Europe SARL
# @author: Ryu Ishimoto, Midokura Japan KK
# @author: Tomoe Sugihara, Midokura Japan KK
import sys
import uuid
import mock
from webob import exc as w_exc
import quantum.common.test_lib as test_lib
import quantum.tests.unit.midonet as midonet
import quantum.tests.unit.test_db_plugin as test_plugin
MIDOKURA_PKG_PATH = "quantum.plugins.midonet.plugin"
# Need to mock the midonetclient module since the plugin will try to load it.
sys.modules["midonetclient"] = mock.Mock()
class MidonetPluginV2TestCase(test_plugin.QuantumDbPluginV2TestCase):
_plugin_name = ('%s.MidonetPluginV2' % MIDOKURA_PKG_PATH)
def setUp(self):
self.mock_api = mock.patch('midonetclient.api.MidonetApi')
self.instance = self.mock_api.start()
super(MidonetPluginV2TestCase, self).setUp(self._plugin_name)
def tearDown(self):
super(MidonetPluginV2TestCase, self).tearDown()
self.mock_api.stop()
def _setup_bridge_mock(self, bridge_id=str(uuid.uuid4()), name='net'):
# Set up mocks needed for the parent network() method
bridge = mock.Mock()
bridge.get_id.return_value = bridge_id
bridge.get_name.return_value = name
self.instance.return_value.add_bridge.return_value.name.return_value\
.tenant_id.return_value.create.return_value = bridge
self.instance.return_value.get_bridges.return_value = [bridge]
self.instance.return_value.get_bridge.return_value = bridge
return bridge
def _setup_subnet_mocks(self, subnet_id=str(uuid.uuid4()),
subnet_prefix='10.0.0.0', subnet_len=int(24)):
# Set up mocks needed for the parent subnet() method
bridge = self._setup_bridge_mock()
subnet = mock.Mock()
subnet.get_subnet_prefix.return_value = subnet_prefix
subnet.get_subnet_length.return_value = subnet_len
subnet.get_id.return_value = subnet_prefix + '/' + str(subnet_len)
bridge.add_dhcp_subnet.return_value.default_gateway\
.return_value.subnet_prefix.return_value.subnet_length\
.return_value.create.return_value = subnet
bridge.get_dhcp_subnets.return_value = [subnet]
return (bridge, subnet)
def _setup_port_mocks(self, port_id=str(uuid.uuid4())):
# Set up mocks needed for the parent port() method
bridge, subnet = self._setup_subnet_mocks()
port = mock.Mock()
port.get_id.return_value = port_id
self.instance.return_value.create_port.return_value = port
self.instance.return_value.get_port.return_value = port
bridge.add_exterior_port.return_value.create.return_value = (
port
)
dhcp_host = mock.Mock()
rv1 = subnet.add_dhcp_host.return_value.ip_addr.return_value
rv1.mac_addr.return_value.create.return_value = dhcp_host
subnet.get_dhcp_hosts.return_value = [dhcp_host]
return (bridge, subnet, port, dhcp_host)
class TestMidonetNetworksV2(test_plugin.TestNetworksV2,
MidonetPluginV2TestCase):
def test_create_network(self):
self._setup_bridge_mock()
super(TestMidonetNetworksV2, self).test_create_network()
def test_create_public_network(self):
self._setup_bridge_mock()
super(TestMidonetNetworksV2, self).test_create_public_network()
def test_create_public_network_no_admin_tenant(self):
self._setup_bridge_mock()
super(TestMidonetNetworksV2,
self).test_create_public_network_no_admin_tenant()
def test_update_network(self):
self._setup_bridge_mock()
super(TestMidonetNetworksV2, self).test_update_network()
def test_list_networks(self):
bridge = self._setup_bridge_mock()
with self.network(name='net1') as net1:
req = self.new_list_request('networks')
res = self.deserialize('json', req.get_response(self.api))
self.assertEquals(res['networks'][0]['name'],
net1['network']['name'])
def test_show_network(self):
self._setup_bridge_mock()
super(TestMidonetNetworksV2, self).test_show_network()
def test_update_shared_network_noadmin_returns_403(self):
self._setup_bridge_mock()
super(TestMidonetNetworksV2,
self).test_update_shared_network_noadmin_returns_403()
def test_update_network_set_shared(self):
pass
def test_update_network_with_subnet_set_shared(self):
pass
def test_update_network_set_not_shared_single_tenant(self):
pass
def test_update_network_set_not_shared_other_tenant_returns_409(self):
pass
def test_update_network_set_not_shared_multi_tenants_returns_409(self):
pass
def test_update_network_set_not_shared_multi_tenants2_returns_409(self):
pass
def test_create_networks_bulk_native(self):
pass
def test_create_networks_bulk_native_quotas(self):
pass
def test_create_networks_bulk_tenants_and_quotas(self):
pass
def test_create_networks_bulk_tenants_and_quotas_fail(self):
pass
def test_create_networks_bulk_emulated(self):
pass
def test_create_networks_bulk_wrong_input(self):
pass
def test_create_networks_bulk_emulated_plugin_failure(self):
pass
def test_create_networks_bulk_native_plugin_failure(self):
pass
def test_list_networks_with_parameters(self):
pass
def test_list_networks_with_fields(self):
pass
def test_list_networks_with_parameters_invalid_values(self):
pass
def test_show_network_with_subnet(self):
pass
def test_invalid_admin_status(self):
pass
def test_list_networks_with_pagination_emulated(self):
pass
def test_list_networks_with_pagination_reverse_emulated(self):
pass
def test_list_networks_with_parameters(self):
pass
def test_list_networks_with_parameters_invalid_values(self):
pass
def test_list_networks_with_sort_emulated(self):
pass
def test_list_networks_without_pk_in_fields_pagination_emulated(self):
pass
class TestMidonetSubnetsV2(test_plugin.TestSubnetsV2,
MidonetPluginV2TestCase):
def test_create_subnet(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet()
def test_create_two_subnets(self):
pass
def test_create_two_subnets_same_cidr_returns_400(self):
pass
def test_create_two_subnets_same_cidr_returns_400(self):
pass
def test_create_subnet_bad_V4_cidr(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_V4_cidr()
def test_create_subnet_bad_V6_cidr(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_V4_cidr()
def test_create_2_subnets_overlapping_cidr_allowed_returns_200(self):
pass
def test_create_2_subnets_overlapping_cidr_not_allowed_returns_400(self):
pass
def test_create_subnets_bulk_native(self):
pass
def test_create_subnets_bulk_emulated(self):
pass
def test_create_subnets_bulk_emulated_plugin_failure(self):
pass
def test_create_subnets_bulk_native_plugin_failure(self):
pass
def test_delete_subnet(self):
_bridge, subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_delete_subnet()
subnet.delete.assert_called_once_with()
def test_delete_subnet_port_exists_owned_by_network(self):
_bridge, subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_delete_subnet_port_exists_owned_by_network()
def test_delete_subnet_port_exists_owned_by_other(self):
pass
def test_delete_network(self):
bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_delete_network()
bridge.delete.assert_called_once_with()
def test_create_subnet_bad_tenant(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_tenant()
def test_create_subnet_bad_ip_version(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_ip_version()
def test_create_subnet_bad_ip_version_null(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_bad_ip_version_null()
def test_create_subnet_bad_uuid(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_uuid()
def test_create_subnet_bad_boolean(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_boolean()
def test_create_subnet_bad_pools(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_pools()
def test_create_subnet_bad_nameserver(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_nameserver()
def test_create_subnet_bad_hostroutes(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_bad_hostroutes()
def test_create_subnet_defaults(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_defaults()
def test_create_subnet_gw_values(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_gw_values()
def test_create_force_subnet_gw_values(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_force_subnet_gw_values()
def test_create_subnet_with_allocation_pool(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_with_allocation_pool()
def test_create_subnet_with_none_gateway(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_with_none_gateway()
def test_create_subnet_with_none_gateway_fully_allocated(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_with_none_gateway_fully_allocated()
def test_subnet_with_allocation_range(self):
pass
def test_create_subnet_with_none_gateway_allocation_pool(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_with_none_gateway_allocation_pool()
def test_create_subnet_with_v6_allocation_pool(self):
pass
def test_create_subnet_with_large_allocation_pool(self):
pass
def test_create_subnet_multiple_allocation_pools(self):
pass
def test_create_subnet_with_dhcp_disabled(self):
pass
def test_create_subnet_default_gw_conflict_allocation_pool_returns_409(
self):
pass
def test_create_subnet_gateway_in_allocation_pool_returns_409(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self)\
.test_create_subnet_gateway_in_allocation_pool_returns_409()
def test_create_subnet_overlapping_allocation_pools_returns_409(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self)\
.test_create_subnet_overlapping_allocation_pools_returns_409()
def test_create_subnet_invalid_allocation_pool_returns_400(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_invalid_allocation_pool_returns_400()
def test_create_subnet_out_of_range_allocation_pool_returns_400(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self)\
.test_create_subnet_out_of_range_allocation_pool_returns_400()
def test_create_subnet_shared_returns_400(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_shared_returns_400()
def test_create_subnet_inconsistent_ipv6_cidrv4(self):
pass
def test_create_subnet_inconsistent_ipv4_cidrv6(self):
pass
def test_create_subnet_inconsistent_ipv4_gatewayv6(self):
pass
def test_create_subnet_inconsistent_ipv6_gatewayv4(self):
pass
def test_create_subnet_inconsistent_ipv6_dns_v4(self):
pass
def test_create_subnet_inconsistent_ipv4_hostroute_dst_v6(self):
pass
def test_create_subnet_inconsistent_ipv4_hostroute_np_v6(self):
pass
def test_update_subnet(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_update_subnet()
def test_update_subnet_shared_returns_400(self):
self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_update_subnet_shared_returns_400()
def test_update_subnet_inconsistent_ipv4_gatewayv6(self):
pass
def test_update_subnet_inconsistent_ipv6_gatewayv4(self):
pass
def test_update_subnet_inconsistent_ipv4_dns_v6(self):
pass
def test_update_subnet_inconsistent_ipv6_hostroute_dst_v4(self):
pass
def test_update_subnet_inconsistent_ipv6_hostroute_np_v4(self):
pass
def test_show_subnet(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_show_subnet()
def test_list_subnets(self):
pass
def test_list_subnets_shared(self):
pass
def test_list_subnets_with_parameter(self):
pass
def test_invalid_ip_version(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_invalid_ip_version()
def test_invalid_subnet(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_invalid_subnet()
def test_invalid_ip_address(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_invalid_ip_address()
def test_invalid_uuid(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_invalid_uuid()
def test_create_subnet_with_one_dns(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_with_one_dns()
def test_create_subnet_with_two_dns(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_create_subnet_with_two_dns()
def test_create_subnet_with_too_many_dns(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_with_too_many_dns()
def test_create_subnet_with_one_host_route(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_with_one_host_route()
def test_create_subnet_with_two_host_routes(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_with_two_host_routes()
def test_create_subnet_with_too_many_routes(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_create_subnet_with_too_many_routes()
def test_update_subnet_dns(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_update_subnet_dns()
def test_update_subnet_dns_to_None(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_update_subnet_dns_to_None()
def test_update_subnet_dns_with_too_many_entries(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_update_subnet_dns_with_too_many_entries()
def test_update_subnet_route(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_update_subnet_route()
def test_update_subnet_route_to_None(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_update_subnet_route_to_None()
def test_update_subnet_route_with_too_many_entries(self):
_bridge, _subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_update_subnet_route_with_too_many_entries()
def test_delete_subnet_with_dns(self):
_bridge, subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_delete_subnet_with_dns()
subnet.delete.assert_called_once_with()
def test_delete_subnet_with_route(self):
_bridge, subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2, self).test_delete_subnet_with_route()
subnet.delete.assert_called_once_with()
def test_delete_subnet_with_dns_and_route(self):
_bridge, subnet = self._setup_subnet_mocks()
super(TestMidonetSubnetsV2,
self).test_delete_subnet_with_dns_and_route()
subnet.delete.assert_called_once_with()
def test_update_subnet_gateway_in_allocation_pool_returns_409(self):
self._setup_port_mocks()
super(TestMidonetSubnetsV2, self)\
.test_update_subnet_gateway_in_allocation_pool_returns_409()
def test_list_subnets_with_pagination_emulated(self):
pass
def test_list_subnets_with_pagination_reverse_emulated(self):
pass
def test_list_subnets_with_sort_emulated(self):
pass
class TestMidonetPortsV2(test_plugin.TestPortsV2,
MidonetPluginV2TestCase):
def test_create_port_json(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_create_port_json()
def test_create_port_bad_tenant(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_create_port_bad_tenant()
def test_create_port_public_network(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_create_port_public_network()
def test_create_port_public_network_with_ip(self):
self._setup_port_mocks()
super(TestMidonetPortsV2,
self).test_create_port_public_network_with_ip()
def test_create_ports_bulk_native(self):
pass
def test_create_ports_bulk_emulated(self):
pass
def test_create_ports_bulk_wrong_input(self):
pass
def test_create_ports_bulk_emulated_plugin_failure(self):
pass
def test_create_ports_bulk_native_plugin_failure(self):
pass
def test_list_ports(self):
pass
def test_list_ports_filtered_by_fixed_ip(self):
pass
def test_list_ports_public_network(self):
pass
def test_show_port(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_show_port()
def test_delete_port(self):
_bridge, _subnet, port, _dhcp = self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_delete_port()
port.delete.assert_called_once_with()
def test_delete_port_public_network(self):
_bridge, _subnet, port, _dhcp = self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_delete_port_public_network()
port.delete.assert_called_once_with()
def test_update_port(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_update_port()
def test_update_device_id_null(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_update_device_id_null()
def test_delete_network_if_port_exists(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_delete_network_if_port_exists()
def test_delete_network_port_exists_owned_by_network(self):
self._setup_port_mocks()
super(TestMidonetPortsV2,
self).test_delete_network_port_exists_owned_by_network()
def test_update_port_delete_ip(self):
pass
def test_no_more_port_exception(self):
pass
def test_update_port_update_ip(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_update_port_update_ip()
def test_update_port_update_ips(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_update_port_update_ips()
def test_update_port_add_additional_ip(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_update_port_add_additional_ip()
def test_requested_duplicate_mac(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_requested_duplicate_mac()
def test_mac_generation(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_mac_generation()
def test_mac_generation_4octet(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_mac_generation_4octet()
def test_bad_mac_format(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_bad_mac_format()
def test_mac_exhaustion(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_mac_exhaustion()
def test_requested_duplicate_ip(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_requested_duplicate_ip()
def test_requested_subnet_delete(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_requested_subnet_delete()
def test_requested_subnet_id(self):
pass
def test_requested_subnet_id_not_on_network(self):
pass
def test_overlapping_subnets(self):
pass
def test_requested_subnet_id_v4_and_v6(self):
pass
def test_range_allocation(self):
pass
def test_requested_invalid_fixed_ips(self):
pass
def test_invalid_ip(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_invalid_ip()
def test_requested_split(self):
pass
def test_duplicate_ips(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_duplicate_ips()
def test_fixed_ip_invalid_subnet_id(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_fixed_ip_invalid_subnet_id()
def test_fixed_ip_invalid_ip(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_fixed_ip_invalid_ip()
def test_requested_ips_only(self):
pass
def test_recycling(self):
pass
def test_invalid_admin_state(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_invalid_admin_state()
def test_invalid_mac_address(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_invalid_mac_address()
def test_default_allocation_expiration(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_default_allocation_expiration()
def test_update_fixed_ip_lease_expiration(self):
self._setup_port_mocks()
super(TestMidonetPortsV2,
self).test_update_fixed_ip_lease_expiration()
def test_port_delete_holds_ip(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_port_delete_holds_ip()
def test_update_fixed_ip_lease_expiration_invalid_address(self):
self._setup_port_mocks()
super(TestMidonetPortsV2,
self).test_update_fixed_ip_lease_expiration_invalid_address()
def test_hold_ip_address(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_hold_ip_address()
def test_recycle_held_ip_address(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_recycle_held_ip_address()
def test_recycle_expired_previously_run_within_context(self):
pass
def test_update_port_not_admin(self):
self._setup_port_mocks()
super(TestMidonetPortsV2, self).test_update_port_not_admin()
def test_list_ports_with_pagination_emulated(self):
pass
def test_list_ports_with_pagination_reverse_emulated(self):
pass
def test_list_ports_with_sort_emulated(self):
pass