tacker/neutron/agent/rpc.py
Isaku Yamahata 8aff29509f import neutron master of cba140daccd7c4f715263cda422d5cec27af069d
Merge Neutron master branch to tacker master branch with modification
of tox.ini and .gitreview.
This patch imports the following change set of Neutron.

  > commit cba140daccd7c4f715263cda422d5cec27af069d
  > Merge: 63d8237 6bed4a0
  > Author: Jenkins <jenkins@review.openstack.org>
  > Date:   Sun Jun 22 16:02:56 2014 +0000
  >
  >     Merge "Adding static routes data for members"

Change-Id: I5a0f522bc20530c46e35dc9e03fe72d72ad04577
2014-07-01 17:11:09 +09:00

113 lines
4.1 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from neutron.common import rpc_compat
from neutron.common import topics
from neutron.openstack.common import log as logging
from neutron.openstack.common import timeutils
LOG = logging.getLogger(__name__)
def create_consumers(endpoints, prefix, topic_details):
"""Create agent RPC consumers.
:param endpoints: The list of endpoints to process the incoming messages.
:param prefix: Common prefix for the plugin/agent message queues.
:param topic_details: A list of topics. Each topic has a name, an
operation, and an optional host param keying the
subscription to topic.host for plugin calls.
:returns: A common Connection.
"""
connection = rpc_compat.create_connection(new=True)
for details in topic_details:
topic, operation, node_name = itertools.islice(
itertools.chain(details, [None]), 3)
topic_name = topics.get_topic_name(prefix, topic, operation)
connection.create_consumer(topic_name, endpoints, fanout=True)
if node_name:
node_topic_name = '%s.%s' % (topic_name, node_name)
connection.create_consumer(node_topic_name,
endpoints,
fanout=False)
connection.consume_in_threads()
return connection
class PluginReportStateAPI(rpc_compat.RpcProxy):
BASE_RPC_API_VERSION = '1.0'
def __init__(self, topic):
super(PluginReportStateAPI, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def report_state(self, context, agent_state, use_call=False):
msg = self.make_msg('report_state',
agent_state={'agent_state':
agent_state},
time=timeutils.strtime())
if use_call:
return self.call(context, msg, topic=self.topic)
else:
return self.cast(context, msg, topic=self.topic)
class PluginApi(rpc_compat.RpcProxy):
'''Agent side of the rpc API.
API version history:
1.0 - Initial version.
'''
BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic):
super(PluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
def get_device_details(self, context, device, agent_id):
return self.call(context,
self.make_msg('get_device_details', device=device,
agent_id=agent_id),
topic=self.topic)
def update_device_down(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_down', device=device,
agent_id=agent_id, host=host),
topic=self.topic)
def update_device_up(self, context, device, agent_id, host=None):
return self.call(context,
self.make_msg('update_device_up', device=device,
agent_id=agent_id, host=host),
topic=self.topic)
def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
return self.call(context,
self.make_msg('tunnel_sync', tunnel_ip=tunnel_ip,
tunnel_type=tunnel_type),
topic=self.topic)