Add asyncio eventloop.

This commits introduces the asyncio event loop as well as the base
abstract class to define watchers.

It is in a very simple approach (it does not reschedule watchers if
they fail) but it lets you to see the proposal of hierarchy in watchers
as well as the methods that a watcher has to implement (see the pod
module).

Partial-Implements: blueprint kuryr-k8s-integration
Co-Authored-By: Taku Fukushima <f.tac.mac@gmail.com>
Co-Authored-By: Antoni Segura Puimedon
Change-Id: I91975dd197213c1a6b0e171c1ae218a547722eeb
This commit is contained in:
Jaume Devesa 2016-08-24 15:19:26 +02:00
parent a8a91ef38c
commit 82dce858cf
5 changed files with 208 additions and 8 deletions

View File

@ -10,34 +10,51 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import asyncio
import sys import sys
import time
from kuryr.lib._i18n import _LI from kuryr.lib._i18n import _LI
from oslo_log import log as logging from oslo_log import log as logging
from oslo_service import service from oslo_service import service
from kuryr_kubernetes import config from kuryr_kubernetes import config
from kuryr_kubernetes.watchers import pod
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class KuryrK8sService(service.Service): class KuryrK8sService(service.Service):
"""Kuryr-Kubernetes base service.
This class extends the oslo_service.service.Service class to provide an
asynchronous event loop. It assumes that all the elements of the
`_watchers` list has a method called `watch` (normally, implemented by the
class `kuryr_kubernetes.watchers.base.AbstractBaseWatcher`).
The event loop is the default used by asyncio (asyncio.SelectorEventLoop)
"""
def __init__(self): def __init__(self):
super(KuryrK8sService, self).__init__() super(KuryrK8sService, self).__init__()
self._event_loop = asyncio.new_event_loop()
self._watchers = [
pod.PodWatcher
]
def start(self): def start(self):
# TODO(devvesa): Remove this line as soon as it does anything LOG.info(_LI("Service '%(class_name)s' started"),
LOG.info(_LI("I am doing nothing")) {'class_name': self.__class__.__name__})
for watcher in self._watchers:
instance = watcher(self._event_loop)
self._event_loop.create_task(instance.watch())
try: try:
while(True): self._event_loop.run_forever()
time.sleep(5) self._event_loop.close()
# TODO(devvesa): Remove this line as soon as does anything except Exception:
LOG.info(_LI("Keep doing nothing"))
finally:
sys.exit(1) sys.exit(1)
sys.exit(0)
def wait(self): def wait(self):
"""Waits for K8sController to complete.""" """Waits for K8sController to complete."""

22
kuryr_kubernetes/utils.py Normal file
View File

@ -0,0 +1,22 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils
def utf8_json_decoder(byte_data):
"""Deserializes the bytes into UTF-8 encoded JSON.
:param byte_data: The bytes to be converted into the UTF-8 encoded JSON.
:returns: The UTF-8 encoded JSON represented by Python dictionary format.
"""
return jsonutils.loads(byte_data.decode('utf8'))

View File

View File

@ -0,0 +1,123 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import requests
from kuryr.lib._i18n import _LE
from oslo_log import log as logging
from kuryr_kubernetes.aio import headers as aio_headers
from kuryr_kubernetes.aio import methods as aio_methods
from kuryr_kubernetes import config
from kuryr_kubernetes import utils
LOG = logging.getLogger(__name__)
ADDED_EVENT = 'ADDED'
DELETED_EVENT = 'DELETED'
MODIFIED_EVENT = 'MODIFIED'
class AbstractBaseWatcher(object):
"""Base abstract watcher.
This class implements the default interface for the KuryrK8sService task
scheduler, which is the `watch` (no parameters) interface.
It also define a serie of abstract methods that actual watchers have to
implement in order to deal directly with events without worrying about
connection and serialization details.
These methods are:
* get_endpoint(self): return a resource URL exposed on Kubernetes API
as a string (such as "/api/v1/pods")
* on_add/on_modify/on_delete: actions to do according to each event
type.
These methods have to follow the async/await python3.5 syntax
"""
def __init__(self, event_loop):
self._event_loop = event_loop
@abc.abstractmethod
def get_api_endpoint(self):
pass
@property
def api_endpoint(self):
k8s_root = config.CONF.kubernetes.api_root
return k8s_root + self.get_api_endpoint() + "?watch=true"
async def _on_event(self, event): # flake8: noqa
event_type = event['type']
if event_type == ADDED_EVENT:
await self.on_add(event)
elif event_type == DELETED_EVENT:
await self.on_delete(event)
elif event_type == MODIFIED_EVENT:
await self.on_modify(event)
else:
LOG.warning(_LW("Unhandled event type '%(event_type)s'"),
{'event_type': event})
@abc.abstractmethod
async def on_add(self, event):
pass
@abc.abstractmethod
async def on_modify(self, event):
pass
@abc.abstractmethod
async def on_delete(self, event):
pass
async def watch(self):
"""Watches the endpoint and calls the callback with its response.
This is an endless task that keeps the event loop running forever
"""
response = await self._get_chunked_response()
while True:
content = await response.read_line()
LOG.debug('Received new event from %(watcher)s:\n\n\t'
'%(event)s.\n\n',
{'watcher': self.__class__.__name__,
'event': str(content)})
await self._on_event(content)
async def _get_chunked_response(self):
"""Get the response from Kubernetes API."""
response = await aio_methods.get(
endpoint=self.api_endpoint,
loop=self._event_loop,
decoder=utils.utf8_json_decoder)
status, reason, hdrs = await response.read_headers()
if status != requests.codes.ok: # Function returns 200
LOG.error(_LE('GET request to endpoint %(ep)s failed with '
'status %(status)s and reason %(reason)s'),
{'ep': endpoint, 'status': status, 'reason': reason})
raise requests.exceptions.HTTPError('{}: {}. Endpoint {}'.format(
status, reason, endpoint))
if hdrs.get(aio_headers.TRANSFER_ENCODING) != 'chunked':
LOG.error(_LE('watcher GET request to endpoint %(ep)s is not '
'chunked. headers: %(hdrs)s'),
{'ep': endpoint, 'hdrs': hdrs})
raise IOError(_('Can only watch endpoints that returned chunked '
'encoded transfers'))
return response

View File

@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from kuryr.lib._i18n import _LI
from oslo_log import log as logging
from kuryr_kubernetes.watchers import base
LOG = logging.getLogger(__name__)
class PodWatcher(base.AbstractBaseWatcher):
ENDPOINT = "/api/v1/pods"
def __init__(self, event_loop):
super().__init__(event_loop)
def get_api_endpoint(self):
return self.ENDPOINT
async def on_add(self, event): # flake8: noqa
LOG.info(_LI('Received an ADDED event on a Pod'))
async def on_modify(self, event):
LOG.info(_LI('Received a MODIFIED event on a Pod'))
async def on_delete(self, event):
LOG.info(_LI('Received a DELETED event on a Pod'))