Removing Python 3.5-specific parts
This patch removes Python 3.5-specific parts from Kuryr-Kubernetes. The desicion was made [1] to make Kuryr-Kubernetes compatible with Python 2. [1] http://eavesdrop.openstack.org/\ meetings/kuryr/2016/kuryr.2016-10-03-14.00.html Change-Id: Iff66ee2818088b02f259d56d50ce3e994814cc5d Partially-Implements: blueprint kuryr-k8s-integration
This commit is contained in:
parent
378e571dd4
commit
3477c58e4b
@ -1,12 +1,5 @@
|
||||
[[local|localrc]]
|
||||
|
||||
# At the moment kuryr-kubernetes only works with Python35, thus, you should
|
||||
# read https://wiki.openstack.org/wiki/Python3#Enable_Python_3_in_DevStack
|
||||
# This devstack plugin should work with Ubuntu 16.04+ and Fedora 24+
|
||||
# Make sure you have python3.5 and it's development package installed
|
||||
USE_PYTHON3=True
|
||||
PYTHON3_VERSION=3.5
|
||||
|
||||
enable_plugin kuryr-kubernetes \
|
||||
https://git.openstack.org/openstack/kuryr-kubernetes
|
||||
|
||||
|
@ -353,7 +353,7 @@ if is_service_enabled kuryr-kubernetes; then
|
||||
# sure Kuryr can start before neutron-server, so Kuryr start in "extra"
|
||||
# phase. Bug: https://bugs.launchpad.net/kuryr/+bug/1587522
|
||||
run_process kuryr-kubernetes \
|
||||
"python3 ${KURYR_HOME}/scripts/run_server.py \
|
||||
"python ${KURYR_HOME}/scripts/run_server.py \
|
||||
--config-file $KURYR_CONFIG"
|
||||
fi
|
||||
|
||||
|
@ -1,18 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
CONTENT_TYPE = 'CONTENT-TYPE'
|
||||
CONTENT_LENGTH = 'CONTENT-LENGTH'
|
||||
HOST = 'HOST'
|
||||
TRANSFER_ENCODING = 'TRANSFER-ENCODING'
|
||||
USER_AGENT = 'USER-AGENT'
|
@ -1,296 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import asyncio
|
||||
from collections import deque
|
||||
import re
|
||||
import sys
|
||||
from urllib import parse
|
||||
|
||||
from kuryr.lib._i18n import _
|
||||
from oslo_log import log
|
||||
from oslo_serialization import jsonutils
|
||||
import requests
|
||||
|
||||
from kuryr_kubernetes.aio import headers
|
||||
from kuryr_kubernetes.aio import streams
|
||||
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
GET = 'GET'
|
||||
PATCH = 'PATCH'
|
||||
POST = 'POST'
|
||||
|
||||
|
||||
class Response(object):
|
||||
"""HTTP Response class for dealing with HTTP responses in an async way"""
|
||||
_full_line = re.compile(b'(.+)(\r\n|\n|\r)')
|
||||
_line_remainder = re.compile(b'(\r\n|\n|\r)(.+)\Z')
|
||||
|
||||
def __init__(self, reader, writer, decoder=None):
|
||||
self._reader = reader
|
||||
self._writer = writer
|
||||
self.decoder = decoder
|
||||
self.status = None
|
||||
self.headers = None
|
||||
self.content = None
|
||||
self.decoded = None
|
||||
self._remainder = b''
|
||||
self._matches = None
|
||||
|
||||
async def read_headers(self): # flake8: noqa
|
||||
"""Returns HTTP status, reason and headers and updates the object
|
||||
|
||||
One can either get the response doing:
|
||||
|
||||
status, reason, hdrs = await response.read_headers()
|
||||
assert status == 200
|
||||
|
||||
or check the object after it has been updated:
|
||||
|
||||
await response.read_headers()
|
||||
assert response.status == 200
|
||||
"""
|
||||
hdrs = {}
|
||||
# Read status
|
||||
line = await self._reader.readline()
|
||||
if not line:
|
||||
raise IOError(_('No status received'))
|
||||
|
||||
line = line.decode('ascii').rstrip()
|
||||
http_version, status, reason = line.split(' ', maxsplit=2)
|
||||
self.status = int(status)
|
||||
|
||||
while True:
|
||||
line = await self._reader.readline()
|
||||
if not line:
|
||||
break
|
||||
line = line.decode('ascii').rstrip()
|
||||
if line:
|
||||
try:
|
||||
key, value = line.split(': ')
|
||||
hdrs[key.upper()] = value
|
||||
except ValueError:
|
||||
LOG.debug('Failed to read header: %s', line)
|
||||
else:
|
||||
break
|
||||
if self._reader.at_eof():
|
||||
break
|
||||
self.headers = hdrs
|
||||
return self.status, reason, self.headers
|
||||
|
||||
async def read_chunk(self):
|
||||
"""Returns an HTTP chunked response chunk. None when finsihed"""
|
||||
result = await self._reader.readchunk()
|
||||
if result == b'' and self._reader.at_eof():
|
||||
result = None
|
||||
if self._writer.can_write_eof():
|
||||
self._writer.write_eof()
|
||||
self._writer.close()
|
||||
return result
|
||||
|
||||
async def read(self):
|
||||
"""Returns the whole body of a non-chunked HTTP response"""
|
||||
result = await self._reader.readexactly(
|
||||
int(self.headers[headers.CONTENT_LENGTH]))
|
||||
if self._writer.can_write_eof():
|
||||
self._writer.write_eof()
|
||||
self._writer.close()
|
||||
self.content = result
|
||||
if self.decoder is not None:
|
||||
result = self.decoder(result)
|
||||
self.decoded = result
|
||||
return result
|
||||
|
||||
async def read_line(self):
|
||||
"""Returns a line out of HTTP chunked response chunks.
|
||||
|
||||
If there are no more chunks to complete a line, it returns None
|
||||
"""
|
||||
if self._matches is None:
|
||||
self._matches = deque()
|
||||
if self._matches:
|
||||
result = self._matches.pop().group(0)
|
||||
else:
|
||||
while True:
|
||||
chunk = await self._reader.readchunk()
|
||||
if chunk == b'' and self._reader.at_eof():
|
||||
result = None
|
||||
if self._writer.can_write_eof():
|
||||
self._writer.write_eof()
|
||||
self._writer.close()
|
||||
break
|
||||
if self._remainder:
|
||||
chunk = self._remainder + chunk
|
||||
self._remainder = b''
|
||||
for match in self._full_line.finditer(chunk):
|
||||
self._matches.appendleft(match)
|
||||
leftovers = [match.group(2) for match in
|
||||
self._line_remainder.finditer(chunk)]
|
||||
if leftovers:
|
||||
self._remainder, = leftovers
|
||||
if self._matches:
|
||||
result = self._matches.pop().group(0)
|
||||
break
|
||||
|
||||
if None not in (result, self.decoder):
|
||||
result = self.decoder(result)
|
||||
return result
|
||||
|
||||
async def read_all(self):
|
||||
if self.headers.get(headers.TRANSFER_ENCODING) == 'chunked':
|
||||
readings = []
|
||||
while True:
|
||||
read = await self.read_chunk()
|
||||
if read is None:
|
||||
break
|
||||
else:
|
||||
readings.append(read)
|
||||
content = b''.join(readings)
|
||||
if self.decoder is not None:
|
||||
result = self.decoder(content)
|
||||
self.decoded = result
|
||||
else:
|
||||
result = content
|
||||
else:
|
||||
result = await self.read()
|
||||
return result
|
||||
|
||||
|
||||
async def get(endpoint, decoder=None, loop=None):
|
||||
"""Returns a Response object
|
||||
|
||||
This coroutine is intended to be used as a convenience method to perform
|
||||
HTTP GET requests in an asyncronous way. With the response object it
|
||||
returns you can obtain the GET data. Example:
|
||||
|
||||
response = await get('http://httpbin.org/ip')
|
||||
status, reason, hdrs = await response.read_headers()
|
||||
if status == 200: # check that the request is OK
|
||||
content = await response.read() # Read a non chunked response
|
||||
"""
|
||||
parsed_url = parse.urlsplit(endpoint)
|
||||
host = parsed_url.hostname.encode('idna').decode('utf8')
|
||||
# requests does proper path encoding for non ascii chars
|
||||
req = requests.Request(GET, endpoint).prepare()
|
||||
|
||||
if parsed_url.scheme == 'https':
|
||||
port = 443 if parsed_url.port is None else parsed_url.port
|
||||
ssl = True
|
||||
else:
|
||||
port = 80 if parsed_url.port is None else parsed_url.port
|
||||
ssl = False
|
||||
|
||||
if loop is None:
|
||||
loop = asyncio.events.get_event_loop()
|
||||
reader = streams.ChunkedStreamReader(limit=asyncio.streams._DEFAULT_LIMIT,
|
||||
loop=loop)
|
||||
protocol = asyncio.streams.StreamReaderProtocol(reader, loop=loop)
|
||||
transport, _ = await loop.create_connection(
|
||||
lambda: protocol, host, port, ssl=ssl)
|
||||
writer = asyncio.streams.StreamWriter(transport, protocol, reader, loop)
|
||||
|
||||
_write_headers(writer, _auto_headers(parsed_url),
|
||||
_request_line('GET', req.path_url))
|
||||
|
||||
return Response(reader, writer, decoder)
|
||||
|
||||
|
||||
def _auto_headers(parsed_url):
|
||||
return {
|
||||
headers.USER_AGENT:
|
||||
'Python/{0[0]}.{0[1]} raven/1.0'.format(sys.version_info),
|
||||
headers.HOST: parsed_url.netloc}
|
||||
|
||||
|
||||
def _request_line(method, req_uri, http_version='1.1'):
|
||||
req_line = '{method} {path} HTTP/{version}\r\n'.format(
|
||||
method=method, path=req_uri, version=http_version)
|
||||
return req_line
|
||||
|
||||
|
||||
def _write_headers(writer, headers, request_line):
|
||||
content = request_line + ''.join(
|
||||
key + ': ' + val + '\r\n' for key, val in headers.items()) + '\r\n'
|
||||
encoded_content = content.encode('utf8')
|
||||
writer.write(encoded_content)
|
||||
|
||||
if __name__ == '__main__':
|
||||
def term_handler():
|
||||
for task in asyncio.Task.all_tasks():
|
||||
task.cancel()
|
||||
print('Cancelling all threads...')
|
||||
print('Exitting')
|
||||
|
||||
async def print_response(http_func, url, loop, print_headers=False,
|
||||
line_based=True):
|
||||
response = await http_func(url, loop=loop)
|
||||
status, reason, hdrs = await response.read_headers()
|
||||
if hdrs.get(headers.CONTENT_TYPE) == 'application/json':
|
||||
response.decoder = lambda x: jsonutils.loads(x.decode())
|
||||
if status != 200:
|
||||
print('HTTP Status {}: {}. Exiting...'.format(status, reason))
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
if hdrs.get(headers.TRANSFER_ENCODING) == 'chunked':
|
||||
while True:
|
||||
if line_based:
|
||||
content = await response.read_line()
|
||||
else:
|
||||
content = await response.read_chunk()
|
||||
if content is None:
|
||||
break
|
||||
print(content)
|
||||
else:
|
||||
content = await response.read()
|
||||
print(content)
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
async def print_raw(http_func, url, loop):
|
||||
response = await http_func(url, loop=loop)
|
||||
content = await asyncio.shield(response._reader.read(-1))
|
||||
print(content)
|
||||
|
||||
import argparse
|
||||
import signal
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument('method', help='The HTTP Method, i.e. "get"')
|
||||
parser.add_argument('url', help="The URL to do an HTTP Request to")
|
||||
parser.add_argument('-l', '--line-based', help='Process the chunk lines',
|
||||
action='store_true')
|
||||
parser.add_argument('-i', '--header-info', help='print headers',
|
||||
action='store_true')
|
||||
parser.add_argument('-r', '--raw', help='print raw response',
|
||||
action='store_true')
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.method not in ('get',):
|
||||
raise NotImplementedError
|
||||
else:
|
||||
http_func = get
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
if args.raw:
|
||||
task = asyncio.async(print_raw(http_func, args.url, loop=loop))
|
||||
else:
|
||||
task = asyncio.async(print_response(http_func, args.url, loop=loop,
|
||||
print_headers=args.header_info,
|
||||
line_based=args.line_based))
|
||||
loop.run_until_complete(task)
|
||||
|
||||
loop.add_signal_handler(signal.SIGINT, term_handler)
|
||||
loop.add_signal_handler(signal.SIGTERM, term_handler)
|
||||
loop.close()
|
@ -1,68 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
from asyncio import streams
|
||||
|
||||
|
||||
class ChunkedStreamReader(streams.StreamReader):
|
||||
|
||||
async def readchunk(self): # flake8: noqa
|
||||
"""Modified asyncio.streams.readline for http chunks
|
||||
|
||||
Returns an HTTP1.1 chunk transfer encoding chunk. Returns None if it is
|
||||
the trailing chunk.
|
||||
"""
|
||||
if self._exception is not None:
|
||||
raise self._exception
|
||||
|
||||
chunk_size = bytearray()
|
||||
chunk = bytearray()
|
||||
size = None
|
||||
sep = b'\r\n'
|
||||
|
||||
while size != 0:
|
||||
while self._buffer and size is None:
|
||||
ichar = self._buffer.find(sep)
|
||||
if ichar < 0:
|
||||
chunk_size.extend(self._buffer)
|
||||
self._buffer.clear()
|
||||
else: # size present
|
||||
chunk_size.extend(self._buffer[:ichar])
|
||||
size = int(bytes(chunk_size), 16)
|
||||
if size == 0: # Terminal chunk
|
||||
self._buffer.clear()
|
||||
self.feed_eof()
|
||||
return b''
|
||||
else:
|
||||
del self._buffer[:ichar + len(sep)]
|
||||
|
||||
while self._buffer and size > 0:
|
||||
buff_size = len(self._buffer)
|
||||
if buff_size < size:
|
||||
chunk.extend(self._buffer)
|
||||
self._buffer.clear()
|
||||
size -= buff_size
|
||||
else:
|
||||
chunk.extend(self._buffer[:size])
|
||||
del self._buffer[:size + len(sep)] # delete also trailer
|
||||
size = 0
|
||||
|
||||
if self._eof:
|
||||
break
|
||||
|
||||
if size is None or size > 0:
|
||||
await self._wait_for_data('readchunk')
|
||||
elif size < 0:
|
||||
raise ValueError(_('Chunk wrongly encoded'))
|
||||
|
||||
self._maybe_resume_transport()
|
||||
return bytes(chunk)
|
@ -1,82 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import asyncio
|
||||
import sys
|
||||
|
||||
from kuryr.lib._i18n import _LI, _LE
|
||||
from oslo_log import log as logging
|
||||
from oslo_service import service
|
||||
from oslo_utils import excutils
|
||||
from oslo_utils import importutils
|
||||
|
||||
from kuryr_kubernetes import config
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KuryrK8sService(service.Service):
|
||||
"""Kuryr-Kubernetes base service.
|
||||
|
||||
This class extends the oslo_service.service.Service class to provide an
|
||||
asynchronous event loop. It assumes that all the elements of the
|
||||
`_watchers` list has a method called `watch` (normally, implemented by the
|
||||
class `kuryr_kubernetes.watchers.base.AbstractBaseWatcher`).
|
||||
|
||||
The event loop is the default used by asyncio (asyncio.SelectorEventLoop)
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
super(KuryrK8sService, self).__init__()
|
||||
self._event_loop = asyncio.new_event_loop()
|
||||
|
||||
def start(self):
|
||||
LOG.info(_LI("Service '%(class_name)s' started"),
|
||||
{'class_name': self.__class__.__name__})
|
||||
try:
|
||||
config_map = importutils.import_class(
|
||||
config.CONF.kubernetes.config_map)
|
||||
|
||||
for watcher, translators in config_map.items():
|
||||
instance = watcher(self._event_loop, translators)
|
||||
self._event_loop.create_task(instance.watch())
|
||||
|
||||
self._event_loop.run_forever()
|
||||
self._event_loop.close()
|
||||
|
||||
except ImportError:
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.exception(_LE("Error loading config_map '%(map)s'"),
|
||||
{'map': config.CONF.kubernetes.config_map})
|
||||
except Exception:
|
||||
sys.exit(1)
|
||||
sys.exit(0)
|
||||
|
||||
def wait(self):
|
||||
"""Waits for K8sController to complete."""
|
||||
super(KuryrK8sService, self).wait()
|
||||
|
||||
def stop(self, graceful=False):
|
||||
"""Stops the event loop if it's not stopped already."""
|
||||
super(KuryrK8sService, self).stop(graceful)
|
||||
|
||||
|
||||
def start():
|
||||
config.init(sys.argv[1:])
|
||||
config.setup_logging()
|
||||
kuryrk8s_launcher = service.launch(config.CONF, KuryrK8sService())
|
||||
kuryrk8s_launcher.wait()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
start()
|
@ -1,41 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
|
||||
|
||||
class AbstractBaseTranslator(object):
|
||||
"""Abstract Translator class. """
|
||||
|
||||
def __init__(self):
|
||||
# TODO(devvesa) initialize neutron client
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_annotation(self):
|
||||
"""Kubernetes annotation to update.
|
||||
|
||||
Return the kubernetes annotation that we want to update once each
|
||||
task is finished.
|
||||
"""
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def on_add(self, event): # flake8: noqa
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def on_delete(self, event):
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
async def on_modify(self, event):
|
||||
pass
|
@ -1,41 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kuryr.lib._i18n import _LI
|
||||
from oslo_log import log as logging
|
||||
|
||||
from kuryr_kubernetes.translators import base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class PortTranslator(base.AbstractBaseTranslator):
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def get_annotation(self):
|
||||
return 'kuryr.kubernetes.org/neutron-port'
|
||||
|
||||
async def on_add(self, event): # flake8: noqa
|
||||
LOG.info(_LI('Creating a port'))
|
||||
# TODO(devvesa): remove this part ASAP. This statement it only applies
|
||||
# when checking that the result is serialized on a real K8s. We don't
|
||||
# have any end-to-end test yet, so it allows reviewers to see that
|
||||
# works.
|
||||
return {'port-created': False}
|
||||
|
||||
async def on_modify(self, event):
|
||||
LOG.info(_LI('Modifying a port'))
|
||||
|
||||
async def on_delete(self, event):
|
||||
LOG.info(_LI('Deleting a port'))
|
@ -1,201 +0,0 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import abc
|
||||
import asyncio
|
||||
import functools
|
||||
import requests
|
||||
|
||||
from kuryr.lib._i18n import _LI, _LW, _LE
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
from kuryr_kubernetes.aio import headers as aio_headers
|
||||
from kuryr_kubernetes.aio import methods as aio_methods
|
||||
from kuryr_kubernetes import config
|
||||
from kuryr_kubernetes import utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
ADDED_EVENT = 'ADDED'
|
||||
DELETED_EVENT = 'DELETED'
|
||||
MODIFIED_EVENT = 'MODIFIED'
|
||||
|
||||
|
||||
class AbstractBaseWatcher(object):
|
||||
"""Base abstract watcher.
|
||||
|
||||
This class implements the default interface for the KuryrK8sService task
|
||||
scheduler, which is the `watch` (no parameters) interface.
|
||||
|
||||
It takes care of the events that receives and it triggers the appropiate
|
||||
action on the translators configured on the config.kubernetes.config_map
|
||||
structure.
|
||||
|
||||
Actual watchers will only need to define the 'get_api_endpoint' method
|
||||
that return an String URL in order to suscribe to Kubernetes API events.
|
||||
(See :class PodWatcher: below).
|
||||
"""
|
||||
|
||||
def __init__(self, event_loop, translators):
|
||||
self._event_loop = event_loop
|
||||
self._translators = translators
|
||||
self._k8s_root = config.CONF.kubernetes.api_root
|
||||
|
||||
async def _get_chunked_connection(self): # flake8: noqa
|
||||
"""Get the connection response from Kubernetes API.
|
||||
|
||||
Initializes the connection with Kubernetes API. Since the content type
|
||||
is Chunked (http://en.wikipedia.org/wiki/Chunked_transfer_encoding), the
|
||||
connection remains open.
|
||||
"""
|
||||
connection = await aio_methods.get(
|
||||
endpoint=self.api_endpoint,
|
||||
loop=self._event_loop,
|
||||
decoder=utils.utf8_json_decoder)
|
||||
|
||||
status, reason, hdrs = await connection.read_headers()
|
||||
if status != requests.codes.ok: # Function returns 200
|
||||
LOG.error(_LE('GET request to endpoint %(ep)s failed with '
|
||||
'status %(status)s and reason %(reason)s'),
|
||||
{'ep': endpoint, 'status': status, 'reason': reason})
|
||||
raise requests.exceptions.HTTPError('{}: {}. Endpoint {}'.format(
|
||||
status, reason, endpoint))
|
||||
if hdrs.get(aio_headers.TRANSFER_ENCODING) != 'chunked':
|
||||
LOG.error(_LE('watcher GET request to endpoint %(ep)s is not '
|
||||
'chunked. headers: %(hdrs)s'),
|
||||
{'ep': endpoint, 'hdrs': hdrs})
|
||||
raise IOError(_('Can only watch endpoints that returned chunked '
|
||||
'encoded transfers'))
|
||||
|
||||
return connection
|
||||
|
||||
def _update_annotation(self, self_link, annotation, future):
|
||||
"""Update K8s entities' annotations
|
||||
|
||||
This method is the callback of all the tasks scheduled in the
|
||||
'self._on_event' method.
|
||||
|
||||
In case the _on_event 'future' returns something different that None, it
|
||||
will update the annotations in resource defined by 'self_link' with the
|
||||
key 'annotation' and value 'future.get_result()'
|
||||
|
||||
:param self_link: Entity link to update.
|
||||
:param annotation: Key of the annotation to update.
|
||||
:param future: Value of the annotation to update.
|
||||
"""
|
||||
future_result = future.result()
|
||||
if not future_result:
|
||||
return
|
||||
|
||||
patch_headers = {
|
||||
'Content-Type': 'application/strategic-merge-patch+json',
|
||||
'Accept': 'application/json',
|
||||
}
|
||||
|
||||
# Annotations are supposed to be key=value, being 'value'
|
||||
# an string. So we need to dump the dict result into the annotation into
|
||||
# a json
|
||||
future_result_json = jsonutils.dumps(future_result)
|
||||
|
||||
annotations = {annotation: jsonutils.dumps(future_result)}
|
||||
data = jsonutils.dumps({
|
||||
'metadata': {
|
||||
'annotations': annotations}})
|
||||
url = self._k8s_root + self_link
|
||||
|
||||
# TODO(devvesa): Use the aio package to convert this call into an
|
||||
# asynchornous one. Aio package does not support patch method yet.
|
||||
result = requests.patch(url, data=data, headers=patch_headers)
|
||||
if not result.ok:
|
||||
LOG.warn(_LW("PATCH request to %(url)s for annotation update "
|
||||
"%(data)s failed with error code %(error_code)s and "
|
||||
"reason %(reason)s"),
|
||||
{'url': url,
|
||||
'data': data,
|
||||
'error_code': result.status_code,
|
||||
'reason': result.json()})
|
||||
LOG.debug("Annotation update %(data)s succeded on resource %(url)s",
|
||||
{'data': data, 'url': url})
|
||||
|
||||
|
||||
async def _on_event(self, event):
|
||||
|
||||
if not 'type' in event:
|
||||
LOG.warn(_LW('Received an event without "type":\n\n\t%(event)s'),
|
||||
{'event': event})
|
||||
return
|
||||
|
||||
event_type = event['type']
|
||||
self_link = event['object']['metadata']['selfLink']
|
||||
LOG.info(_LI('Received an %(event_type)s event on a '
|
||||
'%(kind)s with link "%(link)s"'),
|
||||
{'event_type': event_type,
|
||||
'kind': event['object']['kind'],
|
||||
'link': self_link})
|
||||
|
||||
# Dispatch the event on its method
|
||||
dispatch_map = {
|
||||
ADDED_EVENT: 'on_add',
|
||||
DELETED_EVENT: 'on_delete',
|
||||
MODIFIED_EVENT: 'on_modify'}
|
||||
|
||||
if not event_type in dispatch_map:
|
||||
LOG.warning(_LW("Unhandled event type '%(event_type)s'"),
|
||||
{'event_type': event_type})
|
||||
return
|
||||
|
||||
# Run the method on each of the translators defined on the config_map
|
||||
tasks = []
|
||||
for t_class in self._translators:
|
||||
translator = t_class()
|
||||
method = getattr(translator, dispatch_map[event_type])
|
||||
task = self._event_loop.create_task(method(event))
|
||||
task.add_done_callback(
|
||||
functools.partial(self._update_annotation, self_link,
|
||||
translator.get_annotation()))
|
||||
tasks.append(task)
|
||||
asyncio.wait(tasks)
|
||||
|
||||
@property
|
||||
def api_endpoint(self):
|
||||
k8s_root = config.CONF.kubernetes.api_root
|
||||
return k8s_root + self.get_api_endpoint() + "?watch=true"
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_api_endpoint(self):
|
||||
pass
|
||||
|
||||
async def watch(self):
|
||||
"""Watches the endpoint and calls the callback with its response.
|
||||
|
||||
This is an endless task that keeps the event loop running forever
|
||||
"""
|
||||
connection = await self._get_chunked_connection()
|
||||
while True:
|
||||
content = await connection.read_line()
|
||||
LOG.debug('Received new event from %(watcher)s:\n\n\t'
|
||||
'%(event)s.\n\n',
|
||||
{'watcher': self.__class__.__name__,
|
||||
'event': str(content)})
|
||||
await self._on_event(content)
|
||||
|
||||
|
||||
class PodWatcher(AbstractBaseWatcher):
|
||||
"""Watch the Pod endpoints on K8s API."""
|
||||
|
||||
ENDPOINT = "/api/v1/pods"
|
||||
|
||||
def __init__(self, event_loop, translators):
|
||||
super().__init__(event_loop, translators)
|
||||
|
||||
def get_api_endpoint(self):
|
||||
return self.ENDPOINT
|
@ -13,6 +13,8 @@ classifier =
|
||||
License :: OSI Approved :: Apache Software License
|
||||
Operating System :: POSIX :: Linux
|
||||
Programming Language :: Python
|
||||
Programming Language :: Python :: 2
|
||||
Programming Language :: Python :: 2.7
|
||||
Programming Language :: Python :: 3
|
||||
Programming Language :: Python :: 3.4
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user