diff --git a/requirements-py3.txt b/requirements-py3.txt index b047a079..d2e4dc6a 100644 --- a/requirements-py3.txt +++ b/requirements-py3.txt @@ -22,3 +22,4 @@ oslo.i18n>=1.3.0 # Apache-2.0 oslo.utils>=1.2.0 # Apache-2.0 SQLAlchemy>=0.9.7,<=0.9.99 enum34 +autobahn>=0.10.1 diff --git a/requirements.txt b/requirements.txt index df0f1996..51e538fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -21,3 +21,5 @@ oslo.serialization>=1.2.0 # Apache-2.0 oslo.utils>=1.2.0 # Apache-2.0 SQLAlchemy>=0.9.7,<=0.9.99 enum34 +trollius>=1.0 +autobahn>=0.10.1 diff --git a/setup.cfg b/setup.cfg index 8b728f6c..0a842414 100644 --- a/setup.cfg +++ b/setup.cfg @@ -54,6 +54,7 @@ zaqar.control.storage = zaqar.transport = wsgi = zaqar.transport.wsgi.driver:Driver + websocket = zaqar.transport.websocket.driver:Driver zaqar.openstack.common.cache.backends = memory = zaqar.openstack.common.cache._backends.memory:MemoryBackend @@ -66,6 +67,7 @@ oslo.config.opts = zaqar.storage.redis = zaqar.storage.redis.options:_config_options zaqar.storage.sqlalchemy = zaqar.storage.sqlalchemy.options:_config_options zaqar.transport.wsgi = zaqar.transport.wsgi.driver:_config_options + zaqar.transport.websocket = zaqar.transport.websocket.driver:_config_options zaqar.transport.base = zaqar.transport.base:_config_options zaqar.transport.validation = zaqar.transport.validation:_config_options diff --git a/zaqar/transport/websocket/__init__.py b/zaqar/transport/websocket/__init__.py new file mode 100644 index 00000000..92959c51 --- /dev/null +++ b/zaqar/transport/websocket/__init__.py @@ -0,0 +1,21 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Websocket Transport Driver""" + +from zaqar.transport.websocket import driver + +# Hoist into package namespace +Driver = driver.Driver diff --git a/zaqar/transport/websocket/driver.py b/zaqar/transport/websocket/driver.py new file mode 100644 index 00000000..666a509f --- /dev/null +++ b/zaqar/transport/websocket/driver.py @@ -0,0 +1,80 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from autobahn.asyncio import websocket +from oslo.config import cfg + +try: + import asyncio +except ImportError: + import trollius as asyncio + +from zaqar.i18n import _ +import zaqar.openstack.common.log as logging +from zaqar.transport.websocket import protocol + +_WS_OPTIONS = ( + cfg.StrOpt('bind', default='127.0.0.1', + help='Address on which the self-hosting server will listen.'), + + cfg.IntOpt('port', default=9000, + help='Port on which the self-hosting server will listen.'), + + cfg.BoolOpt('debug', default=False, help='Print debugging output') +) + +_WS_GROUP = 'drivers:transport:websocket' + +LOG = logging.getLogger(__name__) + + +def _config_options(): + return [(_WS_GROUP, _WS_OPTIONS)] + + +class Driver(object): + + def __init__(self, conf, api, cache): + self._conf = conf + self._api = api + self._cache = cache + + self._conf.register_opts(_WS_OPTIONS, group=_WS_GROUP) + self._ws_conf = self._conf[_WS_GROUP] + + def listen(self): + """Self-host using 'bind' and 'port' from the WS config group.""" + + msgtmpl = _(u'Serving on host %(bind)s:%(port)s') + LOG.info(msgtmpl, + {'bind': self._ws_conf.bind, 'port': self._ws_conf.port}) + + uri = 'ws://' + self._ws_conf.bind + ':' + str(self._ws_conf.port) + factory = websocket.WebSocketServerFactory(uri, + debug=self._ws_conf.debug) + factory.protocol = protocol.MessagingProtocol + + loop = asyncio.get_event_loop() + coro = loop.create_server(factory, self._ws_conf.bind, + self._ws_conf.port) + server = loop.run_until_complete(coro) + + try: + loop.run_forever() + except KeyboardInterrupt: + pass + finally: + server.close() + loop.close() diff --git a/zaqar/transport/websocket/protocol.py b/zaqar/transport/websocket/protocol.py new file mode 100644 index 00000000..4ccda433 --- /dev/null +++ b/zaqar/transport/websocket/protocol.py @@ -0,0 +1,35 @@ +# Copyright (c) 2015 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from autobahn.asyncio import websocket + + +class MessagingProtocol(websocket.WebSocketServerProtocol): + + def onConnect(self, request): + print("Client connecting: {0}".format(request.peer)) + + def onOpen(self): + print("WebSocket connection open.") + + def onMessage(self, payload, isBinary): + if isBinary: + print("Binary message received: {0} bytes".format(len(payload))) + else: + print("Text message received: {0}".format(payload.decode('utf8'))) + self.sendMessage(payload, isBinary) + + def onClose(self, wasClean, code, reason): + print("WebSocket connection closed: {0}".format(reason))