From 27604abd461d7dbf8098c7cc794dfcc2686c4527 Mon Sep 17 00:00:00 2001
From: Julien Danjou
Date: Mon, 13 Mar 2017 14:08:44 +0100
Subject: [PATCH] Replace Ceilometer coordination layer with the tooz
 partition system

This replaces the custom-made partitioning system built around the hash
ring with the partitioning system provided by tooz.

Change-Id: I2321c92315accc5e5972138e7673d3a665df891e
---
 ceilometer/agent/manager.py                   |  41 ++--
 ceilometer/coordination.py                    | 141 -----------
 ceilometer/notification.py                    |  38 +--
 ceilometer/opts.py                            |  16 +-
 .../tests/functional/test_notification.py    |  65 +++--
 ceilometer/tests/unit/agent/agentbase.py     |  60 ++---
 ceilometer/tests/unit/test_coordination.py   | 228 ------------------
 ...-coordination-system-d1054b9d1a5ddf32.yaml |   6 +
 8 files changed, 125 insertions(+), 470 deletions(-)
 delete mode 100644 ceilometer/coordination.py
 delete mode 100644 ceilometer/tests/unit/test_coordination.py
 create mode 100644 releasenotes/notes/tooz-coordination-system-d1054b9d1a5ddf32.yaml

diff --git a/ceilometer/agent/manager.py b/ceilometer/agent/manager.py
index c27142121a..a233546969 100644
--- a/ceilometer/agent/manager.py
+++ b/ceilometer/agent/manager.py
@@ -32,9 +32,9 @@ from oslo_utils import timeutils
 from six import moves
 from six.moves.urllib import parse as urlparse
 from stevedore import extension
+from tooz import coordination
 
 from ceilometer.agent import plugin_base
-from ceilometer import coordination
 from ceilometer import keystone_client
 from ceilometer import messaging
 from ceilometer import pipeline
@@ -102,14 +102,16 @@ class Resources(object):
         source_discovery = (self.agent_manager.discover(self._discovery,
                                                         discovery_cache)
                             if self._discovery else [])
-        static_resources = []
+
         if self._resources:
             static_resources_group = self.agent_manager.construct_group_id(
                 utils.hash_of_set(self._resources))
-            p_coord = self.agent_manager.partition_coordinator
-            static_resources = p_coord.extract_my_subset(
-                static_resources_group, self._resources)
-        return static_resources + source_discovery
+            return list(filter(
+                self.agent_manager.hashrings[
+                    static_resources_group].belongs_to_self, self._resources
+            )) + source_discovery
+
+        return source_discovery
 
     @staticmethod
     def key(source_name, pollster):
@@ -278,8 +280,8 @@ class AgentManager(cotyledon.Service):
         if self.conf.coordination.backend_url:
             # XXX uuid4().bytes ought to work, but it requires ascii for now
             coordination_id = str(uuid.uuid4()).encode('ascii')
-            self.partition_coordinator = coordination.PartitionCoordinator(
-                self.conf, coordination_id)
+            self.partition_coordinator = coordination.get_coordinator(
+                self.conf.coordination.backend_url, coordination_id)
         else:
             self.partition_coordinator = None
 
@@ -355,8 +357,9 @@ class AgentManager(cotyledon.Service):
             ])
             groups.update(static_resource_groups)
 
-        for group in groups:
-            self.partition_coordinator.join_group(group)
+        self.hashrings = dict(
+            (group, self.partition_coordinator.join_partitioned_group(group))
+            for group in groups)
 
     def create_polling_task(self):
         """Create an initially empty polling task."""
@@ -492,17 +495,17 @@ class AgentManager(cotyledon.Service):
                         continue
 
                     discovered = discoverer.discover(self, param)
+
                     if self.partition_coordinator:
-                        partitioned = (
-                            self.partition_coordinator.extract_my_subset(
-                                self.construct_group_id(discoverer.group_id),
-                                discovered)
-                        )
-                    else:
-                        partitioned = discovered
-                    resources.extend(partitioned)
+                        discovered = list(filter(
+                            self.hashrings[
+                                self.construct_group_id(discoverer.group_id)
+                            ].belongs_to_self, discovered
+                        ))
+
+                    resources.extend(discovered)
                     if discovery_cache is not None:
-                        discovery_cache[url] = partitioned
+                        discovery_cache[url] = discovered
                 except ka_exceptions.ClientException as e:
                     LOG.error('Skipping %(name)s, keystone issue: '
                               '%(exc)s', {'name': name, 'exc': e})
diff --git a/ceilometer/coordination.py b/ceilometer/coordination.py
deleted file mode 100644
index 2aa8883cb7..0000000000
--- a/ceilometer/coordination.py
+++ /dev/null
@@ -1,141 +0,0 @@
-#
-# Copyright 2014-2017 Red Hat, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import six
-
-from oslo_config import cfg
-from oslo_log import log
-import tenacity
-import tooz.coordination
-from tooz import hashring
-
-
-LOG = log.getLogger(__name__)
-
-OPTS = [
-    cfg.StrOpt('backend_url',
-               help='The backend URL to use for distributed coordination. If '
-                    'left empty, per-deployment central agent and per-host '
-                    'compute agent won\'t do workload '
-                    'partitioning and will only function correctly if a '
-                    'single instance of that service is running.'),
-    cfg.FloatOpt('check_watchers',
-                 default=10.0,
-                 help='Number of seconds between checks to see if group '
-                      'membership has changed'),
-    cfg.IntOpt('retry_backoff',
-               default=1,
-               help='Retry backoff factor when retrying to connect with '
-                    'coordination backend'),
-    cfg.IntOpt('max_retry_interval',
-               default=30,
-               help='Maximum number of seconds between retry to join '
-                    'partitioning group')
-]
-
-
-class PartitionCoordinator(object):
-    """Workload partitioning coordinator.
-
-    This class uses the `tooz` library to manage group membership.
-
-    Coordination errors and reconnects are handled under the hood, so the
-    service using the partition coordinator need not care whether the
-    coordination backend is down. The `extract_my_subset` will simply return an
-    empty iterable in this case.
- """ - - def __init__(self, conf, my_id): - self.conf = conf - self._my_id = my_id - self._coordinator = tooz.coordination.get_coordinator( - conf.coordination.backend_url, my_id) - - def start(self): - try: - self._coordinator.start(start_heart=True) - LOG.info('Coordination backend started successfully.') - except tooz.coordination.ToozError: - LOG.exception('Error connecting to coordination backend.') - - def stop(self): - try: - self._coordinator.stop() - except tooz.coordination.ToozError: - LOG.exception('Error connecting to coordination backend.') - finally: - del self._coordinator - - def watch_group(self, namespace, callback): - self._coordinator.watch_join_group(namespace, callback) - self._coordinator.watch_leave_group(namespace, callback) - - def run_watchers(self): - self._coordinator.run_watchers() - - def join_group(self, group_id): - - @tenacity.retry( - wait=tenacity.wait_exponential( - multiplier=self.conf.coordination.retry_backoff, - max=self.conf.coordination.max_retry_interval), - retry=tenacity.retry_never) - def _inner(): - try: - self._coordinator.join_group_create(group_id) - except tooz.coordination.MemberAlreadyExist: - pass - except tooz.coordination.ToozError: - LOG.exception('Error joining partitioning group %s,' - ' re-trying', group_id) - raise tenacity.TryAgain - LOG.info('Joined partitioning group %s', group_id) - - return _inner() - - def _get_members(self, group_id): - while True: - get_members_req = self._coordinator.get_members(group_id) - try: - return get_members_req.get() - except tooz.coordination.GroupNotCreated: - self.join_group(group_id) - - def extract_my_subset(self, group_id, iterable): - """Filters an iterable, returning only objects assigned to this agent. - - We have a list of objects and get a list of active group members from - `tooz`. We then hash all the objects into buckets and return only - the ones that hashed into *our* bucket. - """ - try: - members = self._get_members(group_id) - hr = hashring.HashRing(members, partitions=100) - iterable = list(iterable) - filtered = [v for v in iterable - if self._my_id in hr.get_nodes(self.encode_task(v))] - LOG.debug('The universal set: %s, my subset: %s', - [six.text_type(f) for f in iterable], - [six.text_type(f) for f in filtered]) - return filtered - except tooz.coordination.ToozError: - LOG.exception('Error getting group membership info from ' - 'coordination backend.') - return [] - - @staticmethod - def encode_task(value): - """encode to bytes""" - return six.text_type(value).encode('utf-8') diff --git a/ceilometer/notification.py b/ceilometer/notification.py index d571713088..6233b7b486 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -1,4 +1,5 @@ # +# Copyright 2017 Red Hat, Inc. 
# Copyright 2012-2013 eNovance # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -25,9 +26,10 @@ from futurist import periodics from oslo_config import cfg from oslo_log import log import oslo_messaging +import six from stevedore import extension +from tooz import coordination -from ceilometer import coordination from ceilometer.event import endpoint as event_endpoint from ceilometer.i18n import _ from ceilometer import messaging @@ -112,9 +114,14 @@ class NotificationService(cotyledon.Service): super(NotificationService, self).__init__(worker_id) self.startup_delay = worker_id self.conf = conf - # XXX uuid4().bytes ought to work, but it requires ascii for now - self.coordination_id = (coordination_id or - str(uuid.uuid4()).encode('ascii')) + if self.conf.notification.workload_partitioning: + # XXX uuid4().bytes ought to work, but it requires ascii for now + coordination_id = (coordination_id or + str(uuid.uuid4()).encode('ascii')) + self.partition_coordinator = coordination.get_coordinator( + self.conf.coordination.backend_url, coordination_id) + else: + self.partition_coordinator = None @classmethod def _get_notifications_manager(cls, pm): @@ -168,7 +175,6 @@ class NotificationService(cotyledon.Service): super(NotificationService, self).run() self.shutdown = False self.periodic = None - self.partition_coordinator = None self.coord_lock = threading.Lock() self.listeners = [] @@ -184,9 +190,6 @@ class NotificationService(cotyledon.Service): self.transport = messaging.get_transport(self.conf) if self.conf.notification.workload_partitioning: - self.group_id = self.NOTIFICATION_NAMESPACE - self.partition_coordinator = coordination.PartitionCoordinator( - self.conf, self.coordination_id) self.partition_coordinator.start() else: # FIXME(sileht): endpoint uses the notification_topics option @@ -204,9 +207,8 @@ class NotificationService(cotyledon.Service): if self.conf.notification.workload_partitioning: # join group after all manager set up is configured - self.partition_coordinator.join_group(self.group_id) - self.partition_coordinator.watch_group(self.group_id, - self._refresh_agent) + self.hashring = self.partition_coordinator.join_partitioned_group( + self.NOTIFICATION_NAMESPACE) @periodics.periodic(spacing=self.conf.coordination.check_watchers, run_immediately=True) @@ -219,7 +221,6 @@ class NotificationService(cotyledon.Service): self.periodic.add(run_watchers) utils.spawn_thread(self.periodic.start) - # configure pipelines after all coordination is configured. 
with self.coord_lock: self._configure_pipeline_listener() @@ -275,14 +276,13 @@ class NotificationService(cotyledon.Service): ev_pipes = self.event_pipeline_manager.pipelines pipelines = self.pipeline_manager.pipelines + ev_pipes transport = messaging.get_transport(self.conf) + partitioned = six.moves.range( + self.conf.notification.pipeline_processing_queues + ) + if self.partition_coordinator: - partitioned = self.partition_coordinator.extract_my_subset( - self.group_id, - range(self.conf.notification.pipeline_processing_queues)) - else: - partitioned = range( - self.conf.notification.pipeline_processing_queues - ) + partitioned = list(filter( + self.hashring.belongs_to_self, partitioned)) endpoints = [] targets = [] diff --git a/ceilometer/opts.py b/ceilometer/opts.py index 52ba520da7..e72834227d 100644 --- a/ceilometer/opts.py +++ b/ceilometer/opts.py @@ -26,7 +26,6 @@ import ceilometer.compute.virt.inspector import ceilometer.compute.virt.libvirt.utils import ceilometer.compute.virt.vmware.inspector import ceilometer.compute.virt.xenapi.inspector -import ceilometer.coordination import ceilometer.dispatcher import ceilometer.dispatcher.file import ceilometer.dispatcher.gnocchi_opts @@ -93,7 +92,20 @@ def list_opts(): ceilometer.api.controllers.v2.root.API_OPTS)), ('collector', ceilometer.collector.OPTS), ('compute', ceilometer.compute.discovery.OPTS), - ('coordination', ceilometer.coordination.OPTS), + ('coordination', [ + cfg.StrOpt( + 'backend_url', + help='The backend URL to use for distributed coordination. If ' + 'left empty, per-deployment central agent and per-host ' + 'compute agent won\'t do workload ' + 'partitioning and will only function correctly if a ' + 'single instance of that service is running.'), + cfg.FloatOpt( + 'check_watchers', + default=10.0, + help='Number of seconds between checks to see if group ' + 'membership has changed'), + ]), ('database', ceilometer.storage.OPTS), ('dispatcher_file', ceilometer.dispatcher.file.OPTS), ('dispatcher_http', ceilometer.dispatcher.http.http_dispatcher_opts), diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index fd22d3deb8..c60b822e3c 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -297,12 +297,14 @@ class TestRealNotificationHA(BaseRealNotification): fake_publisher_cls.return_value = self.publisher self._check_notification_service() + @mock.patch("ceilometer.utils.kill_listeners", mock.MagicMock()) @mock.patch.object(oslo_messaging.MessageHandlingServer, 'start') def test_notification_threads(self, m_listener): self.CONF.set_override('batch_size', 1, group='notification') self.srv.run() m_listener.assert_called_with(override_pool_size=None) m_listener.reset_mock() + self.srv.terminate() self.CONF.set_override('batch_size', 2, group='notification') self.srv.run() m_listener.assert_called_with(override_pool_size=1) @@ -324,22 +326,32 @@ class TestRealNotificationHA(BaseRealNotification): @mock.patch('oslo_messaging.get_batch_notification_listener') def test_retain_common_targets_on_refresh(self, mock_listener): - with mock.patch('ceilometer.coordination.PartitionCoordinator' - '.extract_my_subset', return_value=[1, 2]): - self.srv.run() - self.addCleanup(self.srv.terminate) + maybe = {"maybe": 0} + + def _once_over_five(item): + maybe["maybe"] += 1 + return maybe["maybe"] % 5 == 0 + + hashring = mock.MagicMock() + hashring.belongs_to_self = _once_over_five + self.srv.partition_coordinator = pc 
= mock.MagicMock() + pc.join_partitioned_group.return_value = hashring + self.srv.run() + self.addCleanup(self.srv.terminate) listened_before = [target.topic for target in mock_listener.call_args[0][1]] self.assertEqual(4, len(listened_before)) - with mock.patch('ceilometer.coordination.PartitionCoordinator' - '.extract_my_subset', return_value=[1, 3]): - self.srv._refresh_agent(None) + self.srv._refresh_agent(None) listened_after = [target.topic for target in mock_listener.call_args[0][1]] self.assertEqual(4, len(listened_after)) common = set(listened_before) & set(listened_after) - for topic in common: - self.assertTrue(topic.endswith('1')) + self.assertEqual( + {'ceilometer-pipe-test_pipeline:test_sink-4', + 'ceilometer-pipe-event:test_event:test_sink-4', + 'ceilometer-pipe-event:test_event:test_sink-9', + 'ceilometer-pipe-test_pipeline:test_sink-9'}, + common) @mock.patch('oslo_messaging.get_batch_notification_listener') def test_notify_to_relevant_endpoint(self, mock_listener): @@ -463,14 +475,31 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): def _check_notifications(self, fake_publisher_cls): fake_publisher_cls.side_effect = [self.publisher, self.publisher2] - self.srv = notification.NotificationService(0, self.CONF, 'harry') - self.srv2 = notification.NotificationService(0, self.CONF, 'lloyd') - with mock.patch('ceilometer.coordination.PartitionCoordinator' - '._get_members', return_value=['harry', 'lloyd']): - self.srv.run() - self.addCleanup(self.srv.terminate) - self.srv2.run() - self.addCleanup(self.srv2.terminate) + maybe = {"srv": 0, "srv2": -1} + + def _sometimes_srv(item): + maybe["srv"] += 1 + return (maybe["srv"] % 2) == 0 + + self.srv = notification.NotificationService(0, self.CONF) + self.srv.partition_coordinator = pc = mock.MagicMock() + hashring = mock.MagicMock() + hashring.belongs_to_self = _sometimes_srv + pc.join_partitioned_group.return_value = hashring + self.srv.run() + self.addCleanup(self.srv.terminate) + + def _sometimes_srv2(item): + maybe["srv2"] += 1 + return (maybe["srv2"] % 2) == 0 + + self.srv2 = notification.NotificationService(0, self.CONF) + self.srv2.partition_coordinator = pc = mock.MagicMock() + hashring = mock.MagicMock() + hashring.belongs_to_self = _sometimes_srv2 + pc.join_partitioned_group.return_value = hashring + self.srv2.run() + self.addCleanup(self.srv2.terminate) notifier = messaging.get_notifier(self.transport, "compute.vagrant-precise") @@ -483,7 +512,7 @@ class TestRealNotificationMultipleAgents(tests_base.BaseTestCase): self.expected_samples = 4 with mock.patch('six.moves.builtins.hash', lambda x: int(x)): start = time.time() - while time.time() - start < 60: + while time.time() - start < 10: if (len(self.publisher.samples + self.publisher2.samples) >= self.expected_samples): break diff --git a/ceilometer/tests/unit/agent/agentbase.py b/ceilometer/tests/unit/agent/agentbase.py index 1939ae1058..c166fce44e 100644 --- a/ceilometer/tests/unit/agent/agentbase.py +++ b/ceilometer/tests/unit/agent/agentbase.py @@ -32,7 +32,6 @@ from ceilometer import pipeline from ceilometer import sample from ceilometer import service from ceilometer.tests import base -from ceilometer import utils class TestSample(sample.Sample): @@ -248,16 +247,20 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def setUp(self): super(BaseAgentManagerTestCase, self).setUp() self.CONF = service.prepare_service([], []) + self.CONF.set_override("backend_url", "zake://", "coordination") self.CONF.set_override( 'cfg_file', 
self.path_get('etc/ceilometer/polling.yaml'), group='polling' ) self.mgr = self.create_manager() self.mgr.extensions = self.create_extension_list() - self.mgr.partition_coordinator = mock.MagicMock() - fake_subset = lambda _, x: x - p_coord = self.mgr.partition_coordinator - p_coord.extract_my_subset.side_effect = fake_subset + + self.hashring = mock.MagicMock() + self.hashring.belongs_to_self = mock.MagicMock() + self.hashring.belongs_to_self.return_value = True + + self.mgr.hashrings = mock.MagicMock() + self.mgr.hashrings.__getitem__.return_value = self.hashring self.mgr.tg = mock.MagicMock() self.polling_cfg = { 'sources': [{ @@ -291,27 +294,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase): @mock.patch('ceilometer.pipeline.setup_polling') def test_start(self, setup_polling): self.mgr.setup_polling_tasks = mock.MagicMock() - mpc = self.mgr.partition_coordinator self.mgr.run() setup_polling.assert_called_once_with(self.CONF) - mpc.start.assert_called_once_with() - self.assertEqual(2, mpc.join_group.call_count) self.mgr.setup_polling_tasks.assert_called_once_with() self.mgr.terminate() - mpc.stop.assert_called_once_with() - - def test_join_partitioning_groups(self): - self.mgr.discoveries = self.create_discoveries() - self.mgr.join_partitioning_groups() - p_coord = self.mgr.partition_coordinator - static_group_ids = [utils.hash_of_set(p['resources']) - for p in self.polling_cfg['sources'] - if p['resources']] - expected = [mock.call(self.mgr.construct_group_id(g)) - for g in ['another_group', 'global'] + static_group_ids] - self.assertEqual(len(expected), len(p_coord.join_group.call_args_list)) - for c in expected: - self.assertIn(c, p_coord.join_group.call_args_list) def test_setup_polling_tasks(self): polling_tasks = self.mgr.setup_polling_tasks() @@ -578,7 +564,6 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_discovery_partitioning(self): self.mgr.discoveries = self.create_discoveries() - p_coord = self.mgr.partition_coordinator self.polling_cfg['sources'][0]['discovery'] = [ 'testdiscovery', 'testdiscoveryanother', 'testdiscoverynonexistent', 'testdiscoveryexception'] @@ -586,17 +571,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) - expected = [mock.call(self.mgr.construct_group_id(d.obj.group_id), - d.obj.resources) - for d in self.mgr.discoveries - if hasattr(d.obj, 'resources')] - self.assertEqual(len(expected), - len(p_coord.extract_my_subset.call_args_list)) - for c in expected: - self.assertIn(c, p_coord.extract_my_subset.call_args_list) + self.mgr.hashrings.__getitem__.assert_called_with( + 'central-compute-another_group') + self.hashring.belongs_to_self.assert_not_called() def test_static_resources_partitioning(self): - p_coord = self.mgr.partition_coordinator static_resources = ['static_1', 'static_2'] static_resources2 = ['static_3', 'static_4'] self.polling_cfg['sources'][0]['resources'] = static_resources @@ -616,17 +595,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.setup_polling() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) - # Only two groups need to be created, one for each polling, - # even though counter test is used twice - expected = [mock.call(self.mgr.construct_group_id( - utils.hash_of_set(resources)), - resources) - for resources in [static_resources, - static_resources2]] - self.assertEqual(len(expected), - len(p_coord.extract_my_subset.call_args_list)) 
- for c in expected: - self.assertIn(c, p_coord.extract_my_subset.call_args_list) + self.hashring.belongs_to_self.assert_has_calls([ + mock.call('static_1'), + mock.call('static_2'), + mock.call('static_3'), + mock.call('static_4'), + ], any_order=True) @mock.patch('ceilometer.agent.manager.LOG') def test_polling_and_notify_with_resources(self, LOG): diff --git a/ceilometer/tests/unit/test_coordination.py b/ceilometer/tests/unit/test_coordination.py deleted file mode 100644 index 5e38b730e8..0000000000 --- a/ceilometer/tests/unit/test_coordination.py +++ /dev/null @@ -1,228 +0,0 @@ -# -# Copyright 2014-2017 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import logging - -import mock -import tooz.coordination -from tooz import hashring - -from ceilometer import coordination -from ceilometer import service -from ceilometer.tests import base - - -class MockToozCoordinator(tooz.coordination.CoordinationDriver): - def __init__(self, member_id, shared_storage): - super(MockToozCoordinator, self).__init__(member_id) - self._member_id = member_id - self._shared_storage = shared_storage - - def create_group(self, group_id): - if group_id in self._shared_storage: - return MockAsyncError( - tooz.coordination.GroupAlreadyExist(group_id)) - self._shared_storage[group_id] = {} - return MockAsyncResult(None) - - def join_group(self, group_id, capabilities=b''): - if group_id not in self._shared_storage: - return MockAsyncError( - tooz.coordination.GroupNotCreated(group_id)) - if self._member_id in self._shared_storage[group_id]: - return MockAsyncError( - tooz.coordination.MemberAlreadyExist(group_id, - self._member_id)) - self._shared_storage[group_id][self._member_id] = { - "capabilities": capabilities, - } - return MockAsyncResult(None) - - def get_members(self, group_id): - if group_id not in self._shared_storage: - return MockAsyncError( - tooz.coordination.GroupNotCreated(group_id)) - return MockAsyncResult(self._shared_storage[group_id]) - - -class MockToozCoordExceptionOnJoinRaiser(MockToozCoordinator): - def __init__(self, member_id, shared_storage, retry_count=None): - super(MockToozCoordExceptionOnJoinRaiser, - self).__init__(member_id, shared_storage) - self.tooz_error_count = retry_count - self.count = 0 - - def join_group_create(self, group_id, capabilities=b''): - if self.count == self.tooz_error_count: - return super( - MockToozCoordExceptionOnJoinRaiser, self).join_group_create( - group_id, capabilities) - else: - self.count += 1 - raise tooz.coordination.ToozError('error') - - -class MockAsyncResult(tooz.coordination.CoordAsyncResult): - def __init__(self, result): - self.result = result - - def get(self, timeout=0): - return self.result - - @staticmethod - def done(): - return True - - -class MockAsyncError(tooz.coordination.CoordAsyncResult): - def __init__(self, error): - self.error = error - - def get(self, timeout=0): - raise self.error - - @staticmethod - def done(): - return True - - -class MockLoggingHandler(logging.Handler): - """Mock logging handler to check for 
expected logs.""" - - def __init__(self, *args, **kwargs): - self.reset() - logging.Handler.__init__(self, *args, **kwargs) - - def emit(self, record): - self.messages[record.levelname.lower()].append(record.getMessage()) - - def reset(self): - self.messages = {'debug': [], - 'info': [], - 'warning': [], - 'error': [], - 'critical': []} - - -class TestPartitioning(base.BaseTestCase): - - def setUp(self): - super(TestPartitioning, self).setUp() - self.CONF = service.prepare_service([], []) - self.str_handler = MockLoggingHandler() - coordination.LOG.logger.addHandler(self.str_handler) - self.shared_storage = {} - - def _get_new_started_coordinator(self, shared_storage, agent_id=None, - coordinator_cls=None, retry_count=None): - coordinator_cls = coordinator_cls or MockToozCoordinator - self.CONF.set_override('backend_url', 'xxx://yyy', - group='coordination') - with mock.patch('tooz.coordination.get_coordinator', - lambda _, member_id: - coordinator_cls(member_id, shared_storage, - retry_count) if retry_count else - coordinator_cls(member_id, shared_storage)): - pc = coordination.PartitionCoordinator(self.CONF, agent_id) - pc.start() - self.addCleanup(pc.stop) - return pc - - def _usage_simulation(self, *agents_kwargs): - partition_coordinators = [] - for kwargs in agents_kwargs: - partition_coordinator = self._get_new_started_coordinator( - self.shared_storage, kwargs['agent_id'], kwargs.get( - 'coordinator_cls')) - partition_coordinator.join_group(kwargs['group_id']) - partition_coordinators.append(partition_coordinator) - - for i, kwargs in enumerate(agents_kwargs): - all_resources = kwargs.get('all_resources', []) - expected_resources = kwargs.get('expected_resources', []) - actual_resources = partition_coordinators[i].extract_my_subset( - kwargs['group_id'], all_resources) - self.assertEqual(expected_resources, actual_resources) - - def test_single_group(self): - agents = [dict(agent_id='agent1', group_id='group'), - dict(agent_id='agent2', group_id='group')] - self._usage_simulation(*agents) - - self.assertEqual(['group'], sorted(self.shared_storage.keys())) - self.assertEqual(['agent1', 'agent2'], - sorted(self.shared_storage['group'].keys())) - - def test_multiple_groups(self): - agents = [dict(agent_id='agent1', group_id='group1'), - dict(agent_id='agent2', group_id='group2')] - self._usage_simulation(*agents) - - self.assertEqual(['group1', 'group2'], - sorted(self.shared_storage.keys())) - - def test_partitioning(self): - all_resources = ['resource_%s' % i for i in range(1000)] - agents = ['agent_%s' % i for i in range(10)] - - expected_resources = [list() for _ in range(len(agents))] - hr = hashring.HashRing(agents, partitions=100) - for r in all_resources: - encode = coordination.PartitionCoordinator.encode_task - key = agents.index(list(hr.get_nodes(encode(r)))[0]) - expected_resources[key].append(r) - - agents_kwargs = [] - for i, agent in enumerate(agents): - agents_kwargs.append(dict(agent_id=agent, - group_id='group', - all_resources=all_resources, - expected_resources=expected_resources[i])) - self._usage_simulation(*agents_kwargs) - - def test_coordination_backend_connection_fail_on_join(self): - coord = self._get_new_started_coordinator( - {'group': {}}, 'agent1', MockToozCoordExceptionOnJoinRaiser, - retry_count=2) - with mock.patch('tooz.coordination.get_coordinator', - return_value=MockToozCoordExceptionOnJoinRaiser): - coord.join_group(group_id='group') - - expected_errors = ['Error joining partitioning group group,' - ' re-trying', - 'Error joining partitioning 
group group,'
-                           ' re-trying']
-        self.assertEqual(expected_errors, self.str_handler.messages['error'])
-
-    def test_partitioning_with_unicode(self):
-        all_resources = [u'\u0634\u0628\u06a9\u0647',
-                         u'\u0627\u0647\u0644',
-                         u'\u0645\u062d\u0628\u0627\u0646']
-        agents = ['agent_%s' % i for i in range(2)]
-
-        expected_resources = [list() for _ in range(len(agents))]
-        hr = hashring.HashRing(agents, partitions=100)
-        for r in all_resources:
-            encode = coordination.PartitionCoordinator.encode_task
-            key = agents.index(list(hr.get_nodes(encode(r)))[0])
-            expected_resources[key].append(r)
-
-        agents_kwargs = []
-        for i, agent in enumerate(agents):
-            agents_kwargs.append(dict(agent_id=agent,
-                                      group_id='group',
-                                      all_resources=all_resources,
-                                      expected_resources=expected_resources[i]))
-        self._usage_simulation(*agents_kwargs)
diff --git a/releasenotes/notes/tooz-coordination-system-d1054b9d1a5ddf32.yaml b/releasenotes/notes/tooz-coordination-system-d1054b9d1a5ddf32.yaml
new file mode 100644
index 0000000000..99638e15fd
--- /dev/null
+++ b/releasenotes/notes/tooz-coordination-system-d1054b9d1a5ddf32.yaml
@@ -0,0 +1,6 @@
+---
+upgrade:
+  - |
+    Ceilometer now relies on the workload partitioning mechanism provided
+    by the tooz library. As a consequence, the `coordination.retry_backoff`
+    and `coordination.max_retry_interval` options no longer exist.
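
For reviewers unfamiliar with the tooz API this patch adopts: a coordinator's
join_partitioned_group() returns a Partitioner whose hash ring tracks group
membership, and its belongs_to_self() test is what replaces
PartitionCoordinator.extract_my_subset() throughout the diff above. Below is
a minimal sketch of that pattern, using the in-memory zake:// driver that the
updated unit tests configure; the group name, member id, and resource list
are illustrative only and do not appear in the patch:

    from tooz import coordination

    # Same call the agent manager now makes; tooz expects a bytes member id,
    # hence the str(uuid.uuid4()).encode('ascii') dance above.
    coordinator = coordination.get_coordinator('zake://', b'agent-1')
    coordinator.start(start_heart=True)

    # Returns a tooz Partitioner backed by a hash ring that is updated
    # through the group's join/leave watchers.
    partitioner = coordinator.join_partitioned_group('ceilometer.polling')

    # belongs_to_self() hashes each object onto the ring and returns True
    # only for the objects owned by this member; with a single member it
    # keeps everything.
    resources = ['static_1', 'static_2', 'static_3']
    mine = list(filter(partitioner.belongs_to_self, resources))

    # The ring only notices members joining or leaving when the watchers
    # run; the notification service above drives this from a periodic task
    # spaced by the check_watchers option.
    coordinator.run_watchers()

    coordinator.leave_partitioned_group(partitioner)
    coordinator.stop()

The retry loop that the deleted PartitionCoordinator.join_group() built with
tenacity now lives inside tooz, which is why the retry_backoff and
max_retry_interval options are removed without replacement.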