diff --git a/contrib/devstack/lib/designate b/contrib/devstack/lib/designate index bc61502d1..c5b599c5f 100644 --- a/contrib/devstack/lib/designate +++ b/contrib/devstack/lib/designate @@ -2,7 +2,7 @@ # Install and start **Designate** service # To enable Designate services, add the following to localrc -# enable_service designate,designate-central,designate-api,designate-pool-manager,designate-mdns,designate-agent,designate-sink +# enable_service designate,designate-central,designate-api,designate-pool-manager,designate-zone-manager,designate-mdns,designate-agent,designate-sink # stack.sh # --------- @@ -295,6 +295,7 @@ function start_designate { run_process designate-central "$DESIGNATE_BIN_DIR/designate-central --config-file $DESIGNATE_CONF" run_process designate-api "$DESIGNATE_BIN_DIR/designate-api --config-file $DESIGNATE_CONF" run_process designate-pool-manager "$DESIGNATE_BIN_DIR/designate-pool-manager --config-file $DESIGNATE_CONF" + run_process designate-zone-manager "$DESIGNATE_BIN_DIR/designate-zone-manager --config-file $DESIGNATE_CONF" run_process designate-mdns "$DESIGNATE_BIN_DIR/designate-mdns --config-file $DESIGNATE_CONF" run_process designate-agent "$DESIGNATE_BIN_DIR/designate-agent --config-file $DESIGNATE_CONF" run_process designate-sink "$DESIGNATE_BIN_DIR/designate-sink --config-file $DESIGNATE_CONF" @@ -315,6 +316,7 @@ function stop_designate { stop_process designate-central stop_process designate-api stop_process designate-pool-manager + stop_process designate-zone-manager stop_process designate-mdns stop_process designate-agent stop_process designate-sink diff --git a/contrib/vagrant/localrc b/contrib/vagrant/localrc index ffd86fa90..6ce2e4bc8 100644 --- a/contrib/vagrant/localrc +++ b/contrib/vagrant/localrc @@ -21,7 +21,7 @@ ENABLED_SERVICES=rabbit,mysql,key # Designate Devstack Config # ========================= # Enable core Designate services -ENABLED_SERVICES+=,designate,designate-central,designate-api,designate-pool-manager,designate-mdns +ENABLED_SERVICES+=,designate,designate-central,designate-api,designate-pool-manager,designate-zone-manager,designate-mdns # Optional Designate services #ENABLED_SERVICES+=,designate-agent diff --git a/designate/cmd/zone_manager.py b/designate/cmd/zone_manager.py new file mode 100644 index 000000000..75931be86 --- /dev/null +++ b/designate/cmd/zone_manager.py @@ -0,0 +1,41 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys + +from oslo_config import cfg +from oslo_log import log as logging + +from designate import service +from designate import utils +from designate.zone_manager import service as zone_manager_service + + +CONF = cfg.CONF +CONF.import_opt('workers', 'designate.zone_manager', + group='service:zone_manager') +CONF.import_opt('threads', 'designate.zone_manager', + group='service:zone_manager') + + +def main(): + utils.read_config('designate', sys.argv) + logging.setup(CONF, 'designate') + utils.setup_gmr(log_dir=cfg.CONF.log_dir) + + server = zone_manager_service.Service( + threads=CONF['service:zone_manager'].threads) + service.serve(server, workers=CONF['service:zone_manager'].workers) + service.wait() diff --git a/designate/coordination.py b/designate/coordination.py index a9a08de8e..d095d3d07 100644 --- a/designate/coordination.py +++ b/designate/coordination.py @@ -16,6 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. +import math import uuid from oslo_config import cfg @@ -109,3 +110,93 @@ class CoordinationMixin(object): return self._coordinator.run_watchers() + + +class Partitioner(object): + def __init__(self, coordinator, group_id, my_id, partitions): + self._coordinator = coordinator + self._group_id = group_id + self._my_id = my_id + self._partitions = partitions + + self._started = False + self._my_partitions = None + self._callbacks = [] + + def _warn_no_backend(self): + LOG.warning(_LW('No coordination backend configure, assuming we are ' + 'the only worker. Please configure a coordination ' + 'backend')) + + def _get_members(self, group_id): + get_members_req = self._coordinator.get_members(group_id) + try: + return get_members_req.get() + except tooz.ToozError: + self.join_group(group_id) + + def _on_group_change(self, event): + LOG.debug("Received member change %s" % event) + members, self._my_partitions = self._update_partitions() + + self._run_callbacks(members, event) + + def _partition(self, members, me, partitions): + member_count = len(members) + partition_count = len(partitions) + partition_size = int( + math.ceil(float(partition_count) / float(member_count))) + + my_index = members.index(me) + my_start = partition_size * my_index + my_end = my_start + partition_size + + return partitions[my_start:my_end] + + def _run_callbacks(self, members, event): + for cb in self._callbacks: + cb(self.my_partitions, members, event) + + def _update_partitions(self): + # Recalculate partitions - we need to sort the list of members + # alphabetically so that it's the same order across all nodes. + members = sorted(list(self._get_members(self._group_id))) + partitions = self._partition( + members, self._my_id, self._partitions) + return members, partitions + + @property + def my_partitions(self): + return self._my_partitions + + def start(self): + """Allow the partitioner to start timers after the coordinator has + gotten it's connections up. + """ + LOG.debug("Starting partitioner") + if self._coordinator: + self._coordinator.watch_join_group( + self._group_id, self._on_group_change) + self._coordinator.watch_leave_group( + self._group_id, self._on_group_change) + + # We need to get our partitions now. Events doesn't help in this + # case since they require state change in the group that we wont + # get when starting initially + self._my_partitions = self._update_partitions()[1] + else: + self._my_partitions = self._partitions + self._run_callbacks(None, None) + + self._started = True + + def watch_partition_change(self, callback): + LOG.debug("Watching for change %s" % self._group_id) + self._callbacks.append(callback) + if self._started: + if not self._coordinator: + self._warn_no_backend() + callback(self._my_partitions, None, None) + + def unwatch_partition_change(self, callback): + self._callbacks.remove(callback) diff --git a/designate/tests/fixtures.py b/designate/tests/fixtures.py index a02c571d6..186cb61bb 100644 --- a/designate/tests/fixtures.py +++ b/designate/tests/fixtures.py @@ -22,6 +22,7 @@ import fixtures from oslo_log import log as logging from oslo_utils import importutils from oslo_config import cfg +import tooz.coordination from designate import policy from designate import network_api @@ -33,6 +34,20 @@ from designate.sqlalchemy import utils as sqlalchemy_utils LOG = logging.getLogger(__name__) +class CoordinatorFixture(fixtures.Fixture): + def __init__(self, *args, **kwargs): + self._args = args + self._kwargs = kwargs + + def setUp(self): + super(CoordinatorFixture, self).setUp() + self.coordinator = tooz.coordination.get_coordinator( + *self._args, **self._kwargs) + + self.coordinator.start() + self.addCleanup(self.coordinator.stop) + + class RPCFixture(fixtures.Fixture): def __init__(self, conf): diff --git a/designate/tests/test_coordination.py b/designate/tests/test_coordination.py index ca1585ed3..b8b31ace6 100644 --- a/designate/tests/test_coordination.py +++ b/designate/tests/test_coordination.py @@ -13,10 +13,12 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. +import mock from oslo_config import cfg from designate import coordination from designate import service +from designate.tests import fixtures from designate.tests import TestCase cfg.CONF.register_group(cfg.OptGroup("service:dummy")) @@ -46,6 +48,7 @@ class CoordinationMixinTests(TestCase): self.assertIn(service._coordination_id, service._coordinator.get_members( service.service_name).get()) + service.stop() def test_stop(self): service = CoordinatedService() @@ -65,3 +68,134 @@ class CoordinationMixinTests(TestCase): self.assertEqual(None, service._coordinator) service.start() service.stop() + + +class PartitionerTests(TestCase): + def _get_partitioner(self, partitions, host='a'): + fixture = self.useFixture(fixtures.CoordinatorFixture( + 'zake://', host)) + group = 'group' + fixture.coordinator.create_group(group) + fixture.coordinator.join_group(group) + + return coordination.Partitioner(fixture.coordinator, group, host, + partitions), fixture.coordinator + + def test_callbacks(self): + cb1 = mock.Mock() + cb2 = mock.Mock() + partitions = range(0, 10) + + p_one, c_one = self._get_partitioner(partitions) + p_one.start() + p_one.watch_partition_change(cb1) + p_one.watch_partition_change(cb2) + + # Initial partitions are calucated upon service bootup + cb1.assert_called_with(partitions, None, None) + cb2.assert_called_with(partitions, None, None) + + cb1.reset_mock() + cb2.reset_mock() + + # Startup a new partioner that will cause the cb's to be called + p_two, c_two = self._get_partitioner(partitions, host='b') + p_two.start() + + # We'll get the 5 first partition ranges + c_one.run_watchers() + cb1.assert_called_with(partitions[:5], ['a', 'b'], mock.ANY) + cb2.assert_called_with(partitions[:5], ['a', 'b'], mock.ANY) + + def test_two_even_partitions(self): + partitions = range(0, 10) + + p_one, c_one = self._get_partitioner(partitions) + p_two, c_two = self._get_partitioner(partitions, host='b') + + p_one.start() + p_two.start() + + # Call c_one watchers making it refresh it's partitions + c_one.run_watchers() + + self.assertEqual([0, 1, 2, 3, 4], p_one.my_partitions) + self.assertEqual([5, 6, 7, 8, 9], p_two.my_partitions) + + def test_two_odd_partitions(self): + partitions = range(0, 11) + + p_one, c_one = self._get_partitioner(partitions) + p_two, c_two = self._get_partitioner(partitions, host='b') + + p_one.start() + p_two.start() + + # Call c_one watchers making it refresh it's partitions + c_one.run_watchers() + + self.assertEqual([0, 1, 2, 3, 4, 5], p_one.my_partitions) + self.assertEqual([6, 7, 8, 9, 10], p_two.my_partitions) + + def test_three_even_partitions(self): + partitions = range(0, 10) + + p_one, c_one = self._get_partitioner(partitions) + p_two, c_two = self._get_partitioner(partitions, host='b') + p_three, c_three = self._get_partitioner(partitions, host='c') + + p_one.start() + p_two.start() + p_three.start() + + # Call c_one watchers making it refresh it's partitions + c_one.run_watchers() + c_two.run_watchers() + + self.assertEqual([0, 1, 2, 3], p_one.my_partitions) + self.assertEqual([4, 5, 6, 7], p_two.my_partitions) + self.assertEqual([8, 9], p_three.my_partitions) + + def test_three_odd_partitions(self): + partitions = range(0, 11) + + p_one, c_one = self._get_partitioner(partitions) + p_two, c_two = self._get_partitioner(partitions, host='b') + p_three, c_three = self._get_partitioner(partitions, host='c') + + p_one.start() + p_two.start() + p_three.start() + + c_one.run_watchers() + c_two.run_watchers() + + self.assertEqual([0, 1, 2, 3], p_one.my_partitions) + self.assertEqual([4, 5, 6, 7], p_two.my_partitions) + self.assertEqual([8, 9, 10], p_three.my_partitions) + + +class PartitionerNoCoordinationTests(TestCase): + def test_start(self): + # We test starting the partitioner and calling the watch func first + partitions = range(0, 10) + + cb1 = mock.Mock() + cb2 = mock.Mock() + partitioner = coordination.Partitioner( + None, 'group', 'meme', partitions) + partitioner.watch_partition_change(cb1) + partitioner.watch_partition_change(cb2) + partitioner.start() + cb1.assert_called_with(partitions, None, None) + cb2.assert_called_with(partitions, None, None) + + def test_cb_on_watch(self): + partitions = range(0, 10) + cb = mock.Mock() + + partitioner = coordination.Partitioner( + None, 'group', 'meme', partitions) + partitioner.start() + partitioner.watch_partition_change(cb) + cb.assert_called_with(partitions, None, None) diff --git a/designate/zone_manager/__init__.py b/designate/zone_manager/__init__.py new file mode 100644 index 000000000..da7036315 --- /dev/null +++ b/designate/zone_manager/__init__.py @@ -0,0 +1,31 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_config import cfg + +CONF = cfg.CONF + +CONF.register_group(cfg.OptGroup( + name='service:zone_manager', title="Configuration for Zone Manager Service" +)) + +OPTS = [ + cfg.IntOpt('workers', default=None, + help='Number of Zone Manager worker processes to spawn'), + cfg.IntOpt('threads', default=1000, + help='Number of Zone Manager greenthreads to spawn'), +] + +CONF.register_opts(OPTS, group='service:zone_manager') diff --git a/designate/zone_manager/service.py b/designate/zone_manager/service.py new file mode 100644 index 000000000..699af2fb0 --- /dev/null +++ b/designate/zone_manager/service.py @@ -0,0 +1,52 @@ +# Copyright 2015 Hewlett-Packard Development Company, L.P. +# +# Author: Endre Karlson +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_config import cfg +from oslo_log import log as logging + +from designate.i18n import _LI +from designate import coordination +from designate import service +from designate.central import rpcapi as central_api + + +LOG = logging.getLogger(__name__) +CONF = cfg.CONF + + +class Service(coordination.CoordinationMixin, service.Service): + def __init__(self, threads=None): + super(Service, self).__init__(threads=threads) + + self._partitioner = coordination.Partitioner( + self._coordinator, self.service_name, self._coordination_id, + range(0, 4095)) + + def _rebalance(self, my_partitions, members, event): + LOG.info(_LI("Received rebalance event %s") % event) + self.partition_range = my_partitions + + def start(self): + super(Service, self).start() + self._partitioner.start() + self._partitioner.watch_partition_change(self._rebalance) + + @property + def service_name(self): + return 'zone_manager' + + @property + def central_api(self): + return central_api.CentralAPI.get_instance() diff --git a/setup.cfg b/setup.cfg index bd70fd2d2..f0d2273c3 100644 --- a/setup.cfg +++ b/setup.cfg @@ -44,6 +44,7 @@ console_scripts = designate-manage = designate.cmd.manage:main designate-mdns = designate.cmd.mdns:main designate-pool-manager = designate.cmd.pool_manager:main + designate-zone-manager = designate.cmd.zone_manager:main designate-sink = designate.cmd.sink:main designate-agent = designate.cmd.agent:main