common notification endpoint

make samples and events use a common endpoint class

Change-Id: I1d15783721f91ee90adfbac88cef2a44e0b23868
This commit is contained in:
gord chung 2017-10-04 19:16:18 +00:00
parent 98444a2515
commit 000c5d89a3
12 changed files with 182 additions and 166 deletions

View File

@ -17,110 +17,14 @@
import abc
from oslo_log import log
import oslo_messaging
import six
from stevedore import extension
LOG = log.getLogger(__name__)
class PluginBase(object):
"""Base class for all plugins."""
@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
"""Base class for plugins that support the notification API."""
def __init__(self, manager):
super(NotificationBase, self).__init__()
# NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
# messages to an endpoint.
if self.event_types:
self.filter_rule = oslo_messaging.NotificationFilter(
event_type='|'.join(self.event_types))
self.manager = manager
@staticmethod
def get_notification_topics(conf):
if 'notification_topics' in conf:
return conf.notification_topics
return conf.oslo_messaging_notifications.topics
@abc.abstractproperty
def event_types(self):
"""Return a sequence of strings.
Strings are defining the event types to be given to this plugin.
"""
@abc.abstractmethod
def get_targets(self, conf):
"""Return a sequence of oslo.messaging.Target.
Sequence is defining the exchange and topics to be connected for this
plugin.
:param conf: Configuration.
"""
@abc.abstractmethod
def process_notification(self, message):
"""Return a sequence of Counter instances for the given message.
:param message: Message to process.
"""
@staticmethod
def _consume_and_drop(notifications):
"""RPC endpoint for useless notification level"""
# NOTE(sileht): nothing special todo here, but because we listen
# for the generic notification exchange we have to consume all its
# queues
audit = _consume_and_drop
debug = _consume_and_drop
warn = _consume_and_drop
error = _consume_and_drop
critical = _consume_and_drop
def info(self, notifications):
"""RPC endpoint for notification messages at info level
When another service sends a notification over the message
bus, this method receives it.
:param notifications: list of notifications
"""
self._process_notifications('info', notifications)
def sample(self, notifications):
"""RPC endpoint for notification messages at sample level
When another service sends a notification over the message
bus at sample priority, this method receives it.
:param notifications: list of notifications
"""
self._process_notifications('sample', notifications)
def _process_notifications(self, priority, notifications):
for notification in notifications:
try:
self.to_samples_and_publish(notification)
except Exception:
LOG.error('Fail to process notification', exc_info=True)
def to_samples_and_publish(self, notification):
"""Return samples produced by *process_notification*.
Samples produced for the given notification.
:param context: Execution context from the service or RPC call
:param notification: The notification to process.
"""
with self.manager.publisher() as p:
p(list(self.process_notification(notification)))
class ExtensionLoadError(Exception):
"""Error of loading pollster plugin.

View File

@ -18,7 +18,7 @@ notification events.
from oslo_log import log
from ceilometer import notification
from ceilometer.pipeline import sample as endpoint
from ceilometer import sample
LOG = log.getLogger(__name__)
@ -54,7 +54,7 @@ class InvalidSensorData(ValueError):
pass
class SensorNotification(notification.NotificationProcessBase):
class SensorNotification(endpoint.SampleEndpoint):
"""A generic class for extracting samples from sensor data notifications.
A notification message can contain multiple samples from multiple

View File

@ -24,7 +24,7 @@ from stevedore import extension
from ceilometer import declarative
from ceilometer.i18n import _
from ceilometer import notification
from ceilometer.pipeline import sample as endpoint
from ceilometer import sample as sample_util
OPTS = [
@ -182,7 +182,7 @@ class MeterDefinition(object):
yield sample
class ProcessMeterNotifications(notification.NotificationProcessBase):
class ProcessMeterNotifications(endpoint.SampleEndpoint):
event_types = []
@ -224,8 +224,8 @@ class ProcessMeterNotifications(notification.NotificationProcessBase):
definitions[meter_cfg['name']] = md
return definitions.values()
def process_notification(self, notification_body):
def process_notification(self, notification):
for d in self.definitions:
if d.match_type(notification_body['event_type']):
for s in d.to_samples(notification_body):
if d.match_type(notification['event_type']):
for s in d.to_samples(notification):
yield sample_util.Sample.from_notification(**s)

View File

@ -13,11 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from ceilometer import notification
from ceilometer.pipeline import sample as endpoint
from ceilometer import sample
class HTTPRequest(notification.NotificationProcessBase):
class HTTPRequest(endpoint.SampleEndpoint):
event_types = ['http.request']
def process_notification(self, message):

View File

@ -13,13 +13,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
import threading
import time
import uuid
from ceilometer.agent import plugin_base
from concurrent import futures
import cotyledon
from futurist import periodics
@ -29,10 +27,10 @@ import oslo_messaging
from stevedore import extension
from tooz import coordination
from ceilometer.event import endpoint as event_endpoint
from ceilometer.i18n import _
from ceilometer import messaging
from ceilometer import pipeline
from ceilometer.pipeline import event as event_endpoint
from ceilometer import utils
@ -236,7 +234,7 @@ class NotificationService(cotyledon.Service):
endpoints = []
endpoints.append(
event_endpoint.EventsNotificationEndpoint(event_pipe_manager))
event_endpoint.EventEndpoint(event_pipe_manager))
targets = []
for ext in notification_manager:
@ -326,17 +324,3 @@ class NotificationService(cotyledon.Service):
utils.kill_listeners(self.listeners)
super(NotificationService, self).terminate()
class NotificationProcessBase(plugin_base.NotificationBase):
def get_targets(self, conf):
"""Return a sequence of oslo_messaging.Target
This sequence is defining the exchange and topics to be connected for
this plugin.
"""
return [oslo_messaging.Target(topic=topic, exchange=exchange)
for topic in self.get_notification_topics(conf)
for exchange in
conf.notification.notification_control_exchanges]

View File

@ -651,7 +651,7 @@ class ConfigManagerBase(object):
self.conf = conf
self.cfg_loc = None
def load_config(self, cfg_file, fallback_cfg_prefix='pipeline/data/'):
def load_config(self, cfg_file, fallback_cfg_prefix='data/'):
"""Load a configuration file and set its refresh values."""
if os.path.exists(cfg_file):
self.cfg_loc = cfg_file
@ -906,3 +906,103 @@ def get_pipeline_grouping_key(pipe):
for transformer in pipe.sink.transformers:
keys += transformer.grouping_keys
return list(set(keys))
class NotificationEndpoint(object):
"""Base Endpoint for plugins that support the notification API."""
def __init__(self, manager):
super(NotificationEndpoint, self).__init__()
# NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
# messages to an endpoint.
if self.event_types:
self.filter_rule = oslo_messaging.NotificationFilter(
event_type='|'.join(self.event_types))
self.manager = manager
@staticmethod
def get_notification_topics(conf):
if 'notification_topics' in conf:
return conf.notification_topics
return conf.oslo_messaging_notifications.topics
def get_targets(self, conf):
"""Return a sequence of oslo_messaging.Target
This sequence is defining the exchange and topics to be connected for
this plugin.
"""
return [oslo_messaging.Target(topic=topic, exchange=exchange)
for topic in self.get_notification_topics(conf)
for exchange in
conf.notification.notification_control_exchanges]
@abc.abstractproperty
def event_types(self):
"""Return a sequence of strings to filter on.
Strings are defining the event types to be given to this plugin.
"""
@abc.abstractmethod
def process_notifications(self, priority, notifications):
"""Return a sequence of Counter instances for the given message.
:param message: Message to process.
"""
@staticmethod
def _consume_and_drop(notifications):
"""RPC endpoint for useless notification level"""
# NOTE(sileht): nothing special todo here, but because we listen
# for the generic notification exchange we have to consume all its
# queues
def audit(self, notifications):
"""endpoint for notification messages at audit level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def critical(self, notifications):
"""endpoint for notification messages at critical level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def debug(self, notifications):
"""endpoint for notification messages at debug level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def error(self, notifications):
"""endpoint for notification messages at error level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def info(self, notifications):
"""endpoint for notification messages at info level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def sample(self, notifications):
"""endpoint for notification messages at sample level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)
def warn(self, notifications):
"""endpoint for notification messages at warn level
:param notifications: list of notifications
"""
self._consume_and_drop(notifications)

View File

@ -17,38 +17,41 @@ import oslo_messaging
from stevedore import extension
from ceilometer.event import converter
from ceilometer import pipeline
LOG = log.getLogger(__name__)
class EventsNotificationEndpoint(object):
class EventEndpoint(pipeline.NotificationEndpoint):
event_types = []
def __init__(self, manager):
super(EventsNotificationEndpoint, self).__init__()
super(EventEndpoint, self).__init__(manager)
LOG.debug('Loading event definitions')
self.event_converter = converter.setup_events(
manager.conf,
extension.ExtensionManager(
namespace='ceilometer.event.trait_plugin'))
self.manager = manager
def info(self, notifications):
"""Convert message at info level to Ceilometer Event.
:param notifications: list of notifications
"""
return self.process_notification('info', notifications)
return self.process_notifications('info', notifications)
def error(self, notifications):
"""Convert message at error level to Ceilometer Event.
:param notifications: list of notifications
"""
return self.process_notification('error', notifications)
return self.process_notifications('error', notifications)
def process_notification(self, priority, notifications):
for notification in notifications:
def process_notifications(self, priority, notifications):
for message in notifications:
try:
event = self.event_converter.to_event(priority, notification)
event = self.event_converter.to_event(priority, message)
if event is not None:
with self.manager.publisher() as p:
p(event)

View File

@ -0,0 +1,46 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from ceilometer import pipeline
LOG = log.getLogger(__name__)
class SampleEndpoint(pipeline.NotificationEndpoint):
def info(self, notifications):
"""Convert message at info level to Ceilometer sample.
:param notifications: list of notifications
"""
return self.process_notifications('info', notifications)
def sample(self, notifications):
"""Convert message at sample level to Ceilometer Event.
:param notifications: list of notifications
"""
return self.process_notifications('sample', notifications)
def process_notifications(self, priority, notifications):
for message in notifications:
try:
with self.manager.publisher() as p:
p(list(self.process_notification(message)))
except Exception:
LOG.error('Fail to process notification', exc_info=True)
def process_notification(notification):
"""Build sample from provided notification."""
pass

View File

@ -10,31 +10,11 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import oslo_messaging
from ceilometer.agent import plugin_base
from ceilometer.pipeline import sample as endpoint
from ceilometer import sample
class TelemetryBase(plugin_base.NotificationBase):
"""Convert telemetry notification into Samples."""
def get_targets(self, conf):
"""Return a sequence of oslo_messaging.Target
Sequence defining the exchange and topics to be connected for this
plugin.
"""
return [
oslo_messaging.Target(
topic=topic,
exchange=conf.notification.notification_control_exchanges[0])
for topic in self.get_notification_topics(conf)
]
class TelemetryIpc(TelemetryBase):
class TelemetryIpc(endpoint.SampleEndpoint):
"""Handle sample from notification bus
Telemetry samples polled by polling agent.

View File

@ -21,8 +21,8 @@ from oslo_utils import fileutils
import six
import yaml
from ceilometer.event import endpoint as event_endpoint
from ceilometer import pipeline
from ceilometer.pipeline import event as event_endpoint
from ceilometer import publisher
from ceilometer.publisher import test
from ceilometer import service
@ -114,8 +114,7 @@ class TestEventEndpoint(tests_base.BaseTestCase):
def _setup_endpoint(self, publishers):
ev_pipeline_mgr = self._setup_pipeline(publishers)
self.endpoint = event_endpoint.EventsNotificationEndpoint(
ev_pipeline_mgr)
self.endpoint = event_endpoint.EventEndpoint(ev_pipeline_mgr)
self.endpoint.event_converter = mock.MagicMock()
self.endpoint.event_converter.to_event.return_value = mock.MagicMock(
@ -167,7 +166,7 @@ class TestEventEndpoint(tests_base.BaseTestCase):
'ctxt': {}
}
with mock.patch("ceilometer.pipeline.LOG") as mock_logger:
ret = self.endpoint.process_notification('info', [message])
ret = self.endpoint.process_notifications('info', [message])
self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)
exception_mock = mock_logger.error
self.assertIn('Exit after error from publisher',
@ -187,7 +186,7 @@ class TestEventEndpoint(tests_base.BaseTestCase):
'ctxt': {}
}
with mock.patch("ceilometer.pipeline.LOG") as mock_logger:
ret = self.endpoint.process_notification('info', [message])
ret = self.endpoint.process_notifications('info', [message])
self.assertEqual(oslo_messaging.NotificationResult.HANDLED, ret)
exception_mock = mock_logger.error
self.assertIn('Continue after error from publisher',

View File

@ -23,9 +23,9 @@ import six
from stevedore import extension
import yaml
from ceilometer.agent import plugin_base
from ceilometer import messaging
from ceilometer import notification
from ceilometer import pipeline
from ceilometer.publisher import test as test_publisher
from ceilometer import service
from ceilometer.tests import base as tests_base
@ -82,7 +82,7 @@ TEST_NOTICE_PAYLOAD = {
}
class _FakeNotificationPlugin(plugin_base.NotificationBase):
class _FakeNotificationPlugin(pipeline.NotificationEndpoint):
event_types = ['fake.event']
def get_targets(self, conf):
@ -117,7 +117,7 @@ class TestNotification(tests_base.BaseTestCase):
]
)
@mock.patch('ceilometer.event.endpoint.EventsNotificationEndpoint')
@mock.patch('ceilometer.pipeline.event.EventEndpoint')
def _do_process_notification_manager_start(self,
fake_event_endpoint_class):
with mock.patch.object(self.srv,

View File

@ -113,16 +113,16 @@ Notifications
suffice.
Notifications are defined as subclass of the
:class:`ceilometer.agent.plugin_base.NotificationBase` meta class.
:class:`ceilometer.notification.NotificationEndpoint` meta class.
Notifications must implement:
``event_types``
A sequence of strings defining the event types to be given to the plugin
``process_notification(self, message)``
``process_notifications(self, priority, message)``
Receives an event message from the list provided to ``event_types`` and
returns a sequence of ``Sample`` objects as defined in the
:file:`ceilometer/sample.py` file.
returns a sequence of objects. Using the SampleEndpoint, it should yield
``Sample`` objects as defined in the :file:`ceilometer/sample.py` file.
In the ``InstanceNotifications`` plugin, it listens to three events: