diff --git a/ceilometer/coordination.py b/ceilometer/coordination.py index 50c9b202b9..0a31fe4f5e 100644 --- a/ceilometer/coordination.py +++ b/ceilometer/coordination.py @@ -167,7 +167,7 @@ class PartitionCoordinator(object): self.join_group(group_id) try: members = self._get_members(group_id) - LOG.debug('Members of group: %s', members) + LOG.debug('Members of group: %s, Me: %s', members, self._my_id) hr = utils.HashRing(members) filtered = [v for v in iterable if hr.get_node(str(v)) == self._my_id] diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 5de404ff48..41dba88094 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. import itertools +import threading from oslo_config import cfg from oslo_context import context @@ -99,6 +100,7 @@ class NotificationService(service_base.BaseService): super(NotificationService, self).__init__(*args, **kwargs) self.partition_coordinator = None self.listeners, self.pipeline_listeners = [], [] + self.coord_lock = threading.Lock() self.group_id = None @classmethod @@ -162,7 +164,6 @@ class NotificationService(service_base.BaseService): self.group_id = self.NOTIFICATION_NAMESPACE self.partition_coordinator = coordination.PartitionCoordinator() self.partition_coordinator.start() - self.partition_coordinator.join_group(self.group_id) else: # FIXME(sileht): endpoint uses the notification_topics option # and it should not because this is an oslo_messaging option @@ -182,14 +183,16 @@ class NotificationService(service_base.BaseService): self.event_pipe_manager) if cfg.CONF.notification.workload_partitioning: - self._configure_pipeline_listeners() + # join group after all manager set up is configured + self.partition_coordinator.join_group(self.group_id) self.partition_coordinator.watch_group(self.group_id, self._refresh_agent) - self.tg.add_timer(cfg.CONF.coordination.heartbeat, 
self.partition_coordinator.heartbeat) self.tg.add_timer(cfg.CONF.coordination.check_watchers, self.partition_coordinator.run_watchers) + # configure pipelines after all coordination is configured. + self._configure_pipeline_listeners() if not cfg.CONF.notification.disable_non_metric_meters: LOG.warning(_LW('Non-metric meters may be collected. It is highly ' @@ -247,49 +250,50 @@ class NotificationService(service_base.BaseService): self._configure_pipeline_listeners(True) def _configure_pipeline_listeners(self, reuse_listeners=False): - ev_pipes = [] - if cfg.CONF.notification.store_events: - ev_pipes = self.event_pipeline_manager.pipelines - pipelines = self.pipeline_manager.pipelines + ev_pipes - transport = messaging.get_transport() - partitioned = self.partition_coordinator.extract_my_subset( - self.group_id, - range(cfg.CONF.notification.pipeline_processing_queues)) + with self.coord_lock: + ev_pipes = [] + if cfg.CONF.notification.store_events: + ev_pipes = self.event_pipeline_manager.pipelines + pipelines = self.pipeline_manager.pipelines + ev_pipes + transport = messaging.get_transport() + partitioned = self.partition_coordinator.extract_my_subset( + self.group_id, + range(cfg.CONF.notification.pipeline_processing_queues)) - queue_set = {} - for pipe_set, pipe in itertools.product(partitioned, pipelines): - queue_set['%s-%s-%s' % - (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe + queue_set = {} + for pipe_set, pipe in itertools.product(partitioned, pipelines): + queue_set['%s-%s-%s' % + (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe - if reuse_listeners: - topics = queue_set.keys() - kill_list = [] - for listener in self.pipeline_listeners: - if listener.dispatcher.targets[0].topic in topics: - queue_set.pop(listener.dispatcher.targets[0].topic) - else: - kill_list.append(listener) - for listener in kill_list: - utils.kill_listeners([listener]) - self.pipeline_listeners.remove(listener) - else: - utils.kill_listeners(self.pipeline_listeners) - 
self.pipeline_listeners = [] + if reuse_listeners: + topics = queue_set.keys() + kill_list = [] + for listener in self.pipeline_listeners: + if listener.dispatcher.targets[0].topic in topics: + queue_set.pop(listener.dispatcher.targets[0].topic) + else: + kill_list.append(listener) + for listener in kill_list: + utils.kill_listeners([listener]) + self.pipeline_listeners.remove(listener) + else: + utils.kill_listeners(self.pipeline_listeners) + self.pipeline_listeners = [] - for topic, pipe in queue_set.items(): - LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, - pipe_set) - pipe_endpoint = (pipeline.EventPipelineEndpoint - if isinstance(pipe, pipeline.EventPipeline) - else pipeline.SamplePipelineEndpoint) - listener = messaging.get_batch_notification_listener( - transport, - [oslo_messaging.Target(topic=topic)], - [pipe_endpoint(self.ctxt, pipe)], - batch_size=cfg.CONF.notification.batch_size, - batch_timeout=cfg.CONF.notification.batch_timeout) - listener.start() - self.pipeline_listeners.append(listener) + for topic, pipe in queue_set.items(): + LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name, + pipe_set) + pipe_endpoint = (pipeline.EventPipelineEndpoint + if isinstance(pipe, pipeline.EventPipeline) + else pipeline.SamplePipelineEndpoint) + listener = messaging.get_batch_notification_listener( + transport, + [oslo_messaging.Target(topic=topic)], + [pipe_endpoint(self.ctxt, pipe)], + batch_size=cfg.CONF.notification.batch_size, + batch_timeout=cfg.CONF.notification.batch_timeout) + listener.start() + self.pipeline_listeners.append(listener) def stop(self): if self.partition_coordinator: diff --git a/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml b/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml new file mode 100644 index 0000000000..45794a74b5 --- /dev/null +++ b/releasenotes/notes/fix-agent-coordination-a7103a78fecaec24.yaml @@ -0,0 +1,9 @@ +--- +critical: + - > + [`bug 1533787 `_] + Fix an issue where agents 
are not properly registered to the group + when multiple notification agents are deployed. This can result in + incorrect transformations because the agents are not coordinated. It is still + recommended to set heartbeat_timeout_threshold = 0 in the + [oslo_messaging_rabbit] section when deploying multiple agents.