diff --git a/nova/objects/__init__.py b/nova/objects/__init__.py index cd6491684104..5a835f6d49f7 100644 --- a/nova/objects/__init__.py +++ b/nova/objects/__init__.py @@ -53,6 +53,7 @@ def register_all(): __import__('nova.objects.monitor_metric') __import__('nova.objects.network') __import__('nova.objects.network_request') + __import__('nova.objects.notification') __import__('nova.objects.numa') __import__('nova.objects.pci_device') __import__('nova.objects.pci_device_pool') diff --git a/nova/objects/fields.py b/nova/objects/fields.py index 0b6effabd58e..dd7b75fd8f01 100644 --- a/nova/objects/fields.py +++ b/nova/objects/fields.py @@ -483,6 +483,44 @@ class DiskFormat(Enum): valid_values=DiskFormat.ALL) +class NotificationPriority(Enum): + AUDIT = 'audit' + CRITICAL = 'critical' + DEBUG = 'debug' + INFO = 'info' + ERROR = 'error' + SAMPLE = 'sample' + WARN = 'warn' + + ALL = (AUDIT, CRITICAL, DEBUG, INFO, ERROR, SAMPLE, WARN) + + def __init__(self): + super(NotificationPriority, self).__init__( + valid_values=NotificationPriority.ALL) + + +class NotificationPhase(Enum): + START = 'start' + END = 'end' + ERROR = 'error' + + ALL = (START, END, ERROR) + + def __init__(self): + super(NotificationPhase, self).__init__( + valid_values=NotificationPhase.ALL) + + +class NotificationAction(Enum): + UPDATE = 'update' + + ALL = (UPDATE,) + + def __init__(self): + super(NotificationAction, self).__init__( + valid_values=NotificationAction.ALL) + + class IPAddress(FieldType): @staticmethod def coerce(obj, attr, value): @@ -737,6 +775,18 @@ class DiskFormatField(BaseEnumField): AUTO_TYPE = DiskFormat() +class NotificationPriorityField(BaseEnumField): + AUTO_TYPE = NotificationPriority() + + +class NotificationPhaseField(BaseEnumField): + AUTO_TYPE = NotificationPhase() + + +class NotificationActionField(BaseEnumField): + AUTO_TYPE = NotificationAction() + + class IPAddressField(AutoTypedField): AUTO_TYPE = IPAddress() diff --git a/nova/objects/notification.py b/nova/objects/notification.py new file mode 100644 index 000000000000..d41219ee4aba --- /dev/null +++ b/nova/objects/notification.py @@ -0,0 +1,137 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from nova.objects import base +from nova.objects import fields +from nova import rpc + + +@base.NovaObjectRegistry.register +class EventType(base.NovaObject): + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'object': fields.StringField(nullable=False), + 'action': fields.NotificationActionField(nullable=False), + 'phase': fields.NotificationPhaseField(nullable=True), + } + + def to_notification_event_type_field(self): + """Serialize the object to the wire format.""" + s = '%s.%s' % (self.object, self.action) + if self.obj_attr_is_set('phase'): + s += '.%s' % self.phase + return s + + +# Note(gibi): It is explicitly not registered as this class shall not be used +# directly, it is just a base class for notification payloads. +@base.NovaObjectRegistry.register_if(False) +class NotificationPayloadBase(base.NovaObject): + """Base class for the payload of versioned notifications.""" + # SCHEMA defines how to populate the payload fields. It is a dictionary + # where every key value pair has the following format: + # : (, + # ) + # The is the name where the data will be stored in the + # payload object, this field has to be defined as a field of the payload. + # The shall refer to name of the parameter passed as + # kwarg to the payload's populate_schema() call and this object will be + # used as the source of the data. The shall be + # a valid field of the passed argument. + # The SCHEMA needs to be applied with the populate_schema() call before the + # notification can be emitted. + # The value of the payload. field will be set by the + # . field. The + # will not be part of the payload object internal or + # external representation. + # Payload fields that are not set by the SCHEMA can be filled in the same + # way as in any versioned object. + SCHEMA = {} + # Version 1.0: Initial version + VERSION = '1.0' + + def __init__(self, *args, **kwargs): + super(NotificationPayloadBase, self).__init__(*args, **kwargs) + self.populated = not self.SCHEMA + + def populate_schema(self, **kwargs): + """Populate the object based on the SCHEMA and the source objects + + :param kwargs: A dict contains the source object at the key defined in + the SCHEMA + """ + for key, (obj, field) in self.SCHEMA.items(): + source = kwargs[obj] + if source.obj_attr_is_set(field): + setattr(self, key, getattr(source, field)) + self.populated = True + + +@base.NovaObjectRegistry.register +class NotificationPublisher(base.NovaObject): + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'host': fields.StringField(nullable=False), + 'binary': fields.StringField(nullable=False), + } + + @classmethod + def from_service_obj(cls, service): + return cls(host=service.host, binary=service.binary) + + +# Note(gibi): It is explicitly not registered as this class shall not be used +# directly, it is just a base class for notification. +@base.NovaObjectRegistry.register_if(False) +class NotificationBase(base.NovaObject): + """Base class for versioned notifications. + + Every subclass shall define a 'payload' field. + """ + # Version 1.0: Initial version + VERSION = '1.0' + + fields = { + 'priority': fields.NotificationPriorityField(), + 'event_type': fields.ObjectField('EventType'), + 'publisher': fields.ObjectField('NotificationPublisher'), + } + + def _emit(self, context, event_type, publisher_id, payload): + assert rpc.NOTIFIER is not None + notifier = rpc.NOTIFIER.prepare(publisher_id=publisher_id) + notify = getattr(notifier, self.priority) + notify(context, event_type=event_type, payload=payload) + + def emit(self, context): + """Send the notification.""" + assert self.payload.populated + + # Note(gibi): notification payload will be a newly populated object + # therefore every field of it will look changed so this does not carry + # any extra information so we drop this from the payload. + self.payload.obj_reset_changes(recursive=False) + + self._emit(context, + event_type= + self.event_type.to_notification_event_type_field(), + publisher_id='%s:%s' % + (self.publisher.binary, + self.publisher.host), + payload=self.payload.obj_to_primitive()) diff --git a/nova/tests/unit/objects/test_fields.py b/nova/tests/unit/objects/test_fields.py index f17ebce5adcf..3af8a7b10d1a 100644 --- a/nova/tests/unit/objects/test_fields.py +++ b/nova/tests/unit/objects/test_fields.py @@ -941,3 +941,58 @@ class TestIPV6Network(TestField): for x in good] self.from_primitive_values = [(x, netaddr.IPNetwork(x)) for x in good] + + +class TestNotificationPriority(TestField): + def setUp(self): + super(TestNotificationPriority, self).setUp() + self.field = fields.NotificationPriorityField() + self.coerce_good_values = [('audit', 'audit'), + ('critical', 'critical'), + ('debug', 'debug'), + ('error', 'error'), + ('sample', 'sample'), + ('warn', 'warn')] + self.coerce_bad_values = ['warning'] + self.to_primitive_values = self.coerce_good_values[0:1] + self.from_primitive_values = self.coerce_good_values[0:1] + + def test_stringify(self): + self.assertEqual("'warn'", self.field.stringify('warn')) + + def test_stringify_invalid(self): + self.assertRaises(ValueError, self.field.stringify, 'warning') + + +class TestNotificationPhase(TestField): + def setUp(self): + super(TestNotificationPhase, self).setUp() + self.field = fields.NotificationPhaseField() + self.coerce_good_values = [('start', 'start'), + ('end', 'end'), + ('error', 'error')] + self.coerce_bad_values = ['begin'] + self.to_primitive_values = self.coerce_good_values[0:1] + self.from_primitive_values = self.coerce_good_values[0:1] + + def test_stringify(self): + self.assertEqual("'error'", self.field.stringify('error')) + + def test_stringify_invalid(self): + self.assertRaises(ValueError, self.field.stringify, 'begin') + + +class TestNotificationAction(TestField): + def setUp(self): + super(TestNotificationAction, self).setUp() + self.field = fields.NotificationActionField() + self.coerce_good_values = [('update', 'update')] + self.coerce_bad_values = ['magic'] + self.to_primitive_values = self.coerce_good_values[0:1] + self.from_primitive_values = self.coerce_good_values[0:1] + + def test_stringify(self): + self.assertEqual("'update'", self.field.stringify('update')) + + def test_stringify_invalid(self): + self.assertRaises(ValueError, self.field.stringify, 'magic') diff --git a/nova/tests/unit/objects/test_notification.py b/nova/tests/unit/objects/test_notification.py new file mode 100644 index 000000000000..a885708f3736 --- /dev/null +++ b/nova/tests/unit/objects/test_notification.py @@ -0,0 +1,240 @@ +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from oslo_utils import timeutils + +from nova import objects +from nova.objects import base +from nova.objects import fields +from nova.objects import notification +from nova import test + + +class TestNotificationBase(test.NoDBTestCase): + + @base.NovaObjectRegistry.register_if(False) + class TestObject(base.NovaObject): + VERSION = '1.0' + fields = { + 'field_1': fields.StringField(), + 'field_2': fields.IntegerField(), + 'not_important_field': fields.IntegerField(), + } + + @base.NovaObjectRegistry.register_if(False) + class TestNotificationPayload(notification.NotificationPayloadBase): + VERSION = '1.0' + + SCHEMA = { + 'field_1': ('source_field', 'field_1'), + 'field_2': ('source_field', 'field_2'), + } + + fields = { + 'extra_field': fields.StringField(), # filled by ctor + 'field_1': fields.StringField(), # filled by the schema + 'field_2': fields.IntegerField(), # filled by the schema + } + + def populate_schema(self, source_field): + super(TestNotificationBase.TestNotificationPayload, + self).populate_schema(source_field=source_field) + + @base.NovaObjectRegistry.register_if(False) + class TestNotificationPayloadEmptySchema( + notification.NotificationPayloadBase): + VERSION = '1.0' + + fields = { + 'extra_field': fields.StringField(), # filled by ctor + } + + @base.NovaObjectRegistry.register_if(False) + class TestNotification(notification.NotificationBase): + VERSION = '1.0' + fields = { + 'payload': fields.ObjectField('TestNotificationPayload') + } + + @base.NovaObjectRegistry.register_if(False) + class TestNotificationEmptySchema(notification.NotificationBase): + VERSION = '1.0' + fields = { + 'payload': fields.ObjectField('TestNotificationPayloadEmptySchema') + } + + fake_service = { + 'created_at': timeutils.utcnow().replace(microsecond=0), + 'updated_at': None, + 'deleted_at': None, + 'deleted': False, + 'id': 123, + 'host': 'fake-host', + 'binary': 'nova-fake', + 'topic': 'fake-service-topic', + 'report_count': 1, + 'forced_down': False, + 'disabled': False, + 'disabled_reason': None, + 'last_seen_up': None, + 'version': 1} + + expected_payload = { + 'nova_object.name': 'TestNotificationPayload', + 'nova_object.data': { + 'extra_field': 'test string', + 'field_1': 'test1', + 'field_2': 42}, + 'nova_object.version': '1.0', + 'nova_object.namespace': 'nova'} + + def setUp(self): + super(TestNotificationBase, self).setUp() + with mock.patch('nova.db.service_update') as mock_db_service_update: + self.service_obj = objects.Service(context=mock.sentinel.context, + id=self.fake_service['id']) + self.service_obj.obj_reset_changes(['version']) + mock_db_service_update.return_value = self.fake_service + self.service_obj.save() + + self.my_obj = self.TestObject(field_1='test1', + field_2=42, + not_important_field=13) + + self.payload = self.TestNotificationPayload( + extra_field='test string') + self.payload.populate_schema(source_field=self.my_obj) + + self.notification = self.TestNotification( + event_type=notification.EventType( + object='test_object', + action=fields.NotificationAction.UPDATE, + phase=fields.NotificationPhase.START), + publisher=notification.NotificationPublisher.from_service_obj( + self.service_obj), + priority=fields.NotificationPriority.INFO, + payload=self.payload) + + def _verify_notification(self, mock_notifier, mock_context, + expected_event_type, + expected_payload): + mock_notifier.prepare.assert_called_once_with( + publisher_id='nova-fake:fake-host') + mock_notify = mock_notifier.prepare.return_value.info + self.assertTrue(mock_notify.called) + self.assertEqual(mock_notify.call_args[0][0], mock_context) + self.assertEqual(mock_notify.call_args[1]['event_type'], + expected_event_type) + actual_payload = mock_notify.call_args[1]['payload'] + self.assertJsonEqual(expected_payload, actual_payload) + + @mock.patch('nova.rpc.NOTIFIER') + def test_emit_notification(self, mock_notifier): + + mock_context = mock.Mock() + mock_context.to_dict.return_value = {} + self.notification.emit(mock_context) + + self._verify_notification( + mock_notifier, + mock_context, + expected_event_type='test_object.update.start', + expected_payload=self.expected_payload) + + @mock.patch('nova.rpc.NOTIFIER') + def test_emit_with_host_and_binary_as_publisher(self, mock_notifier): + noti = self.TestNotification( + event_type=notification.EventType( + object='test_object', + action=fields.NotificationAction.UPDATE), + publisher=notification.NotificationPublisher(host='fake-host', + binary='nova-fake'), + priority=fields.NotificationPriority.INFO, + payload=self.payload) + + mock_context = mock.Mock() + mock_context.to_dict.return_value = {} + noti.emit(mock_context) + + self._verify_notification( + mock_notifier, + mock_context, + expected_event_type='test_object.update', + expected_payload=self.expected_payload) + + @mock.patch('nova.rpc.NOTIFIER') + def test_emit_event_type_without_phase(self, mock_notifier): + noti = self.TestNotification( + event_type=notification.EventType( + object='test_object', + action=fields.NotificationAction.UPDATE), + publisher=notification.NotificationPublisher.from_service_obj( + self.service_obj), + priority=fields.NotificationPriority.INFO, + payload=self.payload) + + mock_context = mock.Mock() + mock_context.to_dict.return_value = {} + noti.emit(mock_context) + + self._verify_notification( + mock_notifier, + mock_context, + expected_event_type='test_object.update', + expected_payload=self.expected_payload) + + @mock.patch('nova.rpc.NOTIFIER') + def test_not_possible_to_emit_if_not_populated(self, mock_notifier): + non_populated_payload = self.TestNotificationPayload( + extra_field='test string') + noti = self.TestNotification( + event_type=notification.EventType( + object='test_object', + action=fields.NotificationAction.UPDATE), + publisher=notification.NotificationPublisher.from_service_obj( + self.service_obj), + priority=fields.NotificationPriority.INFO, + payload=non_populated_payload) + + mock_context = mock.Mock() + self.assertRaises(AssertionError, noti.emit, mock_context) + self.assertFalse(mock_notifier.called) + + @mock.patch('nova.rpc.NOTIFIER') + def test_empty_schema(self, mock_notifier): + non_populated_payload = self.TestNotificationPayloadEmptySchema( + extra_field='test string') + noti = self.TestNotificationEmptySchema( + event_type=notification.EventType( + object='test_object', + action=fields.NotificationAction.UPDATE), + publisher=notification.NotificationPublisher.from_service_obj( + self.service_obj), + priority=fields.NotificationPriority.INFO, + payload=non_populated_payload) + + mock_context = mock.Mock() + mock_context.to_dict.return_value = {} + noti.emit(mock_context) + + self._verify_notification( + mock_notifier, + mock_context, + expected_event_type='test_object.update', + expected_payload= + {'nova_object.name': 'TestNotificationPayloadEmptySchema', + 'nova_object.data': {'extra_field': u'test string'}, + 'nova_object.version': '1.0', + 'nova_object.namespace': 'nova'}) diff --git a/nova/tests/unit/objects/test_objects.py b/nova/tests/unit/objects/test_objects.py index 3e93abaf82fd..e74e92ec5cf0 100644 --- a/nova/tests/unit/objects/test_objects.py +++ b/nova/tests/unit/objects/test_objects.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import copy import datetime @@ -35,6 +36,7 @@ from nova import exception from nova import objects from nova.objects import base from nova.objects import fields +from nova.objects import notification from nova import test from nova.tests import fixtures as nova_fixtures from nova.tests.unit import fake_notifier @@ -1124,6 +1126,7 @@ object_data = { 'EC2InstanceMapping': '1.0-a4556eb5c5e94c045fe84f49cf71644f', 'EC2SnapshotMapping': '1.0-47e7ddabe1af966dce0cfd0ed6cd7cd1', 'EC2VolumeMapping': '1.0-5b713751d6f97bad620f3378a521020d', + 'EventType': '1.0-21dc35de314fc5fc0a7965211c0c00f7', 'FixedIP': '1.14-53e1c10b539f1a82fe83b1af4720efae', 'FixedIPList': '1.14-87a39361c8f08f059004d6b15103cdfd', 'Flavor': '1.1-b6bb7a730a79d720344accefafacf7ee', @@ -1161,6 +1164,7 @@ object_data = { 'MigrationList': '1.2-02c0ec0c50b75ca86a2a74c5e8c911cc', 'MonitorMetric': '1.1-53b1db7c4ae2c531db79761e7acc52ba', 'MonitorMetricList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e', + 'NotificationPublisher': '1.0-bbbc1402fb0e443a3eb227cc52b61545', 'NUMACell': '1.2-74fc993ac5c83005e76e34e8487f1c05', 'NUMAPagesTopology': '1.0-c71d86317283266dc8364c149155e48e', 'NUMATopology': '1.2-c63fad38be73b6afd04715c9c1b29220', @@ -1201,7 +1205,7 @@ object_data = { class TestObjectVersions(test.NoDBTestCase): def test_versions(self): - checker = fixture.ObjectVersionChecker( + checker = NotificationAwareObjectVersionChecker( base.NovaObjectRegistry.obj_classes()) fingerprints = checker.get_hashes() @@ -1217,6 +1221,32 @@ class TestObjectVersions(test.NoDBTestCase): 'versions have been bumped, and then update their ' 'hashes here.') + def test_notification_payload_version_depends_on_the_schema(self): + @base.NovaObjectRegistry.register_if(False) + class TestNotificationPayload(notification.NotificationPayloadBase): + VERSION = '1.0' + + SCHEMA = { + 'field_1': ('source_field', 'field_1'), + 'field_2': ('source_field', 'field_2'), + } + + fields = { + 'extra_field': fields.StringField(), # filled by ctor + 'field_1': fields.StringField(), # filled by the schema + 'field_2': fields.IntegerField(), # filled by the schema + } + + checker = NotificationAwareObjectVersionChecker( + {'TestNotificationPayload': (TestNotificationPayload,)}) + + old_hash = checker._get_fingerprint('TestNotificationPayload') + TestNotificationPayload.SCHEMA['field_3'] = ('source_field', + 'field_3') + new_hash = checker._get_fingerprint('TestNotificationPayload') + + self.assertNotEqual(old_hash, new_hash) + def test_obj_make_compatible(self): # Iterate all object classes and verify that we can run # obj_make_compatible with every older version than current. @@ -1332,3 +1362,20 @@ class TestObjMethodOverrides(test.NoDBTestCase): obj_class = obj_classes[obj_name][0] self.assertEqual(args, inspect.getargspec(obj_class.obj_reset_changes)) + + +class NotificationAwareObjectVersionChecker(fixture.ObjectVersionChecker): + def _get_fingerprint(self, obj_name, extra_data_func=None): + obj_class = copy.deepcopy(self.obj_classes[obj_name][0]) + if issubclass(obj_class, notification.NotificationPayloadBase): + # Note(gibi): to make NotificationPayload version dependent on the + # SCHEMA of the payload we inject the SCHEMA content to the hash + # calculation algorithm + extra_relevant_data = collections.OrderedDict( + sorted(obj_class.SCHEMA.items())) + fields = getattr(obj_class, 'fields', {}) + fields['_schema_'] = extra_relevant_data + setattr(obj_class, 'fields', fields) + + return super(NotificationAwareObjectVersionChecker, + self)._get_fingerprint(obj_name)