Merge "Add infra for versioned notifications"

This commit is contained in:
Jenkins 2016-01-21 23:43:19 +00:00 committed by Gerrit Code Review
commit c29da21272
6 changed files with 531 additions and 1 deletions

View File

@ -53,6 +53,7 @@ def register_all():
__import__('nova.objects.monitor_metric')
__import__('nova.objects.network')
__import__('nova.objects.network_request')
__import__('nova.objects.notification')
__import__('nova.objects.numa')
__import__('nova.objects.pci_device')
__import__('nova.objects.pci_device_pool')

View File

@ -483,6 +483,44 @@ class DiskFormat(Enum):
valid_values=DiskFormat.ALL)
class NotificationPriority(Enum):
AUDIT = 'audit'
CRITICAL = 'critical'
DEBUG = 'debug'
INFO = 'info'
ERROR = 'error'
SAMPLE = 'sample'
WARN = 'warn'
ALL = (AUDIT, CRITICAL, DEBUG, INFO, ERROR, SAMPLE, WARN)
def __init__(self):
super(NotificationPriority, self).__init__(
valid_values=NotificationPriority.ALL)
class NotificationPhase(Enum):
START = 'start'
END = 'end'
ERROR = 'error'
ALL = (START, END, ERROR)
def __init__(self):
super(NotificationPhase, self).__init__(
valid_values=NotificationPhase.ALL)
class NotificationAction(Enum):
UPDATE = 'update'
ALL = (UPDATE,)
def __init__(self):
super(NotificationAction, self).__init__(
valid_values=NotificationAction.ALL)
class IPAddress(FieldType):
@staticmethod
def coerce(obj, attr, value):
@ -737,6 +775,18 @@ class DiskFormatField(BaseEnumField):
AUTO_TYPE = DiskFormat()
class NotificationPriorityField(BaseEnumField):
AUTO_TYPE = NotificationPriority()
class NotificationPhaseField(BaseEnumField):
AUTO_TYPE = NotificationPhase()
class NotificationActionField(BaseEnumField):
AUTO_TYPE = NotificationAction()
class IPAddressField(AutoTypedField):
AUTO_TYPE = IPAddress()

View File

@ -0,0 +1,137 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.objects import base
from nova.objects import fields
from nova import rpc
@base.NovaObjectRegistry.register
class EventType(base.NovaObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'object': fields.StringField(nullable=False),
'action': fields.NotificationActionField(nullable=False),
'phase': fields.NotificationPhaseField(nullable=True),
}
def to_notification_event_type_field(self):
"""Serialize the object to the wire format."""
s = '%s.%s' % (self.object, self.action)
if self.obj_attr_is_set('phase'):
s += '.%s' % self.phase
return s
# Note(gibi): It is explicitly not registered as this class shall not be used
# directly, it is just a base class for notification payloads.
@base.NovaObjectRegistry.register_if(False)
class NotificationPayloadBase(base.NovaObject):
"""Base class for the payload of versioned notifications."""
# SCHEMA defines how to populate the payload fields. It is a dictionary
# where every key value pair has the following format:
# <payload_field_name>: (<data_source_name>,
# <field_of_the_data_source>)
# The <payload_field_name> is the name where the data will be stored in the
# payload object, this field has to be defined as a field of the payload.
# The <data_source_name> shall refer to name of the parameter passed as
# kwarg to the payload's populate_schema() call and this object will be
# used as the source of the data. The <field_of_the_data_source> shall be
# a valid field of the passed argument.
# The SCHEMA needs to be applied with the populate_schema() call before the
# notification can be emitted.
# The value of the payload.<payload_field_name> field will be set by the
# <data_source_name>.<field_of_the_data_source> field. The
# <data_source_name> will not be part of the payload object internal or
# external representation.
# Payload fields that are not set by the SCHEMA can be filled in the same
# way as in any versioned object.
SCHEMA = {}
# Version 1.0: Initial version
VERSION = '1.0'
def __init__(self, *args, **kwargs):
super(NotificationPayloadBase, self).__init__(*args, **kwargs)
self.populated = not self.SCHEMA
def populate_schema(self, **kwargs):
"""Populate the object based on the SCHEMA and the source objects
:param kwargs: A dict contains the source object at the key defined in
the SCHEMA
"""
for key, (obj, field) in self.SCHEMA.items():
source = kwargs[obj]
if source.obj_attr_is_set(field):
setattr(self, key, getattr(source, field))
self.populated = True
@base.NovaObjectRegistry.register
class NotificationPublisher(base.NovaObject):
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'host': fields.StringField(nullable=False),
'binary': fields.StringField(nullable=False),
}
@classmethod
def from_service_obj(cls, service):
return cls(host=service.host, binary=service.binary)
# Note(gibi): It is explicitly not registered as this class shall not be used
# directly, it is just a base class for notification.
@base.NovaObjectRegistry.register_if(False)
class NotificationBase(base.NovaObject):
"""Base class for versioned notifications.
Every subclass shall define a 'payload' field.
"""
# Version 1.0: Initial version
VERSION = '1.0'
fields = {
'priority': fields.NotificationPriorityField(),
'event_type': fields.ObjectField('EventType'),
'publisher': fields.ObjectField('NotificationPublisher'),
}
def _emit(self, context, event_type, publisher_id, payload):
assert rpc.NOTIFIER is not None
notifier = rpc.NOTIFIER.prepare(publisher_id=publisher_id)
notify = getattr(notifier, self.priority)
notify(context, event_type=event_type, payload=payload)
def emit(self, context):
"""Send the notification."""
assert self.payload.populated
# Note(gibi): notification payload will be a newly populated object
# therefore every field of it will look changed so this does not carry
# any extra information so we drop this from the payload.
self.payload.obj_reset_changes(recursive=False)
self._emit(context,
event_type=
self.event_type.to_notification_event_type_field(),
publisher_id='%s:%s' %
(self.publisher.binary,
self.publisher.host),
payload=self.payload.obj_to_primitive())

View File

@ -941,3 +941,58 @@ class TestIPV6Network(TestField):
for x in good]
self.from_primitive_values = [(x, netaddr.IPNetwork(x))
for x in good]
class TestNotificationPriority(TestField):
def setUp(self):
super(TestNotificationPriority, self).setUp()
self.field = fields.NotificationPriorityField()
self.coerce_good_values = [('audit', 'audit'),
('critical', 'critical'),
('debug', 'debug'),
('error', 'error'),
('sample', 'sample'),
('warn', 'warn')]
self.coerce_bad_values = ['warning']
self.to_primitive_values = self.coerce_good_values[0:1]
self.from_primitive_values = self.coerce_good_values[0:1]
def test_stringify(self):
self.assertEqual("'warn'", self.field.stringify('warn'))
def test_stringify_invalid(self):
self.assertRaises(ValueError, self.field.stringify, 'warning')
class TestNotificationPhase(TestField):
def setUp(self):
super(TestNotificationPhase, self).setUp()
self.field = fields.NotificationPhaseField()
self.coerce_good_values = [('start', 'start'),
('end', 'end'),
('error', 'error')]
self.coerce_bad_values = ['begin']
self.to_primitive_values = self.coerce_good_values[0:1]
self.from_primitive_values = self.coerce_good_values[0:1]
def test_stringify(self):
self.assertEqual("'error'", self.field.stringify('error'))
def test_stringify_invalid(self):
self.assertRaises(ValueError, self.field.stringify, 'begin')
class TestNotificationAction(TestField):
def setUp(self):
super(TestNotificationAction, self).setUp()
self.field = fields.NotificationActionField()
self.coerce_good_values = [('update', 'update')]
self.coerce_bad_values = ['magic']
self.to_primitive_values = self.coerce_good_values[0:1]
self.from_primitive_values = self.coerce_good_values[0:1]
def test_stringify(self):
self.assertEqual("'update'", self.field.stringify('update'))
def test_stringify_invalid(self):
self.assertRaises(ValueError, self.field.stringify, 'magic')

View File

@ -0,0 +1,240 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_utils import timeutils
from nova import objects
from nova.objects import base
from nova.objects import fields
from nova.objects import notification
from nova import test
class TestNotificationBase(test.NoDBTestCase):
@base.NovaObjectRegistry.register_if(False)
class TestObject(base.NovaObject):
VERSION = '1.0'
fields = {
'field_1': fields.StringField(),
'field_2': fields.IntegerField(),
'not_important_field': fields.IntegerField(),
}
@base.NovaObjectRegistry.register_if(False)
class TestNotificationPayload(notification.NotificationPayloadBase):
VERSION = '1.0'
SCHEMA = {
'field_1': ('source_field', 'field_1'),
'field_2': ('source_field', 'field_2'),
}
fields = {
'extra_field': fields.StringField(), # filled by ctor
'field_1': fields.StringField(), # filled by the schema
'field_2': fields.IntegerField(), # filled by the schema
}
def populate_schema(self, source_field):
super(TestNotificationBase.TestNotificationPayload,
self).populate_schema(source_field=source_field)
@base.NovaObjectRegistry.register_if(False)
class TestNotificationPayloadEmptySchema(
notification.NotificationPayloadBase):
VERSION = '1.0'
fields = {
'extra_field': fields.StringField(), # filled by ctor
}
@base.NovaObjectRegistry.register_if(False)
class TestNotification(notification.NotificationBase):
VERSION = '1.0'
fields = {
'payload': fields.ObjectField('TestNotificationPayload')
}
@base.NovaObjectRegistry.register_if(False)
class TestNotificationEmptySchema(notification.NotificationBase):
VERSION = '1.0'
fields = {
'payload': fields.ObjectField('TestNotificationPayloadEmptySchema')
}
fake_service = {
'created_at': timeutils.utcnow().replace(microsecond=0),
'updated_at': None,
'deleted_at': None,
'deleted': False,
'id': 123,
'host': 'fake-host',
'binary': 'nova-fake',
'topic': 'fake-service-topic',
'report_count': 1,
'forced_down': False,
'disabled': False,
'disabled_reason': None,
'last_seen_up': None,
'version': 1}
expected_payload = {
'nova_object.name': 'TestNotificationPayload',
'nova_object.data': {
'extra_field': 'test string',
'field_1': 'test1',
'field_2': 42},
'nova_object.version': '1.0',
'nova_object.namespace': 'nova'}
def setUp(self):
super(TestNotificationBase, self).setUp()
with mock.patch('nova.db.service_update') as mock_db_service_update:
self.service_obj = objects.Service(context=mock.sentinel.context,
id=self.fake_service['id'])
self.service_obj.obj_reset_changes(['version'])
mock_db_service_update.return_value = self.fake_service
self.service_obj.save()
self.my_obj = self.TestObject(field_1='test1',
field_2=42,
not_important_field=13)
self.payload = self.TestNotificationPayload(
extra_field='test string')
self.payload.populate_schema(source_field=self.my_obj)
self.notification = self.TestNotification(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE,
phase=fields.NotificationPhase.START),
publisher=notification.NotificationPublisher.from_service_obj(
self.service_obj),
priority=fields.NotificationPriority.INFO,
payload=self.payload)
def _verify_notification(self, mock_notifier, mock_context,
expected_event_type,
expected_payload):
mock_notifier.prepare.assert_called_once_with(
publisher_id='nova-fake:fake-host')
mock_notify = mock_notifier.prepare.return_value.info
self.assertTrue(mock_notify.called)
self.assertEqual(mock_notify.call_args[0][0], mock_context)
self.assertEqual(mock_notify.call_args[1]['event_type'],
expected_event_type)
actual_payload = mock_notify.call_args[1]['payload']
self.assertJsonEqual(expected_payload, actual_payload)
@mock.patch('nova.rpc.NOTIFIER')
def test_emit_notification(self, mock_notifier):
mock_context = mock.Mock()
mock_context.to_dict.return_value = {}
self.notification.emit(mock_context)
self._verify_notification(
mock_notifier,
mock_context,
expected_event_type='test_object.update.start',
expected_payload=self.expected_payload)
@mock.patch('nova.rpc.NOTIFIER')
def test_emit_with_host_and_binary_as_publisher(self, mock_notifier):
noti = self.TestNotification(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE),
publisher=notification.NotificationPublisher(host='fake-host',
binary='nova-fake'),
priority=fields.NotificationPriority.INFO,
payload=self.payload)
mock_context = mock.Mock()
mock_context.to_dict.return_value = {}
noti.emit(mock_context)
self._verify_notification(
mock_notifier,
mock_context,
expected_event_type='test_object.update',
expected_payload=self.expected_payload)
@mock.patch('nova.rpc.NOTIFIER')
def test_emit_event_type_without_phase(self, mock_notifier):
noti = self.TestNotification(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE),
publisher=notification.NotificationPublisher.from_service_obj(
self.service_obj),
priority=fields.NotificationPriority.INFO,
payload=self.payload)
mock_context = mock.Mock()
mock_context.to_dict.return_value = {}
noti.emit(mock_context)
self._verify_notification(
mock_notifier,
mock_context,
expected_event_type='test_object.update',
expected_payload=self.expected_payload)
@mock.patch('nova.rpc.NOTIFIER')
def test_not_possible_to_emit_if_not_populated(self, mock_notifier):
non_populated_payload = self.TestNotificationPayload(
extra_field='test string')
noti = self.TestNotification(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE),
publisher=notification.NotificationPublisher.from_service_obj(
self.service_obj),
priority=fields.NotificationPriority.INFO,
payload=non_populated_payload)
mock_context = mock.Mock()
self.assertRaises(AssertionError, noti.emit, mock_context)
self.assertFalse(mock_notifier.called)
@mock.patch('nova.rpc.NOTIFIER')
def test_empty_schema(self, mock_notifier):
non_populated_payload = self.TestNotificationPayloadEmptySchema(
extra_field='test string')
noti = self.TestNotificationEmptySchema(
event_type=notification.EventType(
object='test_object',
action=fields.NotificationAction.UPDATE),
publisher=notification.NotificationPublisher.from_service_obj(
self.service_obj),
priority=fields.NotificationPriority.INFO,
payload=non_populated_payload)
mock_context = mock.Mock()
mock_context.to_dict.return_value = {}
noti.emit(mock_context)
self._verify_notification(
mock_notifier,
mock_context,
expected_event_type='test_object.update',
expected_payload=
{'nova_object.name': 'TestNotificationPayloadEmptySchema',
'nova_object.data': {'extra_field': u'test string'},
'nova_object.version': '1.0',
'nova_object.namespace': 'nova'})

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import datetime
@ -35,6 +36,7 @@ from nova import exception
from nova import objects
from nova.objects import base
from nova.objects import fields
from nova.objects import notification
from nova import test
from nova.tests import fixtures as nova_fixtures
from nova.tests.unit import fake_notifier
@ -1124,6 +1126,7 @@ object_data = {
'EC2InstanceMapping': '1.0-a4556eb5c5e94c045fe84f49cf71644f',
'EC2SnapshotMapping': '1.0-47e7ddabe1af966dce0cfd0ed6cd7cd1',
'EC2VolumeMapping': '1.0-5b713751d6f97bad620f3378a521020d',
'EventType': '1.0-21dc35de314fc5fc0a7965211c0c00f7',
'FixedIP': '1.14-53e1c10b539f1a82fe83b1af4720efae',
'FixedIPList': '1.14-87a39361c8f08f059004d6b15103cdfd',
'Flavor': '1.1-b6bb7a730a79d720344accefafacf7ee',
@ -1161,6 +1164,7 @@ object_data = {
'MigrationList': '1.2-02c0ec0c50b75ca86a2a74c5e8c911cc',
'MonitorMetric': '1.1-53b1db7c4ae2c531db79761e7acc52ba',
'MonitorMetricList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
'NotificationPublisher': '1.0-bbbc1402fb0e443a3eb227cc52b61545',
'NUMACell': '1.2-74fc993ac5c83005e76e34e8487f1c05',
'NUMAPagesTopology': '1.0-c71d86317283266dc8364c149155e48e',
'NUMATopology': '1.2-c63fad38be73b6afd04715c9c1b29220',
@ -1201,7 +1205,7 @@ object_data = {
class TestObjectVersions(test.NoDBTestCase):
def test_versions(self):
checker = fixture.ObjectVersionChecker(
checker = NotificationAwareObjectVersionChecker(
base.NovaObjectRegistry.obj_classes())
fingerprints = checker.get_hashes()
@ -1217,6 +1221,32 @@ class TestObjectVersions(test.NoDBTestCase):
'versions have been bumped, and then update their '
'hashes here.')
def test_notification_payload_version_depends_on_the_schema(self):
@base.NovaObjectRegistry.register_if(False)
class TestNotificationPayload(notification.NotificationPayloadBase):
VERSION = '1.0'
SCHEMA = {
'field_1': ('source_field', 'field_1'),
'field_2': ('source_field', 'field_2'),
}
fields = {
'extra_field': fields.StringField(), # filled by ctor
'field_1': fields.StringField(), # filled by the schema
'field_2': fields.IntegerField(), # filled by the schema
}
checker = NotificationAwareObjectVersionChecker(
{'TestNotificationPayload': (TestNotificationPayload,)})
old_hash = checker._get_fingerprint('TestNotificationPayload')
TestNotificationPayload.SCHEMA['field_3'] = ('source_field',
'field_3')
new_hash = checker._get_fingerprint('TestNotificationPayload')
self.assertNotEqual(old_hash, new_hash)
def test_obj_make_compatible(self):
# Iterate all object classes and verify that we can run
# obj_make_compatible with every older version than current.
@ -1332,3 +1362,20 @@ class TestObjMethodOverrides(test.NoDBTestCase):
obj_class = obj_classes[obj_name][0]
self.assertEqual(args,
inspect.getargspec(obj_class.obj_reset_changes))
class NotificationAwareObjectVersionChecker(fixture.ObjectVersionChecker):
def _get_fingerprint(self, obj_name, extra_data_func=None):
obj_class = copy.deepcopy(self.obj_classes[obj_name][0])
if issubclass(obj_class, notification.NotificationPayloadBase):
# Note(gibi): to make NotificationPayload version dependent on the
# SCHEMA of the payload we inject the SCHEMA content to the hash
# calculation algorithm
extra_relevant_data = collections.OrderedDict(
sorted(obj_class.SCHEMA.items()))
fields = getattr(obj_class, 'fields', {})
fields['_schema_'] = extra_relevant_data
setattr(obj_class, 'fields', fields)
return super(NotificationAwareObjectVersionChecker,
self)._get_fingerprint(obj_name)