Add Zone Manager service shim
Change-Id: Ia4b19ed1529b903c7054ea05ade6f1e2d1f1a6aa Depends-On: Ie474d02c3cb3b610d1fad64e88f863f37d9c01fe
This commit is contained in:
parent
2e4e04fc37
commit
49da952c08
@ -2,7 +2,7 @@
|
||||
# Install and start **Designate** service
|
||||
|
||||
# To enable Designate services, add the following to localrc
|
||||
# enable_service designate,designate-central,designate-api,designate-pool-manager,designate-mdns,designate-agent,designate-sink
|
||||
# enable_service designate,designate-central,designate-api,designate-pool-manager,designate-zone-manager,designate-mdns,designate-agent,designate-sink
|
||||
|
||||
# stack.sh
|
||||
# ---------
|
||||
@ -295,6 +295,7 @@ function start_designate {
|
||||
run_process designate-central "$DESIGNATE_BIN_DIR/designate-central --config-file $DESIGNATE_CONF"
|
||||
run_process designate-api "$DESIGNATE_BIN_DIR/designate-api --config-file $DESIGNATE_CONF"
|
||||
run_process designate-pool-manager "$DESIGNATE_BIN_DIR/designate-pool-manager --config-file $DESIGNATE_CONF"
|
||||
run_process designate-zone-manager "$DESIGNATE_BIN_DIR/designate-zone-manager --config-file $DESIGNATE_CONF"
|
||||
run_process designate-mdns "$DESIGNATE_BIN_DIR/designate-mdns --config-file $DESIGNATE_CONF"
|
||||
run_process designate-agent "$DESIGNATE_BIN_DIR/designate-agent --config-file $DESIGNATE_CONF"
|
||||
run_process designate-sink "$DESIGNATE_BIN_DIR/designate-sink --config-file $DESIGNATE_CONF"
|
||||
@ -315,6 +316,7 @@ function stop_designate {
|
||||
stop_process designate-central
|
||||
stop_process designate-api
|
||||
stop_process designate-pool-manager
|
||||
stop_process designate-zone-manager
|
||||
stop_process designate-mdns
|
||||
stop_process designate-agent
|
||||
stop_process designate-sink
|
||||
|
@ -21,7 +21,7 @@ ENABLED_SERVICES=rabbit,mysql,key
|
||||
# Designate Devstack Config
|
||||
# =========================
|
||||
# Enable core Designate services
|
||||
ENABLED_SERVICES+=,designate,designate-central,designate-api,designate-pool-manager,designate-mdns
|
||||
ENABLED_SERVICES+=,designate,designate-central,designate-api,designate-pool-manager,designate-zone-manager,designate-mdns
|
||||
|
||||
# Optional Designate services
|
||||
#ENABLED_SERVICES+=,designate-agent
|
||||
|
41
designate/cmd/zone_manager.py
Normal file
41
designate/cmd/zone_manager.py
Normal file
@ -0,0 +1,41 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Endre Karlson <endre.karlson@hp.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import sys
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from designate import service
|
||||
from designate import utils
|
||||
from designate.zone_manager import service as zone_manager_service
|
||||
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.import_opt('workers', 'designate.zone_manager',
|
||||
group='service:zone_manager')
|
||||
CONF.import_opt('threads', 'designate.zone_manager',
|
||||
group='service:zone_manager')
|
||||
|
||||
|
||||
def main():
|
||||
utils.read_config('designate', sys.argv)
|
||||
logging.setup(CONF, 'designate')
|
||||
utils.setup_gmr(log_dir=cfg.CONF.log_dir)
|
||||
|
||||
server = zone_manager_service.Service(
|
||||
threads=CONF['service:zone_manager'].threads)
|
||||
service.serve(server, workers=CONF['service:zone_manager'].workers)
|
||||
service.wait()
|
@ -16,6 +16,7 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import math
|
||||
import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
@ -109,3 +110,93 @@ class CoordinationMixin(object):
|
||||
return
|
||||
|
||||
self._coordinator.run_watchers()
|
||||
|
||||
|
||||
class Partitioner(object):
|
||||
def __init__(self, coordinator, group_id, my_id, partitions):
|
||||
self._coordinator = coordinator
|
||||
self._group_id = group_id
|
||||
self._my_id = my_id
|
||||
self._partitions = partitions
|
||||
|
||||
self._started = False
|
||||
self._my_partitions = None
|
||||
self._callbacks = []
|
||||
|
||||
def _warn_no_backend(self):
|
||||
LOG.warning(_LW('No coordination backend configure, assuming we are '
|
||||
'the only worker. Please configure a coordination '
|
||||
'backend'))
|
||||
|
||||
def _get_members(self, group_id):
|
||||
get_members_req = self._coordinator.get_members(group_id)
|
||||
try:
|
||||
return get_members_req.get()
|
||||
except tooz.ToozError:
|
||||
self.join_group(group_id)
|
||||
|
||||
def _on_group_change(self, event):
|
||||
LOG.debug("Received member change %s" % event)
|
||||
members, self._my_partitions = self._update_partitions()
|
||||
|
||||
self._run_callbacks(members, event)
|
||||
|
||||
def _partition(self, members, me, partitions):
|
||||
member_count = len(members)
|
||||
partition_count = len(partitions)
|
||||
partition_size = int(
|
||||
math.ceil(float(partition_count) / float(member_count)))
|
||||
|
||||
my_index = members.index(me)
|
||||
my_start = partition_size * my_index
|
||||
my_end = my_start + partition_size
|
||||
|
||||
return partitions[my_start:my_end]
|
||||
|
||||
def _run_callbacks(self, members, event):
|
||||
for cb in self._callbacks:
|
||||
cb(self.my_partitions, members, event)
|
||||
|
||||
def _update_partitions(self):
|
||||
# Recalculate partitions - we need to sort the list of members
|
||||
# alphabetically so that it's the same order across all nodes.
|
||||
members = sorted(list(self._get_members(self._group_id)))
|
||||
partitions = self._partition(
|
||||
members, self._my_id, self._partitions)
|
||||
return members, partitions
|
||||
|
||||
@property
|
||||
def my_partitions(self):
|
||||
return self._my_partitions
|
||||
|
||||
def start(self):
|
||||
"""Allow the partitioner to start timers after the coordinator has
|
||||
gotten it's connections up.
|
||||
"""
|
||||
LOG.debug("Starting partitioner")
|
||||
if self._coordinator:
|
||||
self._coordinator.watch_join_group(
|
||||
self._group_id, self._on_group_change)
|
||||
self._coordinator.watch_leave_group(
|
||||
self._group_id, self._on_group_change)
|
||||
|
||||
# We need to get our partitions now. Events doesn't help in this
|
||||
# case since they require state change in the group that we wont
|
||||
# get when starting initially
|
||||
self._my_partitions = self._update_partitions()[1]
|
||||
else:
|
||||
self._my_partitions = self._partitions
|
||||
self._run_callbacks(None, None)
|
||||
|
||||
self._started = True
|
||||
|
||||
def watch_partition_change(self, callback):
|
||||
LOG.debug("Watching for change %s" % self._group_id)
|
||||
self._callbacks.append(callback)
|
||||
if self._started:
|
||||
if not self._coordinator:
|
||||
self._warn_no_backend()
|
||||
callback(self._my_partitions, None, None)
|
||||
|
||||
def unwatch_partition_change(self, callback):
|
||||
self._callbacks.remove(callback)
|
||||
|
@ -22,6 +22,7 @@ import fixtures
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import importutils
|
||||
from oslo_config import cfg
|
||||
import tooz.coordination
|
||||
|
||||
from designate import policy
|
||||
from designate import network_api
|
||||
@ -33,6 +34,20 @@ from designate.sqlalchemy import utils as sqlalchemy_utils
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class CoordinatorFixture(fixtures.Fixture):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self._args = args
|
||||
self._kwargs = kwargs
|
||||
|
||||
def setUp(self):
|
||||
super(CoordinatorFixture, self).setUp()
|
||||
self.coordinator = tooz.coordination.get_coordinator(
|
||||
*self._args, **self._kwargs)
|
||||
|
||||
self.coordinator.start()
|
||||
self.addCleanup(self.coordinator.stop)
|
||||
|
||||
|
||||
class RPCFixture(fixtures.Fixture):
|
||||
|
||||
def __init__(self, conf):
|
||||
|
@ -13,10 +13,12 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
|
||||
from designate import coordination
|
||||
from designate import service
|
||||
from designate.tests import fixtures
|
||||
from designate.tests import TestCase
|
||||
|
||||
cfg.CONF.register_group(cfg.OptGroup("service:dummy"))
|
||||
@ -46,6 +48,7 @@ class CoordinationMixinTests(TestCase):
|
||||
self.assertIn(service._coordination_id,
|
||||
service._coordinator.get_members(
|
||||
service.service_name).get())
|
||||
service.stop()
|
||||
|
||||
def test_stop(self):
|
||||
service = CoordinatedService()
|
||||
@ -65,3 +68,134 @@ class CoordinationMixinTests(TestCase):
|
||||
self.assertEqual(None, service._coordinator)
|
||||
service.start()
|
||||
service.stop()
|
||||
|
||||
|
||||
class PartitionerTests(TestCase):
|
||||
def _get_partitioner(self, partitions, host='a'):
|
||||
fixture = self.useFixture(fixtures.CoordinatorFixture(
|
||||
'zake://', host))
|
||||
group = 'group'
|
||||
fixture.coordinator.create_group(group)
|
||||
fixture.coordinator.join_group(group)
|
||||
|
||||
return coordination.Partitioner(fixture.coordinator, group, host,
|
||||
partitions), fixture.coordinator
|
||||
|
||||
def test_callbacks(self):
|
||||
cb1 = mock.Mock()
|
||||
cb2 = mock.Mock()
|
||||
partitions = range(0, 10)
|
||||
|
||||
p_one, c_one = self._get_partitioner(partitions)
|
||||
p_one.start()
|
||||
p_one.watch_partition_change(cb1)
|
||||
p_one.watch_partition_change(cb2)
|
||||
|
||||
# Initial partitions are calucated upon service bootup
|
||||
cb1.assert_called_with(partitions, None, None)
|
||||
cb2.assert_called_with(partitions, None, None)
|
||||
|
||||
cb1.reset_mock()
|
||||
cb2.reset_mock()
|
||||
|
||||
# Startup a new partioner that will cause the cb's to be called
|
||||
p_two, c_two = self._get_partitioner(partitions, host='b')
|
||||
p_two.start()
|
||||
|
||||
# We'll get the 5 first partition ranges
|
||||
c_one.run_watchers()
|
||||
cb1.assert_called_with(partitions[:5], ['a', 'b'], mock.ANY)
|
||||
cb2.assert_called_with(partitions[:5], ['a', 'b'], mock.ANY)
|
||||
|
||||
def test_two_even_partitions(self):
|
||||
partitions = range(0, 10)
|
||||
|
||||
p_one, c_one = self._get_partitioner(partitions)
|
||||
p_two, c_two = self._get_partitioner(partitions, host='b')
|
||||
|
||||
p_one.start()
|
||||
p_two.start()
|
||||
|
||||
# Call c_one watchers making it refresh it's partitions
|
||||
c_one.run_watchers()
|
||||
|
||||
self.assertEqual([0, 1, 2, 3, 4], p_one.my_partitions)
|
||||
self.assertEqual([5, 6, 7, 8, 9], p_two.my_partitions)
|
||||
|
||||
def test_two_odd_partitions(self):
|
||||
partitions = range(0, 11)
|
||||
|
||||
p_one, c_one = self._get_partitioner(partitions)
|
||||
p_two, c_two = self._get_partitioner(partitions, host='b')
|
||||
|
||||
p_one.start()
|
||||
p_two.start()
|
||||
|
||||
# Call c_one watchers making it refresh it's partitions
|
||||
c_one.run_watchers()
|
||||
|
||||
self.assertEqual([0, 1, 2, 3, 4, 5], p_one.my_partitions)
|
||||
self.assertEqual([6, 7, 8, 9, 10], p_two.my_partitions)
|
||||
|
||||
def test_three_even_partitions(self):
|
||||
partitions = range(0, 10)
|
||||
|
||||
p_one, c_one = self._get_partitioner(partitions)
|
||||
p_two, c_two = self._get_partitioner(partitions, host='b')
|
||||
p_three, c_three = self._get_partitioner(partitions, host='c')
|
||||
|
||||
p_one.start()
|
||||
p_two.start()
|
||||
p_three.start()
|
||||
|
||||
# Call c_one watchers making it refresh it's partitions
|
||||
c_one.run_watchers()
|
||||
c_two.run_watchers()
|
||||
|
||||
self.assertEqual([0, 1, 2, 3], p_one.my_partitions)
|
||||
self.assertEqual([4, 5, 6, 7], p_two.my_partitions)
|
||||
self.assertEqual([8, 9], p_three.my_partitions)
|
||||
|
||||
def test_three_odd_partitions(self):
|
||||
partitions = range(0, 11)
|
||||
|
||||
p_one, c_one = self._get_partitioner(partitions)
|
||||
p_two, c_two = self._get_partitioner(partitions, host='b')
|
||||
p_three, c_three = self._get_partitioner(partitions, host='c')
|
||||
|
||||
p_one.start()
|
||||
p_two.start()
|
||||
p_three.start()
|
||||
|
||||
c_one.run_watchers()
|
||||
c_two.run_watchers()
|
||||
|
||||
self.assertEqual([0, 1, 2, 3], p_one.my_partitions)
|
||||
self.assertEqual([4, 5, 6, 7], p_two.my_partitions)
|
||||
self.assertEqual([8, 9, 10], p_three.my_partitions)
|
||||
|
||||
|
||||
class PartitionerNoCoordinationTests(TestCase):
|
||||
def test_start(self):
|
||||
# We test starting the partitioner and calling the watch func first
|
||||
partitions = range(0, 10)
|
||||
|
||||
cb1 = mock.Mock()
|
||||
cb2 = mock.Mock()
|
||||
partitioner = coordination.Partitioner(
|
||||
None, 'group', 'meme', partitions)
|
||||
partitioner.watch_partition_change(cb1)
|
||||
partitioner.watch_partition_change(cb2)
|
||||
partitioner.start()
|
||||
cb1.assert_called_with(partitions, None, None)
|
||||
cb2.assert_called_with(partitions, None, None)
|
||||
|
||||
def test_cb_on_watch(self):
|
||||
partitions = range(0, 10)
|
||||
cb = mock.Mock()
|
||||
|
||||
partitioner = coordination.Partitioner(
|
||||
None, 'group', 'meme', partitions)
|
||||
partitioner.start()
|
||||
partitioner.watch_partition_change(cb)
|
||||
cb.assert_called_with(partitions, None, None)
|
||||
|
31
designate/zone_manager/__init__.py
Normal file
31
designate/zone_manager/__init__.py
Normal file
@ -0,0 +1,31 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Endre Karlson <endre.karlson@hp.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from oslo_config import cfg
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
CONF.register_group(cfg.OptGroup(
|
||||
name='service:zone_manager', title="Configuration for Zone Manager Service"
|
||||
))
|
||||
|
||||
OPTS = [
|
||||
cfg.IntOpt('workers', default=None,
|
||||
help='Number of Zone Manager worker processes to spawn'),
|
||||
cfg.IntOpt('threads', default=1000,
|
||||
help='Number of Zone Manager greenthreads to spawn'),
|
||||
]
|
||||
|
||||
CONF.register_opts(OPTS, group='service:zone_manager')
|
52
designate/zone_manager/service.py
Normal file
52
designate/zone_manager/service.py
Normal file
@ -0,0 +1,52 @@
|
||||
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
||||
#
|
||||
# Author: Endre Karlson <endre.karlson@hp.com>
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
from designate.i18n import _LI
|
||||
from designate import coordination
|
||||
from designate import service
|
||||
from designate.central import rpcapi as central_api
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class Service(coordination.CoordinationMixin, service.Service):
|
||||
def __init__(self, threads=None):
|
||||
super(Service, self).__init__(threads=threads)
|
||||
|
||||
self._partitioner = coordination.Partitioner(
|
||||
self._coordinator, self.service_name, self._coordination_id,
|
||||
range(0, 4095))
|
||||
|
||||
def _rebalance(self, my_partitions, members, event):
|
||||
LOG.info(_LI("Received rebalance event %s") % event)
|
||||
self.partition_range = my_partitions
|
||||
|
||||
def start(self):
|
||||
super(Service, self).start()
|
||||
self._partitioner.start()
|
||||
self._partitioner.watch_partition_change(self._rebalance)
|
||||
|
||||
@property
|
||||
def service_name(self):
|
||||
return 'zone_manager'
|
||||
|
||||
@property
|
||||
def central_api(self):
|
||||
return central_api.CentralAPI.get_instance()
|
@ -44,6 +44,7 @@ console_scripts =
|
||||
designate-manage = designate.cmd.manage:main
|
||||
designate-mdns = designate.cmd.mdns:main
|
||||
designate-pool-manager = designate.cmd.pool_manager:main
|
||||
designate-zone-manager = designate.cmd.zone_manager:main
|
||||
designate-sink = designate.cmd.sink:main
|
||||
designate-agent = designate.cmd.agent:main
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user