# File: marconi/proxy/app.py
# (marconi/proxy/__init__.py and marconi/proxy/resources/__init__.py are
# added alongside this module as empty package markers.)
#
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""marconi-proxy: maintains a mapping from inserted queues to partitions.

Supports the following operator API:
- [GET] /v1/partitions - lists registered partitions
- [PUT|GET|DELETE] /v1/partitions/{partition}
- [GET] /v1/catalogue
- [GET] /v1/catalogue/{queue}

Queue requests are routed through:
- /v1/queues
- /v1/queues/{queue}

Deploy requirements:
- redis-server, default port
- gunicorn
- python >= 2.7
- falcon
- msgpack
- requests

Running:
- gunicorn marconi.proxy.app:app
"""
import falcon
import redis

from marconi.proxy.resources import catalogue
from marconi.proxy.resources import partitions
from marconi.proxy.resources import queues

# WSGI entry point picked up by gunicorn (see "Running" above).
app = falcon.API()

# One Redis connection object, shared by every resource below.
client = redis.StrictRedis()

# TODO(cpp-cabrera): don't encode API version in routes -
#                    let's handle this elsewhere
_ROUTES = (
    # NOTE(cpp-cabrera): Proxy-specific routes
    ('/v1/partitions', partitions.Listing),
    ('/v1/partitions/{partition}', partitions.Resource),
    ('/v1/catalogue', catalogue.Listing),
    ('/v1/catalogue/{queue}', catalogue.Resource),

    # NOTE(cpp-cabrera): queue handling routes
    ('/v1/queues', queues.Listing),
    ('/v1/queues/{queue}', queues.Resource),
)

# Instantiate and register the resources in the order listed above.
for _uri_template, _resource_cls in _ROUTES:
    app.add_route(_uri_template, _resource_cls(client))
# File: marconi/proxy/resources/catalogue.py
#
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""catalogue: maintains a directory of all queues proxied through the system

Storage maintains an entry for each queue as follows:

{
    q.{project}.{queue}: {'h': ByteString, 'n': ByteString, 'm': MsgPack}
}

"m" -> metadata
"n" -> name
"h" -> HTTP host

A list of all queues is also stored as:

{
    qs.{project}: [{name}, {name}, {name}]
}
"""
import json

import falcon
import msgpack

from marconi.proxy.utils import helpers


class Listing(object):
    """A listing of all entries in the catalogue."""
    def __init__(self, client):
        self.client = client

    def on_get(self, request, response):
        """Respond with every catalogued queue of the caller's project.

        The body is a JSON object keyed by queue name; each value holds
        'host', 'name' and 'metadata'. Responds 204 when the project has
        no queue list or when no listed queue has a usable entry.
        """
        project = helpers.get_project(request)
        key = 'qs.%s' % project
        if not self.client.exists(key):
            response.status = falcon.HTTP_204
            return

        resp = {}
        for q in self.client.lrange(key, 0, -1):
            queue = q.decode('utf8')
            hkey = 'q.%s.%s' % (project, queue)
            h, n, m = self.client.hmget(hkey, ['h', 'n', 'm'])

            # The name is listed but its hash is missing or incomplete:
            # leave it out rather than failing the whole listing.
            if not all([h, n]):
                continue

            resp[queue] = {
                'host': h.decode('utf8'),
                'name': n.decode('utf8'),
                'metadata': msgpack.loads(m) if m else {},
            }

        if not resp:
            response.status = falcon.HTTP_204
            return

        response.status = falcon.HTTP_200
        response.body = json.dumps(resp, ensure_ascii=False)


class Resource(object):
    """A single catalogue entry."""
    def __init__(self, client):
        self.client = client

    def on_get(self, request, response, queue):
        """Respond with the catalogue entry for one queue.

        :raises: falcon.HTTPNotFound if the entry is absent or lacks its
            host or name field
        """
        key = 'q.%s.%s' % (helpers.get_project(request), queue)

        # A single HMGET covers both the "no such key" case (every field
        # comes back as None) and a partially written or concurrently
        # deleted entry. The previous exists()-then-hmget() sequence
        # called .decode on None in the latter case.
        h, n, m = self.client.hmget(key, ['h', 'n', 'm'])
        if not all([h, n]):
            raise falcon.HTTPNotFound()

        resp = {
            'name': n.decode('utf8'),
            'host': h.decode('utf8'),
            'metadata': msgpack.loads(m) if m else {},
        }

        response.status = falcon.HTTP_200
        response.body = json.dumps(resp, ensure_ascii=False)
# File: marconi/proxy/resources/partitions.py
#
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""partitions: a registry of all marconi partitions this proxy can route to

A partition is added by an operator by interacting with the
partition-related endpoints. When specifying a partition, the
following fields are required:

{
    "name": String,
    "weight": Integer,
    "nodes": [HTTP_EndPoints(:Port), ...]
}

In storage, a partition entry looks like:

{
    "p.{name}": {"n": MsgPack, "w": ByteString, "c": ByteString}
}

"n" -> nodes (MsgPack-encoded list)
"w" -> weight
"c" -> round-robin counter, written as 0 when the partition is created

Storage also maintains a list of partitions as:
{
    "ps": [{name}, {name}, {name}, ...]
}
"""
import json

import falcon
import msgpack


class Listing(object):
    """A listing of all partitions registered."""
    def __init__(self, client):
        self.client = client

    def on_get(self, request, response):
        """Respond with every registered partition.

        The body is a JSON object keyed by partition name; each value
        holds 'weight' and 'nodes'. Responds 204 when nothing usable is
        registered.
        """
        resp = {}
        for p in self.client.lrange('ps', 0, -1):
            name = p.decode('utf8')
            n, w = self.client.hmget('p.%s' % name, ['n', 'w'])

            # Listed in 'ps' but the hash is missing or incomplete.
            if not all([n, w]):
                continue

            resp[name] = {
                'weight': int(w),
                'nodes': [node.decode('utf8') for node in msgpack.loads(n)],
            }

        if not resp:
            response.status = falcon.HTTP_204
            return

        response.body = json.dumps(resp, ensure_ascii=False)
        response.status = falcon.HTTP_200


class Resource(object):
    """A means to interact with individual partitions."""
    def __init__(self, client):
        self.client = client

    def on_get(self, request, response, partition):
        """Respond with the nodes and weight of one partition.

        :raises: falcon.HTTPNotFound if the partition is not stored
        """
        n, w = self.client.hmget('p.%s' % partition, ['n', 'w'])

        if not all([n, w]):  # ensure all the data was returned correctly
            raise falcon.HTTPNotFound()

        nodes, weight = msgpack.loads(n), int(w)
        response.body = json.dumps({
            'nodes': [node.decode('utf8') for node in nodes],
            'weight': weight,
        }, ensure_ascii=False)
        response.status = falcon.HTTP_200

    def _validate_put(self, data):
        """Check the decoded JSON body of a partition PUT.

        :raises: falcon.HTTPBadRequest describing the first problem found
        """
        if not isinstance(data, dict):
            raise falcon.HTTPBadRequest(
                'Invalid metadata', 'Define a partition as a dict'
            )

        if 'nodes' not in data:
            raise falcon.HTTPBadRequest(
                'Missing nodes list', 'Provide a list of nodes'
            )

        nodes = data['nodes']

        # Check the type before the emptiness test so that a falsy
        # non-list (0, "", null) is reported as the wrong type rather
        # than as an empty list.
        if not isinstance(nodes, list):
            raise falcon.HTTPBadRequest(
                'Invalid nodes', 'Nodes must be a list of URLs'
            )

        if not nodes:
            raise falcon.HTTPBadRequest(
                'Empty nodes list', 'Nodes list cannot be empty'
            )

        # TODO(cpp-cabrera): check [str]

        if 'weight' not in data:
            raise falcon.HTTPBadRequest(
                'Missing weight',
                'Provide an integer weight for this partition'
            )

        weight = data['weight']

        # bool is a subclass of int, so JSON true/false would pass a bare
        # isinstance check and then be stored as the weight; the int(w)
        # conversions performed when partitions are read back would fail.
        if isinstance(weight, bool) or not isinstance(weight, int):
            raise falcon.HTTPBadRequest(
                'Invalid weight', 'Weight must be an integer'
            )

        # Partition selection accumulates the weights; a negative value
        # makes the running total meaningless.
        if weight < 0:
            raise falcon.HTTPBadRequest(
                'Invalid weight', 'Weight must not be negative'
            )

    def on_put(self, request, response, partition):
        """Register a partition; existing partitions are left untouched.

        Responds 201 on creation and 204 when the partition already
        exists.

        :raises: falcon.HTTPBadRequest for reserved names, invalid JSON,
            or a body that fails validation
        """
        if partition.startswith('_'):
            raise falcon.HTTPBadRequest(
                'Reserved name', '_names are reserved for internal use'
            )

        key = 'p.%s' % partition
        if self.client.exists(key):
            response.status = falcon.HTTP_204
            return

        try:
            data = json.loads(request.stream.read().decode('utf8'))
        except ValueError:
            raise falcon.HTTPBadRequest(
                'Invalid JSON', 'This is not a valid JSON stream.'
            )

        self._validate_put(data)
        self.client.hmset(key, {'n': msgpack.dumps(data['nodes']),
                                'w': data['weight'],
                                'c': 0})
        self.client.rpush('ps', partition)
        response.status = falcon.HTTP_201

    def on_delete(self, request, response, partition):
        """Remove a partition and its listing entry; always responds 204."""
        self.client.delete('p.%s' % partition)
        self.client.lrem('ps', 1, partition)
        response.status = falcon.HTTP_204
# File: marconi/proxy/resources/queues.py
# (marconi/proxy/utils/__init__.py is added alongside this module as an
# empty package marker.)
#
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""queues: routing and cataloguing queue operations on marconi

The queues resource performs routing to a marconi partition for
requests targeting queues.

For the case of a queue listing, the proxy handles the request in its
entirety, since queues for a given project may be spread across
multiple partitions. This requires the proxy catalogue being
consistent with the state of the entire deployment.

For the case of accessing a particular queue, the catalogue is updated
based on the operation. A DELETE removes entries from the catalogue. A
PUT adds an entry to the catalogue. A GET asks marconi for an
authoritative response.
"""
import collections
import json

import falcon
import msgpack
import requests

from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
from marconi.proxy.utils import node


class Listing(object):
    """Responsible for constructing a valid marconi queue listing
    from the content stored in the catalogue.
    """
    def __init__(self, client):
        self.client = client

    def on_get(self, request, response):
        """Respond with a page of the project's queues, sorted by name.

        Honours the 'marker', 'limit' and 'detailed' query parameters.
        Responds 204 when the project has no queues or none fall after
        the marker.
        """
        project = helpers.get_project(request)
        key = 'qs.%s' % project
        if not self.client.exists(key):
            response.status = falcon.HTTP_204
            return

        kwargs = {}
        request.get_param('marker', store=kwargs)
        request.get_param_as_int('limit', store=kwargs)
        request.get_param_as_bool('detailed', store=kwargs)

        marker = kwargs.get('marker')
        limit = kwargs.get('limit')
        detailed = kwargs.get('detailed')

        resp = collections.defaultdict(list)
        for q in sorted(self.client.lrange(key, 0, -1)):
            queue = q.decode('utf8')

            # The 'next' link built below carries the last name returned
            # as its marker, so the following page must begin strictly
            # after it; '<' would repeat that queue. With no marker
            # nothing is skipped (the old default of 0 compared a str
            # with an int).
            if marker is not None and queue <= marker:
                continue

            entry = {
                'href': request.path + '/' + queue,
                'name': queue
            }
            if detailed:
                qkey = 'q.%s.%s' % (project, queue)
                data = self.client.hget(qkey, 'm')
                # Resource.on_put below stores only 'h' and 'n', so the
                # metadata field may be absent.
                entry['metadata'] = msgpack.loads(data) if data else {}
            resp['queues'].append(entry)
            kwargs['marker'] = queue
            if len(resp['queues']) == limit:
                break

        # 'queues' only exists in the defaultdict once an entry has been
        # appended, so an empty dict means nothing matched.
        if not resp:
            response.status = falcon.HTTP_204
            return

        resp['links'].append({
            'rel': 'next',
            'href': request.path + falcon.to_query_str(kwargs)
        })

        response.content_location = request.relative_uri
        response.body = json.dumps(resp, ensure_ascii=False)


class Resource(object):
    """Routes operations on one queue to the host that owns it."""
    def __init__(self, client):
        self.client = client

    def _make_key(self, request, queue):
        """Return the catalogue key for a queue in the caller's project."""
        project = helpers.get_project(request)
        return 'q.%s.%s' % (project, queue)

    def on_get(self, request, response, queue):
        """Forward the GET to the queue's host and relay its response.

        :raises: falcon.HTTPNotFound if the queue is not catalogued
        """
        key = self._make_key(request, queue)
        if not self.client.exists(key):
            raise falcon.HTTPNotFound()

        h, n = self.client.hmget(key, ['h', 'n'])
        if not (h and n):
            raise falcon.HTTPNotFound()

        resp = helpers.forward(self.client, request, queue)
        response.set_headers(resp.headers)
        response.status = http.status(resp.status_code)
        response.body = resp.content

    def on_put(self, request, response, queue):
        """Create the queue on a selected node and catalogue it.

        Responds 204 without contacting any node when the queue is
        already catalogued; otherwise relays the node's status and body.
        """
        key = self._make_key(request, queue)
        project = helpers.get_project(request)
        if self.client.exists(key):
            response.status = falcon.HTTP_204
            return

        partition = node.weighted_select(self.client)
        host = node.round_robin(self.client, partition)
        url = '{host}/v1/queues/{queue}'.format(host=host, queue=queue)
        resp = requests.put(url, headers=request._headers)

        # NOTE(cpp-cabrera): only catalogue a queue if a request is good
        if resp.ok:
            self.client.hmset(key, {
                'h': host,
                'n': queue
            })
            self.client.rpush('qs.%s' % project, queue)

        response.status = http.status(resp.status_code)
        response.body = resp.content

    def on_delete(self, request, response, queue):
        """Forward the DELETE and, if it succeeded, drop the catalogue entry.

        :raises: falcon.HTTPNotFound if the queue is not catalogued
        """
        key = self._make_key(request, queue)
        project = helpers.get_project(request)

        resp = helpers.forward(self.client, request, queue)
        response.set_headers(resp.headers)
        response.status = http.status(resp.status_code)

        # Only forget the queue once the owning host confirmed the
        # delete. The condition used to be inverted, and HDEL targeted a
        # hash field named after the queue (on_put never writes one)
        # instead of removing the entry itself.
        if resp.ok:
            self.client.delete(key)
            self.client.lrem('qs.%s' % project, 1, queue)
# File: marconi/proxy/utils/helpers.py
#
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""helpers: utilities for performing common operations for resources."""
import falcon
import msgpack
import requests


def get_first_host(client):
    """Returns the first host from the first partition.

    :raises: falcon.HTTPNotFound if no partition is registered or the
        first partition has no stored nodes
    """
    partitions = client.lrange('ps', 0, 0)
    if not partitions:
        # Raised without arguments, like every other HTTPNotFound in the
        # proxy package.
        raise falcon.HTTPNotFound()

    key = 'p.%s' % partitions[0].decode('utf8')
    packed = client.hget(key, 'n')
    nodes = msgpack.loads(packed) if packed else []
    if not nodes:
        # Previously an empty list leaked StopIteration and a missing
        # hash handed None to msgpack.
        raise falcon.HTTPNotFound()

    return nodes[0].decode('utf8')


def get_host_by_project_and_queue(client, project, queue):
    """Fetches the host address for a given project and queue.

    :returns: a host address as stored or None if not found
    """
    # One HGET answers both "does the entry exist" and "what is its
    # host"; a separate EXISTS left a window in which the entry could
    # vanish and .decode be called on None.
    host = client.hget('q.%s.%s' % (project, queue), 'h')
    if host is None:
        return None
    return host.decode('utf8')


def get_project(request):
    """Retrieves the Project-Id header from a request.

    :returns: The Project-Id value or '_' if not provided
    """
    return request.get_header('x_project_id') or '_'


def forward(client, request, queue):
    """Forwards a request to the appropriate host based on the location
    of a given queue.

    The method, path, query string, headers and body of the incoming
    request are replayed against the catalogued host.

    :returns: a python-requests response object
    :raises: falcon.HTTPNotFound if the queue cannot be found in the catalogue
    """
    project = get_project(request)
    host = get_host_by_project_and_queue(client, project, queue)
    if not host:
        raise falcon.HTTPNotFound()

    url = host + request.path
    if request.query_string:
        url += '?' + request.query_string

    method = request.method.lower()
    return requests.request(method, url, headers=request._headers,
                            data=request.stream.read())


# File: marconi/proxy/utils/http.py
#
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""http: utilities for handling HTTP details."""
import falcon


# Integer status code -> falcon status line, built from every HTTP_*
# constant in falcon.status_codes (the leading token of each constant's
# value is parsed as the code).
_code_map = {
    int(line.split()[0]): line
    for const, line in vars(falcon.status_codes).items()
    if const.startswith('HTTP_')
}


def status(code):
    """Maps an integer HTTP status code to a friendly HTTP status message

    :raises: KeyError for an unknown HTTP status code
    """
    return _code_map[code]
# File: marconi/proxy/utils/node.py
#
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""node: utilities for implementing partition and node selections."""
import random

import msgpack


def weighted_select(client):
    """Select a partition from all the partitions registered using a weighted
    selection algorithm.

    Partitions whose weight is missing or not positive are never
    selected.

    :returns: the name of the selected partition
    :raises: RuntimeError if no partitions are registered
    """
    acc = 0
    lookup = []

    # TODO(cpp-cabrera): the lookup table can be constructed once each time
    #                    an entry is added/removed to/from the catalogue,
    #                    rather than each time a queue is created.
    # construct the (partition, cumulative weight) lookup table
    for p in client.lrange('ps', 0, -1):
        name = p.decode('utf8')
        w = client.hget('p.%s' % name, 'w')
        if w is None:
            # Still listed in 'ps' but its hash is gone; int(None) would
            # raise TypeError.
            continue
        weight = int(w)
        if weight <= 0:
            # Contributes an empty range, so it could never be picked; a
            # negative value would also corrupt the running total.
            continue
        acc += weight
        lookup.append((name, acc))

    # With nothing selectable, random.randint(0, -1) would raise
    # ValueError instead of the documented RuntimeError.
    if not lookup:
        raise RuntimeError('No partition could be selected - are any registered?')

    # select a partition from the lookup table: the first entry whose
    # cumulative weight exceeds the selector owns it
    selector = random.randint(0, acc - 1)
    for name, upper in lookup:
        if selector < upper:
            return name

    # Not reachable while selector < acc; kept as a safety net.
    raise RuntimeError('No partition could be selected - are any registered?')


def round_robin(client, partition):
    """Select a node in this partition and update the round robin index.

    :returns: the address of a given node
    :raises: RuntimeError if the partition has no stored nodes
    :side-effect: updates the current index in the storage node for
        this partition
    """
    key = 'p.%s' % partition
    n, c = client.hmget(key, ['n', 'c'])

    nodes = [entry.decode('utf8') for entry in msgpack.loads(n)] if n else []
    if not nodes:
        raise RuntimeError('Partition %s has no registered nodes' % partition)

    # Treat a missing counter as 0 and wrap a stale one so the index is
    # always valid for the current node list.
    current = int(c or 0) % len(nodes)

    # NOTE: the read above and this write are two separate commands, so
    # concurrent callers may be handed the same node.
    client.hset(key, 'c', (current + 1) % len(nodes))
    return nodes[current]