Merge "retain existing listeners on refresh"

Authored by Jenkins on 2015-09-21 13:43:00 +00:00; committed by Gerrit Code Review
commit 20624895f1
2 changed files with 59 additions and 25 deletions

View File

@@ -12,6 +12,7 @@
 # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 # License for the specific language governing permissions and limitations
 # under the License.
+import itertools
 
 from oslo_config import cfg
 from oslo_context import context
@@ -233,14 +234,9 @@ class NotificationService(service_base.BaseService):
             self.listeners.append(listener)
 
     def _refresh_agent(self, event):
-        self._refresh_listeners()
+        self._configure_pipeline_listeners(True)
 
-    def _refresh_listeners(self):
-        utils.kill_listeners(self.pipeline_listeners)
-        self._configure_pipeline_listeners()
-
-    def _configure_pipeline_listeners(self):
-        self.pipeline_listeners = []
+    def _configure_pipeline_listeners(self, reuse_listeners=False):
         ev_pipes = []
         if cfg.CONF.notification.store_events:
             ev_pipes = self.event_pipeline_manager.pipelines
@@ -249,21 +245,39 @@
         partitioned = self.partition_coordinator.extract_my_subset(
             self.group_id,
             range(cfg.CONF.notification.pipeline_processing_queues))
-        for pipe_set in partitioned:
-            for pipe in pipelines:
-                LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
-                          pipe_set)
-                pipe_endpoint = (pipeline.EventPipelineEndpoint
-                                 if isinstance(pipe, pipeline.EventPipeline)
-                                 else pipeline.SamplePipelineEndpoint)
-                listener = messaging.get_notification_listener(
-                    transport,
-                    [oslo_messaging.Target(
-                        topic='%s-%s-%s' % (self.NOTIFICATION_IPC,
-                                            pipe.name, pipe_set))],
-                    [pipe_endpoint(self.ctxt, pipe)])
-                listener.start()
-                self.pipeline_listeners.append(listener)
+
+        queue_set = {}
+        for pipe_set, pipe in itertools.product(partitioned, pipelines):
+            queue_set['%s-%s-%s' %
+                      (self.NOTIFICATION_IPC, pipe.name, pipe_set)] = pipe
+
+        if reuse_listeners:
+            topics = queue_set.keys()
+            kill_list = []
+            for listener in self.pipeline_listeners:
+                if listener.dispatcher.targets[0].topic in topics:
+                    queue_set.pop(listener.dispatcher.targets[0].topic)
+                else:
+                    kill_list.append(listener)
+            for listener in kill_list:
+                utils.kill_listeners([listener])
+                self.pipeline_listeners.remove(listener)
+        else:
+            utils.kill_listeners(self.pipeline_listeners)
+            self.pipeline_listeners = []
+
+        for topic, pipe in queue_set.items():
+            LOG.debug('Pipeline endpoint: %s from set: %s', pipe.name,
+                      pipe_set)
+            pipe_endpoint = (pipeline.EventPipelineEndpoint
+                             if isinstance(pipe, pipeline.EventPipeline)
+                             else pipeline.SamplePipelineEndpoint)
+            listener = messaging.get_notification_listener(
+                transport,
+                [oslo_messaging.Target(topic=topic)],
+                [pipe_endpoint(self.ctxt, pipe)])
+            listener.start()
+            self.pipeline_listeners.append(listener)
 
     def stop(self):
         if self.partition_coordinator:
@@ -290,4 +304,4 @@ class NotificationService(service_base.BaseService):
         # re-start the pipeline listeners if workload partitioning
         # is enabled.
         if cfg.CONF.notification.workload_partitioning:
-            self._refresh_listeners()
+            self._configure_pipeline_listeners()
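
In short, _configure_pipeline_listeners() now computes the full set of queue topics this agent should consume (one per partition/pipeline pair, via itertools.product), and on refresh it keeps any existing listener whose topic is still in that set, stops only the stale ones, and starts listeners only for the newly assigned topics. Below is a minimal, self-contained sketch of that reconcile step; the Listener class, build_queue_set() and refresh() helpers are illustrative stand-ins, not the ceilometer or oslo.messaging API:

import itertools

NOTIFICATION_IPC = 'ceilometer-pipe'  # illustrative prefix, not the real constant


class Listener(object):
    """Stand-in for an oslo.messaging notification listener."""

    def __init__(self, topic):
        self.topic = topic
        self.running = True

    def stop(self):
        self.running = False


def build_queue_set(partitions, pipelines):
    # One IPC queue topic per (partition, pipeline) pair, mirroring the
    # itertools.product loop added by the patch.
    return {'%s-%s-%s' % (NOTIFICATION_IPC, pipe, part): pipe
            for part, pipe in itertools.product(partitions, pipelines)}


def refresh(listeners, partitions, pipelines, reuse_listeners=True):
    queue_set = build_queue_set(partitions, pipelines)
    if reuse_listeners:
        # Keep listeners whose topic is still assigned; stop only stale ones.
        for listener in list(listeners):
            if listener.topic in queue_set:
                queue_set.pop(listener.topic)
            else:
                listener.stop()
                listeners.remove(listener)
    else:
        # Old behaviour: tear everything down and rebuild from scratch.
        for listener in listeners:
            listener.stop()
        del listeners[:]
    # Anything left in queue_set has no listener yet, so start one for it.
    for topic in queue_set:
        listeners.append(Listener(topic))
    return listeners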

View File

@@ -455,9 +455,29 @@ class TestRealNotificationHA(BaseRealNotification):
 
     def test_reset_listeners_on_refresh(self):
         self.srv.start()
-        self.assertEqual(20, len(self.srv.pipeline_listeners))
-        self.srv._refresh_listeners()
+        listeners = self.srv.pipeline_listeners
+        self.assertEqual(20, len(listeners))
+        self.srv._configure_pipeline_listeners()
         self.assertEqual(20, len(self.srv.pipeline_listeners))
+        for listener in listeners:
+            self.assertNotIn(listeners, set(self.srv.pipeline_listeners))
+        self.srv.stop()
+
+    def test_retain_common_listeners_on_refresh(self):
+        with mock.patch('ceilometer.coordination.PartitionCoordinator'
+                        '.extract_my_subset', return_value=[1, 2]):
+            self.srv.start()
+        self.assertEqual(4, len(self.srv.pipeline_listeners))
+        listeners = [listener for listener in self.srv.pipeline_listeners]
+        with mock.patch('ceilometer.coordination.PartitionCoordinator'
+                        '.extract_my_subset', return_value=[1, 3]):
+            self.srv._refresh_agent(None)
+        self.assertEqual(4, len(self.srv.pipeline_listeners))
+        for listener in listeners:
+            if listener.dispatcher.targets[0].topic.endswith('1'):
+                self.assertIn(listener, set(self.srv.pipeline_listeners))
+            else:
+                self.assertNotIn(listener, set(self.srv.pipeline_listeners))
         self.srv.stop()
 
     @mock.patch('oslo_messaging.Notifier.sample')
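
The new test_retain_common_listeners_on_refresh above asserts this behaviour through the real service: when the assigned partition set changes from [1, 2] to [1, 3], the partition-1 listeners survive the refresh as the same objects, while partition-2 listeners are torn down and partition-3 listeners are created. Against the hypothetical refresh() sketch shown after the first file, that scenario looks roughly like:

pipelines = ['cpu', 'disk']

listeners = refresh([], [1, 2], pipelines)   # 2 pipelines x 2 partitions = 4
originals = list(listeners)

refresh(listeners, [1, 3], pipelines)        # partition assignment changes
assert len(listeners) == 4
for listener in originals:
    if listener.topic.endswith('-1'):
        assert listener in listeners         # retained, same object
    else:
        assert listener not in listeners     # stopped and replaced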