Introduce the translator layer

This commit introduces the translation layer to call Neutron API, as
well as the config_map that glues the watcher layer to the translation
layer, and it makes it configurable.

It simplifies a lot the watcher layer, making trivial to create new ones
and relies all the business logic to the translation layer. That
introduces the option to some people extend the project by creating
custom translators by implementing a simple interface and using another
config_map.

Partial-Implements: blueprint kuryr-k8s-integration
Change-Id: I1c8bcfe1f37f28e833dd2d4631d011f044fd9c47
Signed-off-by: Jaume Devesa <devvesa@gmail.com>
This commit is contained in:
Jaume Devesa 2016-08-30 15:11:38 +02:00
parent aa0139e56b
commit 7e550f3936
8 changed files with 289 additions and 150 deletions

View File

@ -12,6 +12,13 @@
import pbr.version
from kuryr_kubernetes.translators import port
from kuryr_kubernetes import watchers
__version__ = pbr.version.VersionInfo(
'kuryr_kubernetes').version_string()
CONFIG_MAP = {
watchers.PodWatcher: [port.PortTranslator]
}

View File

@ -34,6 +34,10 @@ k8s_opts = [
cfg.StrOpt('api_root',
help=_("The root URL of the Kubernetes API"),
default=os.environ.get('K8S_API', 'http://localhost:8080')),
cfg.StrOpt('config_map',
help=_("The dict that stores the relationships between watchers and"
" translators."),
default=('kuryr_kubernetes.CONFIG_MAP'))
]

View File

@ -13,12 +13,13 @@
import asyncio
import sys
from kuryr.lib._i18n import _LI
from kuryr.lib._i18n import _LI, _LE
from oslo_log import log as logging
from oslo_service import service
from oslo_utils import excutils
from oslo_utils import importutils
from kuryr_kubernetes import config
from kuryr_kubernetes.watchers import pod
LOG = logging.getLogger(__name__)
@ -38,20 +39,25 @@ class KuryrK8sService(service.Service):
def __init__(self):
super(KuryrK8sService, self).__init__()
self._event_loop = asyncio.new_event_loop()
self._watchers = [
pod.PodWatcher
]
def start(self):
LOG.info(_LI("Service '%(class_name)s' started"),
{'class_name': self.__class__.__name__})
for watcher in self._watchers:
instance = watcher(self._event_loop)
self._event_loop.create_task(instance.watch())
try:
config_map = importutils.import_class(
config.CONF.kubernetes.config_map)
for watcher, translators in config_map.items():
instance = watcher(self._event_loop, translators)
self._event_loop.create_task(instance.watch())
self._event_loop.run_forever()
self._event_loop.close()
except ImportError:
with excutils.save_and_reraise_exception():
LOG.exception(_LE("Error loading config_map '%(map)s'"),
{'map': config.CONF.kubernetes.config_map})
except Exception:
sys.exit(1)
sys.exit(0)

View File

@ -9,30 +9,33 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr.lib._i18n import _LI
from oslo_log import log as logging
from kuryr_kubernetes.watchers import base
LOG = logging.getLogger(__name__)
import abc
class PodWatcher(base.AbstractBaseWatcher):
class AbstractBaseTranslator(object):
"""Abstract Translator class. """
ENDPOINT = "/api/v1/pods"
def __init__(self):
# TODO(devvesa) initialize neutron client
pass
def __init__(self, event_loop):
super().__init__(event_loop)
@abc.abstractmethod
def get_annotation(self):
"""Kubernetes annotation to update.
def get_api_endpoint(self):
return self.ENDPOINT
Return the kubernetes annotation that we want to update once each
task is finished.
"""
pass
@abc.abstractmethod
async def on_add(self, event): # flake8: noqa
LOG.info(_LI('Received an ADDED event on a Pod'))
async def on_modify(self, event):
LOG.info(_LI('Received a MODIFIED event on a Pod'))
pass
@abc.abstractmethod
async def on_delete(self, event):
LOG.info(_LI('Received a DELETED event on a Pod'))
pass
@abc.abstractmethod
async def on_modify(self, event):
pass

View File

@ -0,0 +1,41 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr.lib._i18n import _LI
from oslo_log import log as logging
from kuryr_kubernetes.translators import base
LOG = logging.getLogger(__name__)
class PortTranslator(base.AbstractBaseTranslator):
def __init__(self):
super().__init__()
def get_annotation(self):
return 'kuryr.kubernetes.org/neutron-port'
async def on_add(self, event): # flake8: noqa
LOG.info(_LI('Creating a port'))
# TODO(devvesa): remove this part ASAP. This statement it only applies
# when checking that the result is serialized on a real K8s. We don't
# have any end-to-end test yet, so it allows reviewers to see that
# works.
return {'port-created': False}
async def on_modify(self, event):
LOG.info(_LI('Modifying a port'))
async def on_delete(self, event):
LOG.info(_LI('Deleting a port'))

View File

@ -0,0 +1,201 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import asyncio
import functools
import requests
from kuryr.lib._i18n import _LI, _LW, _LE
from oslo_log import log as logging
from oslo_serialization import jsonutils
from kuryr_kubernetes.aio import headers as aio_headers
from kuryr_kubernetes.aio import methods as aio_methods
from kuryr_kubernetes import config
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
ADDED_EVENT = 'ADDED'
DELETED_EVENT = 'DELETED'
MODIFIED_EVENT = 'MODIFIED'
class AbstractBaseWatcher(object):
"""Base abstract watcher.
This class implements the default interface for the KuryrK8sService task
scheduler, which is the `watch` (no parameters) interface.
It takes care of the events that receives and it triggers the appropiate
action on the translators configured on the config.kubernetes.config_map
structure.
Actual watchers will only need to define the 'get_api_endpoint' method
that return an String URL in order to suscribe to Kubernetes API events.
(See :class PodWatcher: below).
"""
def __init__(self, event_loop, translators):
self._event_loop = event_loop
self._translators = translators
self._k8s_root = config.CONF.kubernetes.api_root
async def _get_chunked_connection(self): # flake8: noqa
"""Get the connection response from Kubernetes API.
Initializes the connection with Kubernetes API. Since the content type
is Chunked (http://en.wikipedia.org/wiki/Chunked_transfer_encoding), the
connection remains open.
"""
connection = await aio_methods.get(
endpoint=self.api_endpoint,
loop=self._event_loop,
decoder=utils.utf8_json_decoder)
status, reason, hdrs = await connection.read_headers()
if status != requests.codes.ok: # Function returns 200
LOG.error(_LE('GET request to endpoint %(ep)s failed with '
'status %(status)s and reason %(reason)s'),
{'ep': endpoint, 'status': status, 'reason': reason})
raise requests.exceptions.HTTPError('{}: {}. Endpoint {}'.format(
status, reason, endpoint))
if hdrs.get(aio_headers.TRANSFER_ENCODING) != 'chunked':
LOG.error(_LE('watcher GET request to endpoint %(ep)s is not '
'chunked. headers: %(hdrs)s'),
{'ep': endpoint, 'hdrs': hdrs})
raise IOError(_('Can only watch endpoints that returned chunked '
'encoded transfers'))
return connection
def _update_annotation(self, self_link, annotation, future):
"""Update K8s entities' annotations
This method is the callback of all the tasks scheduled in the
'self._on_event' method.
In case the _on_event 'future' returns something different that None, it
will update the annotations in resource defined by 'self_link' with the
key 'annotation' and value 'future.get_result()'
:param self_link: Entity link to update.
:param annotation: Key of the annotation to update.
:param future: Value of the annotation to update.
"""
future_result = future.result()
if not future_result:
return
patch_headers = {
'Content-Type': 'application/strategic-merge-patch+json',
'Accept': 'application/json',
}
# Annotations are supposed to be key=value, being 'value'
# an string. So we need to dump the dict result into the annotation into
# a json
future_result_json = jsonutils.dumps(future_result)
annotations = {annotation: jsonutils.dumps(future_result)}
data = jsonutils.dumps({
'metadata': {
'annotations': annotations}})
url = self._k8s_root + self_link
# TODO(devvesa): Use the aio package to convert this call into an
# asynchornous one. Aio package does not support patch method yet.
result = requests.patch(url, data=data, headers=patch_headers)
if not result.ok:
LOG.warn(_LW("PATCH request to %(url)s for annotation update "
"%(data)s failed with error code %(error_code)s and "
"reason %(reason)s"),
{'url': url,
'data': data,
'error_code': result.status_code,
'reason': result.json()})
LOG.debug("Annotation update %(data)s succeded on resource %(url)s",
{'data': data, 'url': url})
async def _on_event(self, event):
if not 'type' in event:
LOG.warn(_LW('Received an event without "type":\n\n\t%(event)s'),
{'event': event})
return
event_type = event['type']
self_link = event['object']['metadata']['selfLink']
LOG.info(_LI('Received an %(event_type)s event on a '
'%(kind)s with link "%(link)s"'),
{'event_type': event_type,
'kind': event['object']['kind'],
'link': self_link})
# Dispatch the event on its method
dispatch_map = {
ADDED_EVENT: 'on_add',
DELETED_EVENT: 'on_delete',
MODIFIED_EVENT: 'on_modify'}
if not event_type in dispatch_map:
LOG.warning(_LW("Unhandled event type '%(event_type)s'"),
{'event_type': event_type})
return
# Run the method on each of the translators defined on the config_map
tasks = []
for t_class in self._translators:
translator = t_class()
method = getattr(translator, dispatch_map[event_type])
task = self._event_loop.create_task(method(event))
task.add_done_callback(
functools.partial(self._update_annotation, self_link,
translator.get_annotation()))
tasks.append(task)
asyncio.wait(tasks)
@property
def api_endpoint(self):
k8s_root = config.CONF.kubernetes.api_root
return k8s_root + self.get_api_endpoint() + "?watch=true"
@abc.abstractmethod
def get_api_endpoint(self):
pass
async def watch(self):
"""Watches the endpoint and calls the callback with its response.
This is an endless task that keeps the event loop running forever
"""
connection = await self._get_chunked_connection()
while True:
content = await connection.read_line()
LOG.debug('Received new event from %(watcher)s:\n\n\t'
'%(event)s.\n\n',
{'watcher': self.__class__.__name__,
'event': str(content)})
await self._on_event(content)
class PodWatcher(AbstractBaseWatcher):
"""Watch the Pod endpoints on K8s API."""
ENDPOINT = "/api/v1/pods"
def __init__(self, event_loop, translators):
super().__init__(event_loop, translators)
def get_api_endpoint(self):
return self.ENDPOINT

View File

@ -1,123 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import requests
from kuryr.lib._i18n import _LE
from oslo_log import log as logging
from kuryr_kubernetes.aio import headers as aio_headers
from kuryr_kubernetes.aio import methods as aio_methods
from kuryr_kubernetes import config
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
ADDED_EVENT = 'ADDED'
DELETED_EVENT = 'DELETED'
MODIFIED_EVENT = 'MODIFIED'
class AbstractBaseWatcher(object):
"""Base abstract watcher.
This class implements the default interface for the KuryrK8sService task
scheduler, which is the `watch` (no parameters) interface.
It also define a serie of abstract methods that actual watchers have to
implement in order to deal directly with events without worrying about
connection and serialization details.
These methods are:
* get_endpoint(self): return a resource URL exposed on Kubernetes API
as a string (such as "/api/v1/pods")
* on_add/on_modify/on_delete: actions to do according to each event
type.
These methods have to follow the async/await python3.5 syntax
"""
def __init__(self, event_loop):
self._event_loop = event_loop
@abc.abstractmethod
def get_api_endpoint(self):
pass
@property
def api_endpoint(self):
k8s_root = config.CONF.kubernetes.api_root
return k8s_root + self.get_api_endpoint() + "?watch=true"
async def _on_event(self, event): # flake8: noqa
event_type = event['type']
if event_type == ADDED_EVENT:
await self.on_add(event)
elif event_type == DELETED_EVENT:
await self.on_delete(event)
elif event_type == MODIFIED_EVENT:
await self.on_modify(event)
else:
LOG.warning(_LW("Unhandled event type '%(event_type)s'"),
{'event_type': event})
@abc.abstractmethod
async def on_add(self, event):
pass
@abc.abstractmethod
async def on_modify(self, event):
pass
@abc.abstractmethod
async def on_delete(self, event):
pass
async def watch(self):
"""Watches the endpoint and calls the callback with its response.
This is an endless task that keeps the event loop running forever
"""
response = await self._get_chunked_response()
while True:
content = await response.read_line()
LOG.debug('Received new event from %(watcher)s:\n\n\t'
'%(event)s.\n\n',
{'watcher': self.__class__.__name__,
'event': str(content)})
await self._on_event(content)
async def _get_chunked_response(self):
"""Get the response from Kubernetes API."""
response = await aio_methods.get(
endpoint=self.api_endpoint,
loop=self._event_loop,
decoder=utils.utf8_json_decoder)
status, reason, hdrs = await response.read_headers()
if status != requests.codes.ok: # Function returns 200
LOG.error(_LE('GET request to endpoint %(ep)s failed with '
'status %(status)s and reason %(reason)s'),
{'ep': endpoint, 'status': status, 'reason': reason})
raise requests.exceptions.HTTPError('{}: {}. Endpoint {}'.format(
status, reason, endpoint))
if hdrs.get(aio_headers.TRANSFER_ENCODING) != 'chunked':
LOG.error(_LE('watcher GET request to endpoint %(ep)s is not '
'chunked. headers: %(hdrs)s'),
{'ep': endpoint, 'hdrs': hdrs})
raise IOError(_('Can only watch endpoints that returned chunked '
'encoded transfers'))
return response