From 000c5d89a38997edc8332c24cdb737a3d0eb9410 Mon Sep 17 00:00:00 2001 From: gord chung Date: Wed, 4 Oct 2017 19:16:18 +0000 Subject: [PATCH] common notification endpoint make samples and events use a common endpoint class Change-Id: I1d15783721f91ee90adfbac88cef2a44e0b23868 --- ceilometer/agent/plugin_base.py | 96 ----------------- ceilometer/ipmi/notifications/ironic.py | 4 +- ceilometer/meter/notifications.py | 10 +- ceilometer/middleware.py | 4 +- ceilometer/notification.py | 20 +--- .../{pipeline.py => pipeline/__init__.py} | 102 +++++++++++++++++- .../{event/endpoint.py => pipeline/event.py} | 19 ++-- ceilometer/pipeline/sample.py | 46 ++++++++ ceilometer/telemetry/notifications.py | 24 +---- ceilometer/tests/unit/event/test_endpoint.py | 9 +- ceilometer/tests/unit/test_notification.py | 6 +- doc/source/contributor/plugins.rst | 8 +- 12 files changed, 182 insertions(+), 166 deletions(-) rename ceilometer/{pipeline.py => pipeline/__init__.py} (90%) rename ceilometer/{event/endpoint.py => pipeline/event.py} (82%) create mode 100644 ceilometer/pipeline/sample.py diff --git a/ceilometer/agent/plugin_base.py b/ceilometer/agent/plugin_base.py index a7eb5fb3f1..38f33ede02 100644 --- a/ceilometer/agent/plugin_base.py +++ b/ceilometer/agent/plugin_base.py @@ -17,110 +17,14 @@ import abc -from oslo_log import log -import oslo_messaging import six from stevedore import extension -LOG = log.getLogger(__name__) - class PluginBase(object): """Base class for all plugins.""" -@six.add_metaclass(abc.ABCMeta) -class NotificationBase(PluginBase): - """Base class for plugins that support the notification API.""" - def __init__(self, manager): - super(NotificationBase, self).__init__() - # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch - # messages to an endpoint. - if self.event_types: - self.filter_rule = oslo_messaging.NotificationFilter( - event_type='|'.join(self.event_types)) - self.manager = manager - - @staticmethod - def get_notification_topics(conf): - if 'notification_topics' in conf: - return conf.notification_topics - return conf.oslo_messaging_notifications.topics - - @abc.abstractproperty - def event_types(self): - """Return a sequence of strings. - - Strings are defining the event types to be given to this plugin. - """ - - @abc.abstractmethod - def get_targets(self, conf): - """Return a sequence of oslo.messaging.Target. - - Sequence is defining the exchange and topics to be connected for this - plugin. - :param conf: Configuration. - """ - - @abc.abstractmethod - def process_notification(self, message): - """Return a sequence of Counter instances for the given message. - - :param message: Message to process. - """ - - @staticmethod - def _consume_and_drop(notifications): - """RPC endpoint for useless notification level""" - # NOTE(sileht): nothing special todo here, but because we listen - # for the generic notification exchange we have to consume all its - # queues - - audit = _consume_and_drop - debug = _consume_and_drop - warn = _consume_and_drop - error = _consume_and_drop - critical = _consume_and_drop - - def info(self, notifications): - """RPC endpoint for notification messages at info level - - When another service sends a notification over the message - bus, this method receives it. - - :param notifications: list of notifications - """ - self._process_notifications('info', notifications) - - def sample(self, notifications): - """RPC endpoint for notification messages at sample level - - When another service sends a notification over the message - bus at sample priority, this method receives it. - - :param notifications: list of notifications - """ - self._process_notifications('sample', notifications) - - def _process_notifications(self, priority, notifications): - for notification in notifications: - try: - self.to_samples_and_publish(notification) - except Exception: - LOG.error('Fail to process notification', exc_info=True) - - def to_samples_and_publish(self, notification): - """Return samples produced by *process_notification*. - - Samples produced for the given notification. - :param context: Execution context from the service or RPC call - :param notification: The notification to process. - """ - with self.manager.publisher() as p: - p(list(self.process_notification(notification))) - - class ExtensionLoadError(Exception): """Error of loading pollster plugin. diff --git a/ceilometer/ipmi/notifications/ironic.py b/ceilometer/ipmi/notifications/ironic.py index e15ffd56e8..c93710fb8a 100644 --- a/ceilometer/ipmi/notifications/ironic.py +++ b/ceilometer/ipmi/notifications/ironic.py @@ -18,7 +18,7 @@ notification events. from oslo_log import log -from ceilometer import notification +from ceilometer.pipeline import sample as endpoint from ceilometer import sample LOG = log.getLogger(__name__) @@ -54,7 +54,7 @@ class InvalidSensorData(ValueError): pass -class SensorNotification(notification.NotificationProcessBase): +class SensorNotification(endpoint.SampleEndpoint): """A generic class for extracting samples from sensor data notifications. A notification message can contain multiple samples from multiple diff --git a/ceilometer/meter/notifications.py b/ceilometer/meter/notifications.py index d55c148351..2f4c30fad7 100644 --- a/ceilometer/meter/notifications.py +++ b/ceilometer/meter/notifications.py @@ -24,7 +24,7 @@ from stevedore import extension from ceilometer import declarative from ceilometer.i18n import _ -from ceilometer import notification +from ceilometer.pipeline import sample as endpoint from ceilometer import sample as sample_util OPTS = [ @@ -182,7 +182,7 @@ class MeterDefinition(object): yield sample -class ProcessMeterNotifications(notification.NotificationProcessBase): +class ProcessMeterNotifications(endpoint.SampleEndpoint): event_types = [] @@ -224,8 +224,8 @@ class ProcessMeterNotifications(notification.NotificationProcessBase): definitions[meter_cfg['name']] = md return definitions.values() - def process_notification(self, notification_body): + def process_notification(self, notification): for d in self.definitions: - if d.match_type(notification_body['event_type']): - for s in d.to_samples(notification_body): + if d.match_type(notification['event_type']): + for s in d.to_samples(notification): yield sample_util.Sample.from_notification(**s) diff --git a/ceilometer/middleware.py b/ceilometer/middleware.py index 0f2380197b..92cc1e1e34 100644 --- a/ceilometer/middleware.py +++ b/ceilometer/middleware.py @@ -13,11 +13,11 @@ # License for the specific language governing permissions and limitations # under the License. -from ceilometer import notification +from ceilometer.pipeline import sample as endpoint from ceilometer import sample -class HTTPRequest(notification.NotificationProcessBase): +class HTTPRequest(endpoint.SampleEndpoint): event_types = ['http.request'] def process_notification(self, message): diff --git a/ceilometer/notification.py b/ceilometer/notification.py index 89b4fe43da..e12cd749da 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -13,13 +13,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - import itertools import threading import time import uuid -from ceilometer.agent import plugin_base from concurrent import futures import cotyledon from futurist import periodics @@ -29,10 +27,10 @@ import oslo_messaging from stevedore import extension from tooz import coordination -from ceilometer.event import endpoint as event_endpoint from ceilometer.i18n import _ from ceilometer import messaging from ceilometer import pipeline +from ceilometer.pipeline import event as event_endpoint from ceilometer import utils @@ -236,7 +234,7 @@ class NotificationService(cotyledon.Service): endpoints = [] endpoints.append( - event_endpoint.EventsNotificationEndpoint(event_pipe_manager)) + event_endpoint.EventEndpoint(event_pipe_manager)) targets = [] for ext in notification_manager: @@ -326,17 +324,3 @@ class NotificationService(cotyledon.Service): utils.kill_listeners(self.listeners) super(NotificationService, self).terminate() - - -class NotificationProcessBase(plugin_base.NotificationBase): - - def get_targets(self, conf): - """Return a sequence of oslo_messaging.Target - - This sequence is defining the exchange and topics to be connected for - this plugin. - """ - return [oslo_messaging.Target(topic=topic, exchange=exchange) - for topic in self.get_notification_topics(conf) - for exchange in - conf.notification.notification_control_exchanges] diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline/__init__.py similarity index 90% rename from ceilometer/pipeline.py rename to ceilometer/pipeline/__init__.py index c2b3f92b7a..e54328c548 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline/__init__.py @@ -651,7 +651,7 @@ class ConfigManagerBase(object): self.conf = conf self.cfg_loc = None - def load_config(self, cfg_file, fallback_cfg_prefix='pipeline/data/'): + def load_config(self, cfg_file, fallback_cfg_prefix='data/'): """Load a configuration file and set its refresh values.""" if os.path.exists(cfg_file): self.cfg_loc = cfg_file @@ -906,3 +906,103 @@ def get_pipeline_grouping_key(pipe): for transformer in pipe.sink.transformers: keys += transformer.grouping_keys return list(set(keys)) + + +class NotificationEndpoint(object): + """Base Endpoint for plugins that support the notification API.""" + + def __init__(self, manager): + super(NotificationEndpoint, self).__init__() + # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch + # messages to an endpoint. + if self.event_types: + self.filter_rule = oslo_messaging.NotificationFilter( + event_type='|'.join(self.event_types)) + self.manager = manager + + @staticmethod + def get_notification_topics(conf): + if 'notification_topics' in conf: + return conf.notification_topics + return conf.oslo_messaging_notifications.topics + + def get_targets(self, conf): + """Return a sequence of oslo_messaging.Target + + This sequence is defining the exchange and topics to be connected for + this plugin. + """ + return [oslo_messaging.Target(topic=topic, exchange=exchange) + for topic in self.get_notification_topics(conf) + for exchange in + conf.notification.notification_control_exchanges] + + @abc.abstractproperty + def event_types(self): + """Return a sequence of strings to filter on. + + Strings are defining the event types to be given to this plugin. + """ + + @abc.abstractmethod + def process_notifications(self, priority, notifications): + """Return a sequence of Counter instances for the given message. + + :param message: Message to process. + """ + + @staticmethod + def _consume_and_drop(notifications): + """RPC endpoint for useless notification level""" + # NOTE(sileht): nothing special todo here, but because we listen + # for the generic notification exchange we have to consume all its + # queues + + def audit(self, notifications): + """endpoint for notification messages at audit level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def critical(self, notifications): + """endpoint for notification messages at critical level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def debug(self, notifications): + """endpoint for notification messages at debug level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def error(self, notifications): + """endpoint for notification messages at error level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def info(self, notifications): + """endpoint for notification messages at info level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def sample(self, notifications): + """endpoint for notification messages at sample level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) + + def warn(self, notifications): + """endpoint for notification messages at warn level + + :param notifications: list of notifications + """ + self._consume_and_drop(notifications) diff --git a/ceilometer/event/endpoint.py b/ceilometer/pipeline/event.py similarity index 82% rename from ceilometer/event/endpoint.py rename to ceilometer/pipeline/event.py index 8aa9cf28f5..6f1423471b 100644 --- a/ceilometer/event/endpoint.py +++ b/ceilometer/pipeline/event.py @@ -17,38 +17,41 @@ import oslo_messaging from stevedore import extension from ceilometer.event import converter +from ceilometer import pipeline LOG = log.getLogger(__name__) -class EventsNotificationEndpoint(object): +class EventEndpoint(pipeline.NotificationEndpoint): + + event_types = [] + def __init__(self, manager): - super(EventsNotificationEndpoint, self).__init__() + super(EventEndpoint, self).__init__(manager) LOG.debug('Loading event definitions') self.event_converter = converter.setup_events( manager.conf, extension.ExtensionManager( namespace='ceilometer.event.trait_plugin')) - self.manager = manager def info(self, notifications): """Convert message at info level to Ceilometer Event. :param notifications: list of notifications """ - return self.process_notification('info', notifications) + return self.process_notifications('info', notifications) def error(self, notifications): """Convert message at error level to Ceilometer Event. :param notifications: list of notifications """ - return self.process_notification('error', notifications) + return self.process_notifications('error', notifications) - def process_notification(self, priority, notifications): - for notification in notifications: + def process_notifications(self, priority, notifications): + for message in notifications: try: - event = self.event_converter.to_event(priority, notification) + event = self.event_converter.to_event(priority, message) if event is not None: with self.manager.publisher() as p: p(event) diff --git a/ceilometer/pipeline/sample.py b/ceilometer/pipeline/sample.py new file mode 100644 index 0000000000..64be1cd2f4 --- /dev/null +++ b/ceilometer/pipeline/sample.py @@ -0,0 +1,46 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from oslo_log import log + +from ceilometer import pipeline + +LOG = log.getLogger(__name__) + + +class SampleEndpoint(pipeline.NotificationEndpoint): + + def info(self, notifications): + """Convert message at info level to Ceilometer sample. + + :param notifications: list of notifications + """ + return self.process_notifications('info', notifications) + + def sample(self, notifications): + """Convert message at sample level to Ceilometer Event. + + :param notifications: list of notifications + """ + return self.process_notifications('sample', notifications) + + def process_notifications(self, priority, notifications): + for message in notifications: + try: + with self.manager.publisher() as p: + p(list(self.process_notification(message))) + except Exception: + LOG.error('Fail to process notification', exc_info=True) + + def process_notification(notification): + """Build sample from provided notification.""" + pass diff --git a/ceilometer/telemetry/notifications.py b/ceilometer/telemetry/notifications.py index 77983f3bf7..5868480f07 100644 --- a/ceilometer/telemetry/notifications.py +++ b/ceilometer/telemetry/notifications.py @@ -10,31 +10,11 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - -import oslo_messaging - -from ceilometer.agent import plugin_base +from ceilometer.pipeline import sample as endpoint from ceilometer import sample -class TelemetryBase(plugin_base.NotificationBase): - """Convert telemetry notification into Samples.""" - - def get_targets(self, conf): - """Return a sequence of oslo_messaging.Target - - Sequence defining the exchange and topics to be connected for this - plugin. - """ - return [ - oslo_messaging.Target( - topic=topic, - exchange=conf.notification.notification_control_exchanges[0]) - for topic in self.get_notification_topics(conf) - ] - - -class TelemetryIpc(TelemetryBase): +class TelemetryIpc(endpoint.SampleEndpoint): """Handle sample from notification bus Telemetry samples polled by polling agent. diff --git a/ceilometer/tests/unit/event/test_endpoint.py b/ceilometer/tests/unit/event/test_endpoint.py index cedc47a5a8..1982685803 100644 --- a/ceilometer/tests/unit/event/test_endpoint.py +++ b/ceilometer/tests/unit/event/test_endpoint.py @@ -21,8 +21,8 @@ from oslo_utils import fileutils import six import yaml -from ceilometer.event import endpoint as event_endpoint from ceilometer import pipeline +from ceilometer.pipeline import event as event_endpoint from ceilometer import publisher from ceilometer.publisher import test from ceilometer import service @@ -114,8 +114,7 @@ class TestEventEndpoint(tests_base.BaseTestCase): def _setup_endpoint(self, publishers): ev_pipeline_mgr = self._setup_pipeline(publishers) - self.endpoint = event_endpoint.EventsNotificationEndpoint( - ev_pipeline_mgr) + self.endpoint = event_endpoint.EventEndpoint(ev_pipeline_mgr) self.endpoint.event_converter = mock.MagicMock() self.endpoint.event_converter.to_event.return_value = mock.MagicMock( @@ -167,7 +166,7 @@ class TestEventEndpoint(tests_base.BaseTestCase): 'ctxt': {} } with mock.patch("ceilometer.pipeline.LOG") as mock_logger: - ret = self.endpoint.process_notification('info', [message]) + ret = self.endpoint.process_notifications('info', [message]) self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret) exception_mock = mock_logger.error self.assertIn('Exit after error from publisher', @@ -187,7 +186,7 @@ class TestEventEndpoint(tests_base.BaseTestCase): 'ctxt': {} } with mock.patch("ceilometer.pipeline.LOG") as mock_logger: - ret = self.endpoint.process_notification('info', [message]) + ret = self.endpoint.process_notifications('info', [message]) self.assertEqual(oslo_messaging.NotificationResult.HANDLED, ret) exception_mock = mock_logger.error self.assertIn('Continue after error from publisher', diff --git a/ceilometer/tests/unit/test_notification.py b/ceilometer/tests/unit/test_notification.py index bc6aa35d15..b51ac1a413 100644 --- a/ceilometer/tests/unit/test_notification.py +++ b/ceilometer/tests/unit/test_notification.py @@ -23,9 +23,9 @@ import six from stevedore import extension import yaml -from ceilometer.agent import plugin_base from ceilometer import messaging from ceilometer import notification +from ceilometer import pipeline from ceilometer.publisher import test as test_publisher from ceilometer import service from ceilometer.tests import base as tests_base @@ -82,7 +82,7 @@ TEST_NOTICE_PAYLOAD = { } -class _FakeNotificationPlugin(plugin_base.NotificationBase): +class _FakeNotificationPlugin(pipeline.NotificationEndpoint): event_types = ['fake.event'] def get_targets(self, conf): @@ -117,7 +117,7 @@ class TestNotification(tests_base.BaseTestCase): ] ) - @mock.patch('ceilometer.event.endpoint.EventsNotificationEndpoint') + @mock.patch('ceilometer.pipeline.event.EventEndpoint') def _do_process_notification_manager_start(self, fake_event_endpoint_class): with mock.patch.object(self.srv, diff --git a/doc/source/contributor/plugins.rst b/doc/source/contributor/plugins.rst index b20d4d70f8..c485fbd884 100644 --- a/doc/source/contributor/plugins.rst +++ b/doc/source/contributor/plugins.rst @@ -113,16 +113,16 @@ Notifications suffice. Notifications are defined as subclass of the -:class:`ceilometer.agent.plugin_base.NotificationBase` meta class. +:class:`ceilometer.notification.NotificationEndpoint` meta class. Notifications must implement: ``event_types`` A sequence of strings defining the event types to be given to the plugin -``process_notification(self, message)`` +``process_notifications(self, priority, message)`` Receives an event message from the list provided to ``event_types`` and - returns a sequence of ``Sample`` objects as defined in the - :file:`ceilometer/sample.py` file. + returns a sequence of objects. Using the SampleEndpoint, it should yield + ``Sample`` objects as defined in the :file:`ceilometer/sample.py` file. In the ``InstanceNotifications`` plugin, it listens to three events: