feat: marconi proxy

This patchset introduces the marconi-proxy service. The service makes
it possible to abstract marconi deployments into partitions, as
defined in the referenced blue print.

This reference implementation uses falcon to manage WSGI aspects,
redis for catalogue storage, and recommends gunicorn or uwsgi as the
server.

Includes:
- registering partitions
- cataloguing queues
- forwarding requests to the appropriate marconi partitions
- round-robin selection of partition nodes
- weighted selection of partitions at queue creation time
- queue listing w/ metadata (GET /v1/queues?detailed=true)

Still needed, working on:
- unit tests

A few things that will be handled in later patchsets
- queue metadata handling + metadata endpoint (2)
- request forwarding to marconi (3)
- hierarchical caching with authoritative persistent storage (4)
- regeneration of the catalogue from authoritative cache (5)

Change-Id: I5dabc92497f3edf5bc32d58c8a2c4e43ff9833a3
Implements: blueprint placement-service
This commit is contained in:
Alejandro Cabrera 2013-08-29 16:55:04 -04:00
parent 9b6350afa5
commit 030c309476
10 changed files with 609 additions and 0 deletions

View File

59
marconi/proxy/app.py Normal file
View File

@ -0,0 +1,59 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""marconi-proxy: maintains a mapping from inserted queues to partitions.
Supports the following operator API:
- [GET] /v1/partitions - lists registered partitions
- [PUT|GET|DELETE] /v1/partitions/{partition}
- [GET] /v1/catalogue
Deploy requirements:
- redis-server, default port
- gunicorn
- python >= 2.7
- falcon
- msgpack
- requests
Running:
- gunicorn marconi.proxy.app:app
"""
import falcon
import redis
from marconi.proxy.resources import catalogue
from marconi.proxy.resources import partitions
from marconi.proxy.resources import queues
app = falcon.API()
client = redis.StrictRedis()
# TODO(cpp-cabrera): don't encode API version in routes -
# let's handle this elsewhere
# NOTE(cpp-cabrera): Proxy-specific routes
app.add_route('/v1/partitions',
partitions.Listing(client))
app.add_route('/v1/partitions/{partition}',
partitions.Resource(client))
app.add_route('/v1/catalogue',
catalogue.Listing(client))
app.add_route('/v1/catalogue/{queue}',
catalogue.Resource(client))
# NOTE(cpp-cabrera): queue handling routes
app.add_route('/v1/queues',
queues.Listing(client))
app.add_route('/v1/queues/{queue}',
queues.Resource(client))

View File

View File

@ -0,0 +1,92 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""catalogue: maintains a directory of all queues proxied through the system
Storage maintains an entry for each queue as follows:
{
q.{project}.{queue}: {'h': ByteString, 'n': ByteString, 'm': MsgPack}
}
"m" -> metadata
"n" -> name
"h" -> HTTP host
A list of all queues is also stored as:
{
qs.{project}: [{name}, {name}, {name}]
}
"""
import json
import falcon
import msgpack
from marconi.proxy.utils import helpers
class Listing(object):
"""A listing of all entries in the catalogue."""
def __init__(self, client):
self.client = client
def on_get(self, request, response):
project = helpers.get_project(request)
key = 'qs.%s' % project
if not self.client.exists(key):
response.status = falcon.HTTP_204
return
resp = {}
for q in self.client.lrange(key, 0, -1):
hkey = 'q.%s.%s' % (project, q.decode('utf8'))
queue = q.decode('utf8')
h, n, m = self.client.hmget(hkey, ['h', 'n', 'm'])
if not all([h, n]):
continue
resp[queue] = {
'host': h.decode('utf8'),
'name': n.decode('utf8')
}
resp[queue]['metadata'] = msgpack.loads(m) if m else {}
if not resp:
response.status = falcon.HTTP_204
return
response.status = falcon.HTTP_200
response.body = json.dumps(resp, ensure_ascii=False)
class Resource(object):
"""A single catalogue entry."""
def __init__(self, client):
self.client = client
def on_get(self, request, response, queue):
key = 'q.%s.%s' % (helpers.get_project(request), queue)
if not self.client.exists(key):
raise falcon.HTTPNotFound()
h, n, m = self.client.hmget(key, ['h', 'n', 'm'])
resp = {
'name': n.decode('utf8'),
'host': h.decode('utf8'),
}
resp['metadata'] = msgpack.loads(m) if m else {}
response.status = falcon.HTTP_200
response.body = json.dumps(resp, ensure_ascii=False)

View File

@ -0,0 +1,148 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""partitions: a registry of all marconi partitions this proxy can route to
A partition is added by an operator by interacting with the
partition-related endpoints. When specifying a partition, the
following fields are required:
{
"name": String,
"weight": Integer,
"nodes": [HTTP_EndPoints(:Port), ...]
}
In storage, a partition entry looks like:
{
"p.{name}": {"n": ByteString, "w": ByteString, "n": MsgPack}
}
Storage also maintains a list of partitions as:
{
"ps": [{name}, {name}, {name}, ...]
}
"""
import json
import falcon
import msgpack
class Listing(object):
"""A listing of all partitions registered."""
def __init__(self, client):
self.client = client
def on_get(self, request, response):
partitions = self.client.lrange('ps', 0, -1)
resp = {}
for p in partitions:
key = 'p.%s' % p.decode('utf8')
n, w = self.client.hmget(key, ['n', 'w'])
if not all([n, w]):
continue
resp[p.decode('utf8')] = {'weight': int(w),
'nodes': [node.decode('utf8') for node
in msgpack.loads(n)]}
if not resp:
response.status = falcon.HTTP_204
return
response.body = json.dumps(resp, ensure_ascii=False)
response.status = falcon.HTTP_200
class Resource(object):
"""A means to interact with individual partitions."""
def __init__(self, client):
self.client = client
def on_get(self, request, response, partition):
n, w = self.client.hmget('p.%s' % partition, ['n', 'w'])
if not all([n, w]): # ensure all the data was returned correctly
raise falcon.HTTPNotFound()
nodes, weight = msgpack.loads(n), int(w)
response.body = json.dumps({
'nodes': [node.decode('utf8') for node in nodes],
'weight': weight,
}, ensure_ascii=False)
def _validate_put(self, data):
if not isinstance(data, dict):
raise falcon.HTTPBadRequest(
'Invalid metadata', 'Define a partition as a dict'
)
if 'nodes' not in data:
raise falcon.HTTPBadRequest(
'Missing nodes list', 'Provide a list of nodes'
)
if not data['nodes']:
raise falcon.HTTPBadRequest(
'Empty nodes list', 'Nodes list cannot be empty'
)
if not isinstance(data['nodes'], list):
raise falcon.HTTPBadRequest(
'Invalid nodes', 'Nodes must be a list of URLs'
)
# TODO(cpp-cabrera): check [str]
if 'weight' not in data:
raise falcon.HTTPBadRequest(
'Missing weight',
'Provide an integer weight for this partition'
)
if not isinstance(data['weight'], int):
raise falcon.HTTPBadRequest(
'Invalid weight', 'Weight must be an integer'
)
def on_put(self, request, response, partition):
if partition.startswith('_'):
raise falcon.HTTPBadRequest(
'Reserved name', '_names are reserved for internal use'
)
key = 'p.%s' % partition
if self.client.exists(key):
response.status = falcon.HTTP_204
return
try:
data = json.loads(request.stream.read().decode('utf8'))
except ValueError:
raise falcon.HTTPBadRequest(
'Invalid JSON', 'This is not a valid JSON stream.'
)
self._validate_put(data)
self.client.hmset(key, {'n': msgpack.dumps(data['nodes']),
'w': data['weight'],
'c': 0})
self.client.rpush('ps', partition)
response.status = falcon.HTTP_201
def on_delete(self, request, response, partition):
self.client.delete('p.%s' % partition)
self.client.lrem('ps', 1, partition)
response.status = falcon.HTTP_204

View File

@ -0,0 +1,149 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""queues: routing and cataloguing queue operations on marconi
The queues resource performs routing to a marconi partition for
requests targeting queues.
For the case of a queue listing, the prooxy handles the request in its
entirety, since queues for a given project may be spread across
multiple partitions. This requires the proxy catalogue being
consistent with the state of the entire deployment.
For the case of accessing a particular queue, the catalogue is updated
based on the operation. A DELETE removes entries from the catalogue. A
PUT adds an entry to the catalogue. A GET asks marconi for an
authoritative response.
"""
import collections
import json
import falcon
import msgpack
import requests
from marconi.proxy.utils import helpers
from marconi.proxy.utils import http
from marconi.proxy.utils import node
class Listing(object):
"""Responsible for constructing a valid marconi queue listing
from the content stored in the catalogue.
"""
def __init__(self, client):
self.client = client
def on_get(self, request, response):
project = helpers.get_project(request)
key = 'qs.%s' % project
if not self.client.exists(key):
response.status = falcon.HTTP_204
return
kwargs = {}
request.get_param('marker', store=kwargs)
request.get_param_as_int('limit', store=kwargs)
request.get_param_as_bool('detailed', store=kwargs)
resp = collections.defaultdict(list)
for q in sorted(self.client.lrange(key, 0, -1)):
queue = q.decode('utf8')
if queue < kwargs.get('marker', 0):
continue
entry = {
'href': request.path + '/' + queue,
'name': queue
}
if kwargs.get('detailed', None):
qkey = 'q.%s.%s' % (project, queue)
data = self.client.hget(qkey, 'm')
metadata = msgpack.loads(data)
entry['metadata'] = metadata
resp['queues'].append(entry)
kwargs['marker'] = queue
if len(resp['queues']) == kwargs.get('limit', None):
break
if not resp:
response.status = falcon.HTTP_204
return
resp['links'].append({
'rel': 'next',
'href': request.path + falcon.to_query_str(kwargs)
})
response.content_location = request.relative_uri
response.body = json.dumps(resp, ensure_ascii=False)
class Resource(object):
def __init__(self, client):
self.client = client
def _make_key(self, request, queue):
project = helpers.get_project(request)
return 'q.%s.%s' % (project, queue)
def on_get(self, request, response, queue):
key = self._make_key(request, queue)
if not self.client.exists(key):
raise falcon.HTTPNotFound()
h, n = self.client.hmget(key, ['h', 'n'])
if not (h and n):
raise falcon.HTTPNotFound()
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_put(self, request, response, queue):
key = self._make_key(request, queue)
project = helpers.get_project(request)
if self.client.exists(key):
response.status = falcon.HTTP_204
return
partition = node.weighted_select(self.client)
host = node.round_robin(self.client, partition)
url = '{host}/v1/queues/{queue}'.format(host=host, queue=queue)
resp = requests.put(url, headers=request._headers)
# NOTE(cpp-cabrera): only catalogue a queue if a request is good
if resp.ok:
self.client.hmset(key, {
'h': host,
'n': queue
})
self.client.rpush('qs.%s' % project, queue)
response.status = http.status(resp.status_code)
response.body = resp.content
def on_delete(self, request, response, queue):
key = self._make_key(request, queue)
project = helpers.get_project(request)
resp = helpers.forward(self.client, request, queue)
response.set_headers(resp.headers)
response.status = http.status(resp.status_code)
# avoid deleting a queue if the request is bad
if not resp.ok:
self.client.hdel(key, queue)
self.client.lrem('qs.%s' % project, 1, queue)

View File

View File

@ -0,0 +1,69 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""helpers: utilities for performing common operations for resources."""
import falcon
import msgpack
import requests
def get_first_host(client):
"""Returns the first host from the first partition."""
try:
partition = next(p.decode('utf8') for p in
client.lrange('ps', 0, 0))
except StopIteration:
raise falcon.HTTPNotFound('No partitions registered')
key = 'p.%s' % partition
ns = msgpack.loads(client.hget(key, 'n'))
return next(n.decode('utf8') for n in ns)
def get_host_by_project_and_queue(client, project, queue):
"""Fetches the host address for a given project and queue.
:returns: a host address as stored or None if not found
"""
key = 'q.%s.%s' % (project, queue)
if not client.exists(key):
return None
return client.hget(key, 'h').decode('utf8')
def get_project(request):
"""Retrieves the Project-Id header from a request.
:returns: The Project-Id value or '_' if not provided
"""
return request.get_header('x_project_id') or '_'
def forward(client, request, queue):
"""Forwards a request to the appropriate host based on the location
of a given queue.
:returns: a python-requests response object
:raises: falcon.HTTPNotFound if the queue cannot be found in the catalogue
"""
project = get_project(request)
host = get_host_by_project_and_queue(client, project, queue)
if not host:
raise falcon.HTTPNotFound()
url = host + request.path
if request.query_string:
url += '?' + request.query_string
method = request.method.lower()
resp = requests.request(method, url, headers=request._headers,
data=request.stream.read())
return resp

View File

@ -0,0 +1,29 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""http: utilities for handling HTTP details."""
import falcon
_code_map = dict((int(v.split()[0]), v)
for k, v in falcon.status_codes.__dict__.items()
if k.startswith('HTTP_'))
def status(code):
"""Maps an integer HTTP status code to a friendly HTTP status message
:raises: KeyError for an unknown HTTP status code
"""
return _code_map[code]

View File

@ -0,0 +1,63 @@
# Copyright (c) 2013 Rackspace Hosting, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""node: utilities for implementing partition and node selections."""
import random
import msgpack
def weighted_select(client):
"""Select a partition from all the partitions registered using a weighted
selection algorithm.
:raises: RuntimeError if no partitions are registered
"""
acc = 0
lookup = []
# TODO(cpp-cabrera): the lookup table can be constructed once each time
# an entry is added/removed to/from the catalogue,
# rather than each time a queue is created.
# construct the (partition, weight) lookup table
for p in client.lrange('ps', 0, -1):
key = 'p.%s' % p.decode('utf8')
w = client.hget(key, 'w')
acc += int(w)
lookup.append((p.decode('utf8'), acc))
# select a partition from the lookup table
selector = random.randint(0, acc - 1)
last = 0
for p, w in lookup:
weight = int(w)
if selector >= last and selector < weight:
return p
last = weight
raise RuntimeError('No partition could be selected - are any registered?')
def round_robin(client, partition):
"""Select a node in this partition and update the round robin index.
:returns: the address of a given node
:side-effect: updates the current index in the storage node for
this partition
"""
n, c = client.hmget('p.%s' % partition, ['n', 'c'])
nodes = [entry.decode('utf8') for entry in msgpack.loads(n)]
current = int(c)
client.hset('p.%s' % partition, 'c', (current + 1) % len(nodes))
return nodes[current]