Updated Infoblox backend

This implementation replaces the existing Infoblox driver,
and is based on the official Infoblox Python client.

The following new configuring options are now available.
- wapi_host, wapi_version, cert and key

The original wapi_url option still works, but can now be
alternatively replaced by wapi_host and wapi_version.

Finally, the deprecated configuration options for designate.conf
was removed, and all Infoblox configuration now happens in the pools
configuration.

Depends-On: https://review.opendev.org/c/openstack/requirements/+/905764
Change-Id: I35e03b9818851487686153bc68ad90c081e61966
This commit is contained in:
Erik Olof Gunnar Andersson 2024-01-15 16:35:57 -08:00
parent 05ebd15122
commit 1dbdcac58f
12 changed files with 678 additions and 928 deletions

View File

@ -0,0 +1,208 @@
# Copyright 2015 Infoblox Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from urllib.parse import urlparse
from oslo_log import log as logging
from oslo_utils import importutils
from designate.backend import base
from designate import exceptions
infoblox_connector = importutils.try_import('infoblox_client.connector')
infoblox_exceptions = importutils.try_import('infoblox_client.exceptions')
infoblox_object_manager = importutils.try_import(
'infoblox_client.object_manager'
)
infoblox_objects = importutils.try_import('infoblox_client.objects')
LOG = logging.getLogger(__name__)
class InfobloxBackend(base.Backend):
"""Provides a Designate Backend for Infoblox"""
__backend_status__ = 'untested'
__plugin_name__ = 'infoblox'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not infoblox_connector:
raise exceptions.Backend(
'The infoblox-client library is not available'
)
wapi_host = self.options.get('wapi_host')
wapi_version = self.options.get('wapi_version')
wapi_url = self.options.get('wapi_url')
self.multi_project = self.options.get('multi_tenant')
self.dns_view = self.options.get('dns_view')
self.network_view = self.options.get('network_view')
self.ns_group = self.options.get('ns_group')
if not wapi_host and wapi_url:
wapi_host, wapi_version = self.parse_wapi_url(wapi_url)
options = {
'host': wapi_host,
'username': self.options.get('username'),
'password': self.options.get('password'),
'http_pool_connections': self.options.get('http_pool_connections'),
'http_pool_maxsize': self.options.get('http_pool_maxsize'),
'wapi_version': wapi_version,
'ssl_verify': self.options.get('sslverify'),
'cert': self.options.get('cert'),
'key': self.options.get('key'),
}
self.connection = infoblox_connector.Connector(options)
self.infoblox = infoblox_object_manager.InfobloxObjectManager(
self.connection
)
for master in self.masters:
if master.port != 53:
raise exceptions.ConfigurationError(
'Infoblox only supports mDNS instances on port 53'
)
def create_zone(self, context, zone):
LOG.info('Create Zone %r', zone)
dns_zone = zone['name'][0:-1]
dns_view = self.dns_view
project_id = context.project_id or zone.tenant_id
if dns_zone.endswith('in-addr.arpa'):
zone_format = 'IPV4'
elif dns_zone.endswith('ip6.arpa'):
zone_format = 'IPV6'
else:
zone_format = 'FORWARD'
try:
if self.is_multi_project:
net_view = self.get_or_create_network_view(project_id)
dns_view = self.get_or_create_dns_view(net_view)
if not dns_view:
raise exceptions.Backend(
'Unable to create zone. No DNS View found.'
)
self.infoblox.create_dns_zone(
dns_zone=dns_zone,
dns_view=dns_view,
zone_format=zone_format,
ns_group=self.ns_group,
)
self.restart_if_needed()
except infoblox_exceptions.InfobloxException as e:
raise exceptions.Backend(e)
def delete_zone(self, context, zone, zone_params=None):
LOG.info('Delete Zone %r', zone)
dns_zone_fqdn = zone['name'][0:-1]
dns_view = self.dns_view
project_id = context.project_id or zone.tenant_id
try:
if self.is_multi_project:
net_view = self.get_network_view(project_id)
dns_view = self.get_or_create_dns_view(
net_view, create_if_missing=False
)
if not dns_view:
raise exceptions.Backend(
'Unable to delete zone. No DNS View found.'
)
self.infoblox.delete_dns_zone(dns_view, dns_zone_fqdn)
self.restart_if_needed()
except infoblox_exceptions.InfobloxException as e:
raise exceptions.Backend(e)
@staticmethod
def parse_wapi_url(wapi_url):
url = urlparse(wapi_url)
host = url.netloc
wapi_version = None
for path in url.path.split('/'):
if path.startswith('v'):
wapi_version = path.strip('v')
break
return host, wapi_version
def get_network_view(self, project_id):
network_views = self.connection.get_object(
'networkview',
return_fields=['name'],
extattrs={'TenantID': {'value': project_id}}
)
network_view = None
if network_views:
network_view = network_views[0]['name']
return network_view
def get_or_create_network_view(self, project_id):
network_view = self.get_network_view(project_id)
if not network_view:
network_view = self.infoblox.create_network_view(
f'{self.network_view}.{project_id}',
extattrs={'TenantID': {'value': project_id}}
).name
return network_view
def get_or_create_dns_view(self, net_view, create_if_missing=True):
if not net_view:
return None
dns_view_name = f'{self.dns_view}.{net_view}'
dns_view = infoblox_objects.DNSView.search(
self.connection, name=dns_view_name, return_fields=['name'],
)
if not dns_view and create_if_missing:
dns_view = self.infoblox.create_dns_view(
self.network_view, dns_view_name
)
if not dns_view:
return None
return dns_view.name
@property
def is_multi_project(self):
if not self.multi_project or self.multi_project == '0':
return False
return True
def restart_if_needed(self):
try:
grid = infoblox_objects.Grid(self.connection)
grid.fetch(only_ref=True)
self.connection.call_func(
'restartservices', grid._ref,
{
'restart_option': 'RESTART_IF_NEEDED',
'mode': 'GROUPED',
'services': ['DNS'],
}
)
except infoblox_exceptions.InfobloxException:
LOG.warning('Unable to restart the infoblox dns service.')

View File

@ -1,56 +0,0 @@
# Copyright 2015 Infoblox Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from designate.backend import base
from designate.backend.impl_infoblox import connector
from designate.backend.impl_infoblox import object_manipulator
from designate import exceptions
LOG = logging.getLogger(__name__)
class InfobloxBackend(base.Backend):
"""Provides a Designate Backend for Infoblox"""
__backend_status__ = 'untested'
__plugin_name__ = 'infoblox'
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.infoblox = object_manipulator.InfobloxObjectManipulator(
connector.Infoblox(self.options))
for master in self.masters:
if master.port != 53:
raise exceptions.ConfigurationError(
"Infoblox only supports mDNS instances on port 53")
def create_zone(self, context, zone):
LOG.info('Create Zone %r', zone)
project_id = context.project_id or zone.tenant_id
dns_net_view = self.infoblox.get_dns_view(project_id)
self.infoblox.create_zone_auth(
fqdn=zone['name'][0:-1],
dns_view=dns_net_view
)
def delete_zone(self, context, zone):
LOG.info('Delete Zone %r', zone)
self.infoblox.delete_zone_auth(zone['name'][0:-1])

View File

@ -1,271 +0,0 @@
# Copyright 2015 Infoblox Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from urllib import parse
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import strutils
import requests
from designate.backend.impl_infoblox import ibexceptions as exc
import designate.conf
CFG_GROUP_NAME = 'backend:infoblox'
CONF = designate.conf.CONF
LOG = log.getLogger(__name__)
class Infoblox:
"""Infoblox class
Defines methods for getting, creating, updating and
removing objects from an Infoblox server instance.
"""
def __init__(self, options):
"""Initialize a new Infoblox object instance
Args:
options (dict): Target options dictionary
"""
config = CONF[CFG_GROUP_NAME]
reqd_opts = ['wapi_url', 'username', 'password', 'ns_group']
other_opts = ['sslverify', 'network_view', 'dns_view', 'multi_tenant']
for opt in reqd_opts + other_opts:
if opt == 'sslverify' or opt == 'multi_tenant':
# NOTE(selvakumar): This check is for sslverify option.
# type of sslverify is unicode string from designate DB
# if the value is 0 getattr called for setting default values.
# to avoid setting default values we use oslo strutils
if not strutils.is_int_like(options.get(opt)):
option_value = options.get(opt)
else:
option_value = strutils.bool_from_string(options.get(opt),
default=True)
setattr(self, opt, option_value)
continue
setattr(self, opt, options.get(opt) or getattr(config, opt))
for opt in reqd_opts:
LOG.debug("self.%s = %s", opt, getattr(self, opt))
if not getattr(self, opt):
raise exc.InfobloxIsMisconfigured(option=opt)
self.session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=config.http_pool_connections,
pool_maxsize=config.http_pool_maxsize)
self.session.mount('http://', adapter)
self.session.mount('https://', adapter)
self.session.auth = (self.username, self.password)
self.session.verify = self.sslverify
def _construct_url(self, relative_path, query_params=None, extattrs=None):
if query_params is None:
query_params = {}
if extattrs is None:
extattrs = {}
if not relative_path or relative_path[0] == '/':
raise ValueError('Path in request must be relative.')
query = ''
if query_params or extattrs:
query = '?'
if extattrs:
attrs_queries = []
for key, value in extattrs.items():
LOG.debug("key: %s, value: %s", key, value)
attrs_queries.append('*' + key + '=' + value['value'])
query += '&'.join(attrs_queries)
if query_params:
if len(query) > 1:
query += '&'
query += parse.urlencode(query_params)
baseurl = parse.urljoin(self.wapi_url, parse.quote(relative_path))
return baseurl + query
def _validate_objtype_or_die(self, objtype):
if not objtype:
raise ValueError('WAPI object type can\'t be empty.')
if '/' in objtype:
raise ValueError('WAPI object type can\'t contains slash.')
def get_object(self, objtype, payload=None, return_fields=None,
extattrs=None):
"""Retrieve a list of Infoblox objects of type 'objtype'
Args:
objtype (str): Infoblox object type, e.g. 'view', 'tsig', etc.
payload (dict): Payload with data to send
Returns:
A list of the Infoblox objects requested
Raises:
InfobloxObjectNotFound
"""
if return_fields is None:
return_fields = []
if extattrs is None:
extattrs = {}
self._validate_objtype_or_die(objtype)
query_params = dict()
if return_fields:
query_params['_return_fields'] = ','.join(return_fields)
headers = {'Content-type': 'application/json'}
# NOTE (scottsol): This can trigger an internal error in Infoblox if
# jsonutils sets it to 'null' (a string with quotes). Setting to None
# works around this and returns a valid response from Infoblox
data = jsonutils.dump_as_bytes(payload) if payload else None
url = self._construct_url(objtype, query_params, extattrs)
r = self.session.get(url,
data=data,
verify=self.sslverify,
headers=headers)
if r.status_code != requests.codes.ok:
raise exc.InfobloxSearchError(
response=jsonutils.loads(r.content),
objtype=objtype,
content=r.content,
code=r.status_code)
return jsonutils.loads(r.content)
def create_object(self, objtype, payload, return_fields=None):
"""Create an Infoblox object of type 'objtype'
Args:
objtype (str): Infoblox object type, e.g. 'network', 'range', etc.
payload (dict): Payload with data to send
Returns:
The object reference of the newly create object
Raises:
InfobloxException
"""
if not return_fields:
return_fields = []
self._validate_objtype_or_die(objtype)
query_params = dict()
if return_fields:
query_params['_return_fields'] = ','.join(return_fields)
url = self._construct_url(objtype, query_params)
headers = {'Content-type': 'application/json'}
r = self.session.post(url,
data=jsonutils.dump_as_bytes(payload),
verify=self.sslverify,
headers=headers)
if r.status_code != requests.codes.CREATED:
raise exc.InfobloxCannotCreateObject(
response=jsonutils.loads(r.content),
objtype=objtype,
content=r.content,
args=payload,
code=r.status_code)
return jsonutils.loads(r.content)
def call_func(self, func_name, ref, payload, return_fields=None):
if not return_fields:
return_fields = []
query_params = dict()
query_params['_function'] = func_name
if return_fields:
query_params['_return_fields'] = ','.join(return_fields)
url = self._construct_url(ref, query_params)
headers = {'Content-type': 'application/json'}
r = self.session.post(url,
data=jsonutils.dump_as_bytes(payload),
verify=self.sslverify,
headers=headers)
if r.status_code not in (requests.codes.CREATED,
requests.codes.ok):
raise exc.InfobloxFuncException(
response=jsonutils.loads(r.content),
ref=ref,
func_name=func_name,
content=r.content,
code=r.status_code)
return jsonutils.loads(r.content)
def update_object(self, ref, payload):
"""Update an Infoblox object
Args:
ref (str): Infoblox object reference
payload (dict): Payload with data to send
Returns:
The object reference of the updated object
Raises:
InfobloxException
"""
headers = {'Content-type': 'application/json'}
r = self.session.put(self._construct_url(ref),
data=jsonutils.dump_as_bytes(payload),
verify=self.sslverify,
headers=headers)
if r.status_code != requests.codes.ok:
raise exc.InfobloxCannotUpdateObject(
response=jsonutils.loads(r.content),
ref=ref,
content=r.content,
code=r.status_code)
return jsonutils.loads(r.content)
def delete_object(self, ref):
"""Remove an Infoblox object
Args:
ref (str): Object reference
Returns:
The object reference of the removed object
Raises:
InfobloxException
"""
r = self.session.delete(self._construct_url(ref),
verify=self.sslverify)
if r.status_code != requests.codes.ok:
raise exc.InfobloxCannotDeleteObject(
response=jsonutils.loads(r.content),
ref=ref,
content=r.content,
code=r.status_code)
return jsonutils.loads(r.content)

View File

@ -1,96 +0,0 @@
# Copyright 2015 Infoblox Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from designate import exceptions
from designate.i18n import _
class InfobloxExceptionBase(exceptions.Backend):
"""Base IB Exception.
To correctly use this class, inherit from it and define
a 'message' property. That message will get printf'd
with the keyword arguments provided to the constructor.
"""
message = _("An unknown exception occurred.")
def __init__(self, **kwargs):
try:
super().__init__(self.message % kwargs)
self.msg = self.message % kwargs
except Exception:
if self.use_fatal_exceptions():
raise
else:
# at least get the core message out if something happened
super().__init__(self.message)
def __unicode__(self):
return str(self.msg)
def use_fatal_exceptions(self):
return False
class ServiceUnavailable(InfobloxExceptionBase):
message = _("The service is unavailable")
class ResourceExhausted(ServiceUnavailable):
pass
class InfobloxException(InfobloxExceptionBase):
"""Generic Infoblox Exception."""
def __init__(self, response, **kwargs):
self.response = response
super().__init__(**kwargs)
class InfobloxIsMisconfigured(InfobloxExceptionBase):
message = _(
"Infoblox backend is misconfigured: '%(option)s' must be defined.")
class InfobloxSearchError(InfobloxException):
message = _("Cannot search '%(objtype)s' object(s): "
"%(content)s [code %(code)s]")
class InfobloxCannotCreateObject(InfobloxException):
message = _("Cannot create '%(objtype)s' object(s): "
"%(content)s [code %(code)s]")
class InfobloxCannotDeleteObject(InfobloxException):
message = _("Cannot delete object with ref %(ref)s: "
"%(content)s [code %(code)s]")
class InfobloxCannotUpdateObject(InfobloxException):
message = _("Cannot update object with ref %(ref)s: "
"%(content)s [code %(code)s]")
class InfobloxFuncException(InfobloxException):
message = _("Error occurred during function's '%(func_name)s' call: "
"ref %(ref)s: %(content)s [code %(code)s]")
class NoInfobloxMemberAvailable(ResourceExhausted):
message = _("No Infoblox Member is available.")
class InfobloxObjectParsingError(InfobloxExceptionBase):
message = _("Infoblox object cannot be parsed from dict: %(data)s")

View File

@ -1,226 +0,0 @@
# Copyright 2015 Infoblox Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from designate.backend.impl_infoblox import ibexceptions as exc
from designate.i18n import _
LOG = log.getLogger(__name__)
class InfobloxObjectManipulator:
FIELDS = ['ttl', 'use_ttl']
def __init__(self, connector):
self.connector = connector
def get_member(self, member_name):
obj = {'host_name': member_name[0:-1]}
return self.connector.get_object('member', obj)
def create_dns_view(self, net_view_name, dns_view_name):
dns_view_data = {'name': dns_view_name,
'network_view': net_view_name}
return self._create_infoblox_object('view', dns_view_data)
def delete_dns_view(self, net_view_name):
net_view_data = {'name': net_view_name}
self._delete_infoblox_object('view', net_view_data)
def create_network_view(self, net_view_name, tenant_id):
net_view_data = {'name': net_view_name}
extattrs = {'extattrs': {'TenantID': {'value': tenant_id}}}
return self._create_infoblox_object('networkview',
net_view_data, extattrs)
def delete_network_view(self, net_view_name):
if net_view_name == 'default':
# never delete default network view
return
net_view_data = {'name': net_view_name}
self._delete_infoblox_object('networkview', net_view_data)
def create_tsig(self, name, algorithm, secret):
tsig = {
'name': name,
'key': secret
}
self._create_infoblox_object(
'tsig', tsig,
check_if_exists=True)
def delete_tsig(self, name, algorithm, secret):
tsig = {
'name': name,
'key': secret
}
self._delete_infoblox_object(
'tsig', tsig,
check_if_exists=True)
def create_multi_tenant_dns_view(self, net_view, tenant):
if not net_view:
net_view = f"{self.connector.network_view}.{tenant}"
dns_view = f"{self.connector.dns_view}.{net_view}"
try:
self.create_network_view(
net_view_name=net_view,
tenant_id=tenant)
self.create_dns_view(
net_view_name=net_view,
dns_view_name=dns_view)
except exc.InfobloxException as e:
LOG.warning("Issue happens during views creating: %s", e)
LOG.debug("net_view: %s, dns_view: %s", net_view, dns_view)
return dns_view
def get_dns_view(self, tenant):
if (not self.connector.multi_tenant or
self.connector.multi_tenant == '0'):
return self.connector.dns_view
else:
# Look for the network view with the specified TenantID EA
net_view = self._get_infoblox_object_or_none(
'networkview',
return_fields=['name'],
extattrs={'TenantID': {'value': tenant}})
if net_view:
net_view = net_view['name']
return self.create_multi_tenant_dns_view(net_view, tenant)
def create_zone_auth(self, fqdn, dns_view):
try:
if fqdn.endswith("in-addr.arpa"):
zone_format = 'IPV4'
elif fqdn.endswith("ip6.arpa"):
zone_format = 'IPV6'
else:
zone_format = 'FORWARD'
self._create_infoblox_object(
'zone_auth',
{'fqdn': fqdn, 'view': dns_view},
{'ns_group': self.connector.ns_group,
'zone_format': zone_format},
check_if_exists=True)
self._restart_if_needed()
except exc.InfobloxCannotCreateObject as e:
LOG.warning(e)
def delete_zone_auth(self, fqdn):
self._delete_infoblox_object(
'zone_auth', {'fqdn': fqdn})
self._restart_if_needed()
def _create_infoblox_object(self, obj_type, payload,
additional_create_kwargs=None,
check_if_exists=True,
return_fields=None):
if additional_create_kwargs is None:
additional_create_kwargs = {}
ib_object = None
if check_if_exists:
ib_object = self._get_infoblox_object_or_none(obj_type, payload)
if ib_object:
LOG.info(
"Infoblox %(obj_type)s already exists: %(ib_object)s",
{'obj_type': obj_type, 'ib_object': ib_object})
if not ib_object:
payload.update(additional_create_kwargs)
ib_object = self.connector.create_object(obj_type, payload,
return_fields)
LOG.info("Infoblox %(obj_type)s was created: %(ib_object)s",
{'obj_type': obj_type, 'ib_object': ib_object})
return ib_object
def _get_infoblox_object_or_none(self, obj_type, payload=None,
return_fields=None, extattrs=None):
ib_object = self.connector.get_object(obj_type, payload, return_fields,
extattrs=extattrs)
if ib_object:
if return_fields:
return ib_object[0]
else:
return ib_object[0]['_ref']
return None
def _update_infoblox_object(self, obj_type, payload, update_kwargs):
ib_object_ref = None
warn_msg = _('Infoblox %(obj_type)s will not be updated because'
' it cannot be found: %(payload)s')
try:
ib_object_ref = self._get_infoblox_object_or_none(obj_type,
payload)
if not ib_object_ref:
LOG.warning(warn_msg % {'obj_type': obj_type,
'payload': payload})
except exc.InfobloxSearchError as e:
LOG.warning(warn_msg, {'obj_type': obj_type, 'payload': payload})
LOG.info(e)
if ib_object_ref:
self._update_infoblox_object_by_ref(ib_object_ref, update_kwargs)
def _update_infoblox_object_by_ref(self, ref, update_kwargs):
self.connector.update_object(ref, update_kwargs)
LOG.info('Infoblox object was updated: %s', ref)
def _delete_infoblox_object(self, obj_type, payload):
ib_object_ref = None
warn_msg = _('Infoblox %(obj_type)s will not be deleted because'
' it cannot be found: %(payload)s')
try:
ib_object_ref = self._get_infoblox_object_or_none(obj_type,
payload)
if not ib_object_ref:
LOG.warning(warn_msg, {'obj_type': obj_type,
'payload': payload})
except exc.InfobloxSearchError as e:
LOG.warning(warn_msg, {'obj_type': obj_type, 'payload': payload})
LOG.info(e)
if ib_object_ref:
self.connector.delete_object(ib_object_ref)
LOG.info('Infoblox object was deleted: %s', ib_object_ref)
def _restart_if_needed(self):
ib_object_ref = None
obj_type = 'grid'
warn_msg = ('Infoblox %(obj_type)s will not be restarted because'
' the API object reference cannot be found')
try:
ib_object_ref = self._get_infoblox_object_or_none(obj_type)
if not ib_object_ref:
LOG.warning(warn_msg, {'obj_type': obj_type})
except exc.InfobloxSearchError as e:
LOG.warning(warn_msg, {'obj_type': obj_type})
LOG.info(e)
if ib_object_ref:
payload = {
"restart_option": "RESTART_IF_NEEDED",
"mode": "GROUPED",
"services": ["DNS"],
}
self.connector.call_func(
'restartservices', ib_object_ref, payload)

View File

@ -19,7 +19,6 @@ from designate.conf import central
from designate.conf import coordination
from designate.conf import dynect
from designate.conf import heartbeat_emitter
from designate.conf import infoblox
from designate.conf import keystone
from designate.conf import mdns
from designate.conf import network_api
@ -37,7 +36,6 @@ central.register_opts(CONF)
coordination.register_opts(CONF)
dynect.register_opts(CONF)
heartbeat_emitter.register_opts(CONF)
infoblox.register_opts(CONF)
keystone.register_opts(CONF)
mdns.register_opts(CONF)
network_api.register_opts(CONF)

View File

@ -1,101 +0,0 @@
# Copyright 2015 Infoblox Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
INFOBLOX_GROUP = cfg.OptGroup(
name='backend:infoblox',
title="Configuration for Infoblox Backend"
)
INFOBLOX_OPTS = [
cfg.StrOpt(
'wapi_url',
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: wapi_url'),
cfg.StrOpt(
'username',
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: username'),
cfg.StrOpt(
'password',
deprecated_for_removal=True,
secret=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: password'),
cfg.BoolOpt(
'sslverify',
default=True,
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: sslverify'),
cfg.BoolOpt(
'multi_tenant',
default=False,
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: multi_tenant'),
cfg.IntOpt(
'http_pool_connections',
default=100,
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: http_pool_connections'),
cfg.IntOpt(
'http_pool_maxsize',
default=100,
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: http_pool_maxsize'),
cfg.StrOpt(
'dns_view',
default='default',
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: dns_view'),
cfg.StrOpt(
'network_view',
default='default',
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: network_view'),
cfg.StrOpt(
'ns_group',
deprecated_for_removal=True,
deprecated_reason="All backend options have been migrated to options "
"in the pools.yaml file",
help='DEPRECATED: ns_group'),
]
def register_opts(conf):
conf.register_group(INFOBLOX_GROUP)
conf.register_opts(INFOBLOX_OPTS, group=INFOBLOX_GROUP)
def list_opts():
return {
INFOBLOX_GROUP: INFOBLOX_OPTS
}

View File

@ -13,95 +13,31 @@
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from infoblox_client import connector as infoblox_connector
from infoblox_client import exceptions as infoblox_exceptions
from infoblox_client import objects as infoblox_objects
import oslotest.base
import requests_mock
from designate.backend import impl_infoblox
from designate.backend.impl_infoblox import connector
from designate.backend.impl_infoblox import ibexceptions
from designate import context
from designate import exceptions
from designate import objects
class InfobloxConnectorTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.options = {
'wapi_url': 'https://203.0.113.1/wapi/v2.0/',
'username': 'username',
'password': 'password',
'ns_group': 'ns_group',
'sslverify': '1'
}
self.infoblox = connector.Infoblox(self.options)
def test_infoblox_constructor(self):
options = {
'wapi_url': 'https://203.0.113.1/wapi/v2.0/',
'username': 'username',
'password': 'password',
'ns_group': 'ns_group',
'sslverify': '0'
}
infoblox = connector.Infoblox(options)
self.assertIsInstance(infoblox, connector.Infoblox)
self.assertFalse(infoblox.sslverify)
def test_construct_url(self):
self.assertEqual(
'https://203.0.113.1/wapi/v2.0/test',
self.infoblox._construct_url('test')
)
self.assertEqual(
'https://203.0.113.1/wapi/v2.0/test?*foo=bar&foo=0&bar=1',
self.infoblox._construct_url(
'test', {'foo': 0, 'bar': 1}, {'foo': {'value': 'bar'}}
)
)
self.assertEqual(
'https://203.0.113.1/wapi/v2.0/test?*foo=bar&foo=0',
self.infoblox._construct_url(
'test', {'foo': 0}, {'foo': {'value': 'bar'}}
)
)
self.assertEqual(
'https://203.0.113.1/wapi/v2.0/test?foo=0',
self.infoblox._construct_url(
'test', {'foo': 0}
)
)
def test_construct_url_no_relative_path(self):
self.assertRaisesRegex(
ValueError,
'Path in request must be relative.',
self.infoblox._construct_url, None
)
def test_validate_objtype_or_die(self):
self.assertRaisesRegex(
ValueError,
'WAPI object type can\'t be empty.',
self.infoblox._validate_objtype_or_die, None
)
self.assertRaisesRegex(
ValueError,
'WAPI object type can\'t contains slash.',
self.infoblox._validate_objtype_or_die, '/'
)
from designate.tests import base_fixtures
class InfobloxBackendTestCase(oslotest.base.BaseTestCase):
def setUp(self):
super().setUp()
self.base_address = 'https://203.0.113.1/wapi'
self.stdlog = base_fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.context = mock.Mock()
self.project_id = 'f532f66e-0fea-4698-895c-bb7caef815ef'
self.admin_context = mock.Mock()
self.admin_context.project_id = self.project_id
mock.patch.object(
context.DesignateContext, 'get_admin_context',
return_value=self.admin_context).start()
@ -111,131 +47,463 @@ class InfobloxBackendTestCase(oslotest.base.BaseTestCase):
name='example.com.',
email='example@example.com',
)
self.base_address = 'https://192.0.2.1/wapi/v2.10/'
self.dns_view = 'my_dns_view'
self.network_view = 'my_net_view'
self.ns_group = 'my_ns_group'
self.target = {
'id': '4588652b-50e7-46b9-b688-a9bad40a873e',
'type': 'infoblox',
'type': 'designate',
'masters': [
{'host': '1.1.1.1', 'port': 53},
{'host': '192.0.2.1', 'port': 53},
],
'options': [
{'key': 'wapi_url', 'value': 'https://203.0.113.1/wapi/v2.0/'},
{'key': 'username', 'value': 'test'},
{'key': 'password', 'value': 'test'},
{'key': 'ns_group', 'value': 'test'},
]
{'key': 'wapi_url', 'value': self.base_address},
{'key': 'username', 'value': 'user'},
{'key': 'password', 'value': 'secret'},
{'key': 'dns_view', 'value': self.dns_view},
{'key': 'network_view', 'value': self.network_view},
{'key': 'ns_group', 'value': self.ns_group},
],
}
class BasicInfobloxBackendTestCase(InfobloxBackendTestCase):
def setUp(self):
super().setUp()
self.target['options'].append(
{'key': 'multi_tenant', 'value': '0'},
)
self.backend = impl_infoblox.InfobloxBackend(
objects.PoolTarget.from_dict(self.target)
)
self.backend.connection = mock.Mock()
self.backend.infoblox = mock.Mock()
@mock.patch.object(impl_infoblox, 'infoblox_connector', None)
def test_no_library_installed(self):
pool_target = objects.PoolTarget.from_dict(self.target)
self.assertRaisesRegex(
exceptions.Backend,
'The infoblox-client library is not available',
impl_infoblox.InfobloxBackend, pool_target
)
def test_get_options(self):
self.assertEqual('my_dns_view', self.backend.dns_view)
self.assertEqual('my_net_view', self.backend.network_view)
self.assertEqual('my_ns_group', self.backend.ns_group)
self.assertEqual('0', self.backend.multi_project)
@mock.patch.object(infoblox_connector, 'Connector', mock.Mock())
def test_backend_with_invalid_master_port(self):
self.target['masters'] = [
{'host': '192.0.2.1', 'port': 5354},
]
pool_target = objects.PoolTarget.from_dict(self.target)
self.assertRaisesRegex(
exceptions.ConfigurationError,
'Infoblox only supports mDNS instances on port 53',
impl_infoblox.InfobloxBackend, pool_target
)
@mock.patch.object(infoblox_connector, 'Connector')
def test_backend_with_host(self, mock_infoblox_connector):
self.target['options'] = [
{'key': 'wapi_host', 'value': '192.0.2.100'},
{'key': 'wapi_version', 'value': '1'},
{'key': 'username', 'value': 'user'},
{'key': 'password', 'value': 'secret'},
]
impl_infoblox.InfobloxBackend(
objects.PoolTarget.from_dict(self.target)
)
mock_infoblox_connector.assert_called_with(
{
'host': '192.0.2.100',
'username': 'user',
'password': 'secret',
'http_pool_connections': None,
'http_pool_maxsize': None,
'wapi_version': '1',
'ssl_verify': None,
'cert': None,
'key': None
}
)
@mock.patch.object(infoblox_connector, 'Connector')
def test_backend_with_wapi_url(self, mock_infoblox_connector):
impl_infoblox.InfobloxBackend(
objects.PoolTarget.from_dict(self.target)
)
mock_infoblox_connector.assert_called_with(
{
'host': '192.0.2.1',
'username': 'user',
'password': 'secret',
'http_pool_connections': None,
'http_pool_maxsize': None,
'wapi_version': '2.10',
'ssl_verify': None,
'cert': None,
'key': None
}
)
def test_is_multi_project(self):
self.backend.multi_project = True
self.assertTrue(self.backend.is_multi_project)
self.backend.multi_project = 1
self.assertTrue(self.backend.is_multi_project)
self.backend.multi_project = '1'
self.assertTrue(self.backend.is_multi_project)
self.backend.multi_project = False
self.assertFalse(self.backend.is_multi_project)
self.backend.multi_project = 0
self.assertFalse(self.backend.is_multi_project)
self.backend.multi_project = '0'
self.assertFalse(self.backend.is_multi_project)
def test_parse_wapi_url(self):
self.assertEqual(
('192.0.2.1', None),
self.backend.parse_wapi_url('https://192.0.2.1/')
)
self.assertEqual(
('192.0.2.2', '1'),
self.backend.parse_wapi_url('https://192.0.2.2/wapi/v1/')
)
self.assertEqual(
('192.0.2.3', '2.10'),
self.backend.parse_wapi_url('https://192.0.2.3/wapi/v2.10/')
)
self.assertEqual(
('192.0.2.3:443', '2.10'),
self.backend.parse_wapi_url('https://192.0.2.3:443/wapi/v2.10/')
)
def test_get_network_view(self):
self.backend.connection.get_object.return_value = [{'name': 'fake'}]
self.assertEqual('fake', self.backend.get_network_view('project_id'))
def test_get_network_view_no_result(self):
self.backend.connection.get_object.return_value = []
self.assertIsNone(self.backend.get_network_view('project_id'))
def test_get_or_create_network_view(self):
mock_network_view = mock.Mock()
mock_network_view.name = 'fake'
self.backend.connection.get_object.return_value = []
self.backend.infoblox.create_network_view.return_value = (
mock_network_view
)
self.assertEqual(
'fake', self.backend.get_or_create_network_view('project_id')
)
def test_get_or_create_network_view_not_found(self):
self.backend.connection.get_object.return_value = [{'name': 'fake'}]
self.assertEqual(
'fake', self.backend.get_or_create_network_view('project_id')
)
def test_get_or_create_network_view_already_found(self):
self.backend.connection.get_object.return_value = [{'name': 'fake'}]
self.assertEqual(
'fake', self.backend.get_or_create_network_view('project_id')
)
@mock.patch.object(infoblox_objects.DNSView, 'search')
def test_get_or_create_dns_view(self, mock_search):
mock_dns_view = mock.Mock()
mock_dns_view.name = 'fake'
mock_search.return_value = None
self.backend.infoblox.create_dns_view.return_value = mock_dns_view
self.assertEqual(
'fake', self.backend.get_or_create_dns_view('net_view')
)
def test_get_or_create_dns_view_no_network_provided(self):
self.assertFalse(self.backend.get_or_create_dns_view(None))
@mock.patch.object(infoblox_objects.DNSView, 'search')
def test_get_or_create_dns_view_not_found(self, mock_search):
mock_search.return_value = None
self.assertFalse(
self.backend.get_or_create_dns_view(
'net_view', create_if_missing=False
)
)
@mock.patch.object(infoblox_objects.DNSView, 'search')
def test_get_or_create_dns_view_already_found(self, mock_search):
mock_dns_view = mock.Mock()
mock_dns_view.name = 'fake'
mock_search.return_value = mock_dns_view
self.assertEqual(
'fake', self.backend.get_or_create_dns_view('net_view')
)
@mock.patch.object(infoblox_objects, 'Grid', mock.Mock())
def test_restart_if_needed_unable_to_restart(self):
self.backend.connection.call_func.side_effect = (
infoblox_exceptions.InfobloxException('')
)
self.backend.restart_if_needed()
self.assertIn(
'Unable to restart the infoblox dns service.',
self.stdlog.logger.output
)
def test_create_zone(self):
self.backend.restart_if_needed = mock.Mock()
self.backend.create_zone(self.admin_context, self.zone)
self.backend.infoblox.create_dns_zone.assert_called_with(
dns_zone='example.com',
dns_view='my_dns_view',
zone_format='FORWARD',
ns_group='my_ns_group'
)
self.backend.restart_if_needed.assert_called()
def test_create_zone_handle_error(self):
self.backend.infoblox.create_dns_zone.side_effect = (
infoblox_exceptions.InfobloxTimeoutError('error')
)
self.assertRaisesRegex(
exceptions.Backend,
'Connection to NIOS timed out',
self.backend.create_zone, self.admin_context, self.zone
)
def test_create_zone_ptr(self):
zone = objects.Zone(
id='e2bed4dc-9d01-11e4-89d3-123b93f75cba',
name='example.in-addr.arpa.',
email='example@example.com',
)
self.backend.restart_if_needed = mock.Mock()
self.backend.create_zone(self.admin_context, zone)
self.backend.infoblox.create_dns_zone.assert_called_with(
dns_zone='example.in-addr.arpa',
dns_view='my_dns_view',
zone_format='IPV4',
ns_group='my_ns_group'
)
self.backend.restart_if_needed.assert_called()
def test_create_zone_ipv6_ptr(self):
zone = objects.Zone(
id='e2bed4dc-9d01-11e4-89d3-123b93f75cba',
name='example.ip6.arpa.',
email='example@example.com',
)
self.backend.restart_if_needed = mock.Mock()
self.backend.create_zone(self.admin_context, zone)
self.backend.infoblox.create_dns_zone.assert_called_with(
dns_zone='example.ip6.arpa',
dns_view='my_dns_view',
zone_format='IPV6',
ns_group='my_ns_group'
)
self.backend.restart_if_needed.assert_called()
def test_create_zone_no_dns_view(self):
self.backend.dns_view = None
self.assertRaisesRegex(
exceptions.Backend,
'Unable to create zone. No DNS View found',
self.backend.create_zone, self.admin_context, self.zone
)
def test_delete_zone(self):
self.backend.restart_if_needed = mock.Mock()
self.backend.delete_zone(self.admin_context, self.zone)
self.backend.infoblox.delete_dns_zone.assert_called_with(
'my_dns_view',
'example.com'
)
self.backend.restart_if_needed.assert_called()
def test_delete_zone_handle_error(self):
self.backend.infoblox.delete_dns_zone.side_effect = (
infoblox_exceptions.InfobloxTimeoutError('error')
)
self.assertRaisesRegex(
exceptions.Backend,
'Connection to NIOS timed out',
self.backend.delete_zone, self.admin_context, self.zone
)
def test_delete_zone_no_dns_view(self):
self.backend.dns_view = None
self.assertRaisesRegex(
exceptions.Backend,
'Unable to delete zone. No DNS View found',
self.backend.delete_zone, self.admin_context, self.zone
)
class AdvancedInfobloxBackendTestCase(InfobloxBackendTestCase):
def setUp(self):
super().setUp()
self.target['options'].append(
{'key': 'multi_tenant', 'value': '1'},
)
self.backend = impl_infoblox.InfobloxBackend(
objects.PoolTarget.from_dict(self.target)
)
@requests_mock.mock()
def test_create_zone(self, req_mock):
req_mock.post(
'%s/v2.0/zone_auth' % self.base_address,
json={},
)
zone_name = self.zone['name'][0:-1]
network_view = f'{self.network_view}.{self.project_id}'
view_name = f'{self.dns_view}.{self.network_view}.{self.project_id}'
req_mock.get(
'%s/v2.0/zone_auth' % self.base_address,
json={},
f'{self.base_address}networkview?*TenantID={self.project_id}'
'&_return_fields=name',
json=[{
'_ref': f'networkview/mock:{network_view}/false',
'name': f'{network_view}'
}]
)
req_mock.get(
f'{self.base_address}view?name={view_name}&_return_fields=name',
json=[{
'_ref': f'view/mock:{view_name}/false',
'name': f'{view_name}'
}]
)
req_mock.get(
f'{self.base_address}zone_auth?fqdn={zone_name}&view={view_name}',
json=[{
'_ref': f'zone_auth/mock:{zone_name}/{view_name}',
'extattrs': {},
'fqdn': zone_name,
'grid_primary': [],
'grid_secondaries': [],
'ns_group': self.ns_group,
'view': view_name
}]
)
req_mock.get(
f'{self.base_address}grid',
json=[{'_ref': 'grid/mock:Infoblox'}]
)
req_mock.post(
f'{self.base_address}grid/mock%3AInfoblox?'
'_function=restartservices',
json=[]
)
self.backend.create_zone(self.context, self.zone)
self.backend.create_zone(self.admin_context, self.zone)
def test_update_zone(self):
self.backend.update_zone(self.context, self.zone)
self.assertEqual(
req_mock.last_request.json(),
{
'mode': 'GROUPED',
'restart_option': 'RESTART_IF_NEEDED',
'services': ['DNS']
}
)
self.assertIn('Create Zone', self.stdlog.logger.output)
@requests_mock.mock()
def test_delete_zone(self, req_mock):
zone_name = self.zone['name'][0:-1]
network_view = f'{self.network_view}.{self.project_id}'
view_name = f'{self.dns_view}.{self.network_view}.{self.project_id}'
req_mock.get(
f'{self.base_address}networkview?*TenantID={self.project_id}'
'&_return_fields=name',
json=[{
'_ref': f'networkview/mock:{network_view}/false',
'name': f'{network_view}'
}]
)
req_mock.get(
f'{self.base_address}view?name={view_name}&_return_fields=name',
json=[{
'_ref': f'view/mock:{view_name}/false',
'name': f'{view_name}'
}]
)
req_mock.get(
f'{self.base_address}zone_auth?fqdn={zone_name}&view={view_name}',
json=[{
'_ref': f'zone_auth/mock:{zone_name}/{view_name}',
'extattrs': {},
'fqdn': zone_name,
'grid_primary': [],
'grid_secondaries': [],
'ns_group': self.ns_group,
'view': view_name
}]
)
req_mock.delete(
f'{self.base_address}zone_auth/mock%3A{zone_name}/{view_name}',
json=[]
)
req_mock.get(
f'{self.base_address}grid',
json=[{'_ref': 'grid/mock:Infoblox'}]
)
req_mock.post(
'%s/v2.0/zone_auth' % self.base_address,
json={},
f'{self.base_address}grid/mock%3AInfoblox?'
'_function=restartservices',
json=[]
)
req_mock.get(
'%s/v2.0/zone_auth' % self.base_address,
json={},
self.backend.delete_zone(self.admin_context, self.zone)
self.assertEqual(
req_mock.last_request.json(),
{
'mode': 'GROUPED',
'restart_option': 'RESTART_IF_NEEDED',
'services': ['DNS']
}
)
req_mock.get(
'%s/v2.0/grid' % self.base_address,
json={},
)
self.backend.create_zone(self.context, self.zone)
self.backend.delete_zone(self.context, self.zone)
def test_missing_wapi_url(self):
target = dict(self.target)
target['options'] = [
{'key': 'username', 'value': 'test'},
{'key': 'password', 'value': 'test'},
{'key': 'ns_group', 'value': 'test'},
]
pool_target = objects.PoolTarget.from_dict(target)
self.assertRaisesRegex(
ibexceptions.InfobloxIsMisconfigured, "wapi_url",
impl_infoblox.InfobloxBackend, pool_target,
)
def test_missing_username(self):
target = dict(self.target)
target['options'] = [
{'key': 'wapi_url', 'value': 'test'},
{'key': 'password', 'value': 'test'},
{'key': 'ns_group', 'value': 'test'}
]
pool_target = objects.PoolTarget.from_dict(target)
self.assertRaisesRegex(
ibexceptions.InfobloxIsMisconfigured, "username",
impl_infoblox.InfobloxBackend, pool_target,
)
def test_missing_password(self):
target = dict(self.target)
target['options'] = [
{'key': 'wapi_url', 'value': 'test'},
{'key': 'username', 'value': 'test'},
{'key': 'ns_group', 'value': 'test'},
]
pool_target = objects.PoolTarget.from_dict(target)
self.assertRaisesRegex(
ibexceptions.InfobloxIsMisconfigured, "password",
impl_infoblox.InfobloxBackend, pool_target,
)
def test_missing_ns_group(self):
target = dict(self.target)
target['options'] = [
{'key': 'wapi_url', 'value': 'test'},
{'key': 'username', 'value': 'test'},
{'key': 'password', 'value': 'test'},
]
pool_target = objects.PoolTarget.from_dict(target)
self.assertRaisesRegex(
ibexceptions.InfobloxIsMisconfigured, "ns_group",
impl_infoblox.InfobloxBackend, pool_target,
)
def test_wrong_port(self):
target = dict(self.target)
target['masters'] = [
{'host': '1.1.1.1', 'port': 100},
]
pool_target = objects.PoolTarget.from_dict(target)
self.assertRaisesRegex(
exceptions.ConfigurationError,
'Infoblox only supports mDNS instances on port 53',
impl_infoblox.InfobloxBackend, pool_target,
)
self.assertIn('Delete Zone', self.stdlog.logger.output)

View File

@ -36,7 +36,8 @@
port: 53
http_pool_maxsize: 100
http_pool_connections: 100
wapi_url: https://192.0.2.2/wapi/v2.1/
wapi_host: 192.0.2.2
wapi_version: 2.1
sslverify: false
password: infoblox
username: admin

View File

@ -0,0 +1,22 @@
---
features:
- |
The existing Infoblox driver has been updated and is now based on the
official Infoblox client.
The following new options were added to the pool config for Infoblox.
- ``wapi_host``
- ``wapi_version``
- ``cert``
- ``key``
upgrade:
- |
The deprecated Infoblox configuration options in ``desginate.conf``
has now been removed. All Infoblox configuration should now be in the
``pools.yaml`` configuration file.
In addition, the ``infoblox-client`` library is now a dependency for
the Infoblox driver and should be installed additionally if the
Infoblox Backend is used.

View File

@ -36,6 +36,8 @@ data_files =
[extras]
edgegrid =
edgegrid-python>=1.1.1 # Apache-2.0
infoblox =
infoblox-client>=0.6.0 # Apache-2.0
[entry_points]
oslo.config.opts =

View File

@ -16,3 +16,4 @@ Pygments>=2.2.0 # BSD license
pymemcache!=1.3.0,>=1.2.9 # Apache 2.0 License
PyMySQL>=0.8.0 # MIT License
edgegrid-python>=1.1.1 # Apache-2.0
infoblox-client>=0.6.0 # Apache-2.0