Add Aggregate image caching progress notifications
This adds AggregateCacheNotification, related payload, and code in conductor to emit this per-compute with progress information. This also adds a "progress" phase to NotificationPhase, which allows for start..progress..progress..end information for a single operation (cache_images in this case). Related to blueprint image-precache-support Change-Id: I69ae26d4caf4b56ab2c4864455bfe9b5b736dbf3
This commit is contained in:
parent
7ecd502f6d
commit
c935377531
@ -0,0 +1,20 @@
|
||||
{
|
||||
"priority": "INFO",
|
||||
"payload": {
|
||||
"nova_object.version": "1.0",
|
||||
"nova_object.namespace": "nova",
|
||||
"nova_object.name": "AggregateCachePayload",
|
||||
"nova_object.data": {
|
||||
"name": "my-aggregate",
|
||||
"uuid": "788608ec-ebdc-45c5-bc7f-e5f24ab92c80",
|
||||
"host": "compute",
|
||||
"total": 1,
|
||||
"index": 1,
|
||||
"images_cached": ["155d900f-4e14-4e4c-a73d-069cbf4541e6"],
|
||||
"images_failed": [],
|
||||
"id": 1
|
||||
}
|
||||
},
|
||||
"event_type": "aggregate.cache_images.progress",
|
||||
"publisher_id": "nova-conductor:fake-mini"
|
||||
}
|
@ -384,9 +384,9 @@ manner. The images and aggregate provided are checked by the server
|
||||
when the command is run, but the compute nodes are not checked to see
|
||||
if they support image caching until the process runs. Progress and
|
||||
results are logged by each compute, and the process sends
|
||||
``aggregate.cache_images.start`` and ``aggregate.cache_images.end``
|
||||
notifications, which may be useful for monitoring the operation
|
||||
externally.
|
||||
``aggregate.cache_images.start``, ``aggregate.cache_images.progress``,
|
||||
and ``aggregate.cache_images.end`` notifications, which may be useful
|
||||
for monitoring the operation externally.
|
||||
|
||||
References
|
||||
----------
|
||||
|
@ -772,6 +772,45 @@ def notify_about_aggregate_action(context, aggregate, action, phase):
|
||||
notification.emit(context)
|
||||
|
||||
|
||||
@rpc.if_notifications_enabled
|
||||
def notify_about_aggregate_cache(context, aggregate, host, image_status,
|
||||
index, total):
|
||||
"""Send a notification about aggregate cache_images progress.
|
||||
|
||||
:param context: The RequestContext
|
||||
:param aggregate: The target aggregate
|
||||
:param host: The host within the aggregate for which to report status
|
||||
:param image_status: The result from the compute host, which is a dict
|
||||
of {image_id: status}
|
||||
:param index: An integer indicating progress toward completion, between
|
||||
1 and $total
|
||||
:param total: The total number of hosts being processed in this operation,
|
||||
to bound $index
|
||||
"""
|
||||
success_statuses = ('cached', 'existing')
|
||||
payload = aggregate_notification.AggregateCachePayload(aggregate,
|
||||
host,
|
||||
index,
|
||||
total)
|
||||
payload.images_cached = []
|
||||
payload.images_failed = []
|
||||
for img, status in image_status.items():
|
||||
if status in success_statuses:
|
||||
payload.images_cached.append(img)
|
||||
else:
|
||||
payload.images_failed.append(img)
|
||||
notification = aggregate_notification.AggregateCacheNotification(
|
||||
priority=fields.NotificationPriority.INFO,
|
||||
publisher=notification_base.NotificationPublisher(
|
||||
host=CONF.host, source=fields.NotificationSource.CONDUCTOR),
|
||||
event_type=notification_base.EventType(
|
||||
object='aggregate',
|
||||
action=fields.NotificationAction.IMAGE_CACHE,
|
||||
phase=fields.NotificationPhase.PROGRESS),
|
||||
payload=payload)
|
||||
notification.emit(context)
|
||||
|
||||
|
||||
def notify_about_host_update(context, event_suffix, host_payload):
|
||||
"""Send a notification about host update.
|
||||
|
||||
|
@ -1671,10 +1671,12 @@ class ComputeTaskManager(base.Base):
|
||||
stats = collections.defaultdict(lambda: (0, 0, 0, 0))
|
||||
failed_images = collections.defaultdict(int)
|
||||
down_hosts = set()
|
||||
host_stats = {
|
||||
'completed': 0,
|
||||
'total': len(aggregate.hosts),
|
||||
}
|
||||
|
||||
def wrap_cache_images(ctxt, host, image_ids):
|
||||
result = self.compute_rpcapi.cache_images(
|
||||
ctxt, host=host, image_ids=image_ids)
|
||||
def host_completed(context, host, result):
|
||||
for image_id, status in result.items():
|
||||
cached, existing, error, unsupported = stats[image_id]
|
||||
if status == 'error':
|
||||
@ -1688,6 +1690,23 @@ class ComputeTaskManager(base.Base):
|
||||
unsupported += 1
|
||||
stats[image_id] = (cached, existing, error, unsupported)
|
||||
|
||||
host_stats['completed'] += 1
|
||||
compute_utils.notify_about_aggregate_cache(context, aggregate,
|
||||
host, result,
|
||||
host_stats['completed'],
|
||||
host_stats['total'])
|
||||
|
||||
def wrap_cache_images(ctxt, host, image_ids):
|
||||
result = self.compute_rpcapi.cache_images(
|
||||
ctxt,
|
||||
host=host,
|
||||
image_ids=image_ids)
|
||||
host_completed(context, host, result)
|
||||
|
||||
def skipped_host(context, host, image_ids):
|
||||
result = {image: 'skipped' for image in image_ids}
|
||||
host_completed(context, host, result)
|
||||
|
||||
for cell_uuid, hosts in hosts_by_cell.items():
|
||||
cell = cells_by_uuid[cell_uuid]
|
||||
with nova_context.target_cell(context, cell) as target_ctxt:
|
||||
@ -1700,6 +1719,7 @@ class ComputeTaskManager(base.Base):
|
||||
'Skipping image pre-cache request to compute '
|
||||
'%(host)r because it is not up',
|
||||
{'host': host})
|
||||
skipped_host(target_ctxt, host, image_ids)
|
||||
continue
|
||||
|
||||
fetch_pool.spawn_n(wrap_cache_images, target_ctxt, host,
|
||||
|
@ -64,3 +64,52 @@ class AggregateNotification(base.NotificationBase):
|
||||
fields = {
|
||||
'payload': fields.ObjectField('AggregatePayload')
|
||||
}
|
||||
|
||||
|
||||
@nova_base.NovaObjectRegistry.register_notification
|
||||
class AggregateCachePayload(base.NotificationPayloadBase):
|
||||
SCHEMA = {
|
||||
'id': ('aggregate', 'id'),
|
||||
'uuid': ('aggregate', 'uuid'),
|
||||
'name': ('aggregate', 'name'),
|
||||
}
|
||||
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
fields = {
|
||||
'id': fields.IntegerField(),
|
||||
'uuid': fields.UUIDField(),
|
||||
'name': fields.StringField(),
|
||||
|
||||
# The host that we just worked
|
||||
'host': fields.StringField(),
|
||||
|
||||
# The images that were downloaded or are already there
|
||||
'images_cached': fields.ListOfStringsField(),
|
||||
|
||||
# The images that are unable to be cached for some reason
|
||||
'images_failed': fields.ListOfStringsField(),
|
||||
|
||||
# The N/M progress information for this operation
|
||||
'index': fields.IntegerField(),
|
||||
'total': fields.IntegerField(),
|
||||
}
|
||||
|
||||
def __init__(self, aggregate, host, index, total):
|
||||
super(AggregateCachePayload, self).__init__()
|
||||
self.populate_schema(aggregate=aggregate)
|
||||
self.host = host
|
||||
self.index = index
|
||||
self.total = total
|
||||
|
||||
|
||||
@base.notification_sample('aggregate-cache_images-progress.json')
|
||||
@nova_base.NovaObjectRegistry.register_notification
|
||||
class AggregateCacheNotification(base.NotificationBase):
|
||||
# Version 1.0: Initial version
|
||||
VERSION = '1.0'
|
||||
|
||||
fields = {
|
||||
'payload': fields.ObjectField('AggregateCachePayload'),
|
||||
}
|
||||
|
@ -72,7 +72,8 @@ class EventType(NotificationObject):
|
||||
# Version 1.19: SELECT_DESTINATIONS is added to the NotificationActionField
|
||||
# enum
|
||||
# Version 1.20: IMAGE_CACHE is added to the NotificationActionField enum
|
||||
VERSION = '1.20'
|
||||
# Version 1.21: PROGRESS added to NotificationPhase enum
|
||||
VERSION = '1.21'
|
||||
|
||||
fields = {
|
||||
'object': fields.StringField(nullable=False),
|
||||
|
@ -768,8 +768,9 @@ class NotificationPhase(BaseNovaEnum):
|
||||
START = 'start'
|
||||
END = 'end'
|
||||
ERROR = 'error'
|
||||
PROGRESS = 'progress'
|
||||
|
||||
ALL = (START, END, ERROR)
|
||||
ALL = (START, END, ERROR, PROGRESS)
|
||||
|
||||
|
||||
class NotificationSource(BaseNovaEnum):
|
||||
|
@ -72,6 +72,23 @@ class ImageCacheTest(test.TestCase):
|
||||
|
||||
fake_notifier.wait_for_versioned_notifications(
|
||||
'aggregate.cache_images.start')
|
||||
|
||||
progress = fake_notifier.wait_for_versioned_notifications(
|
||||
'aggregate.cache_images.progress', n_events=4)
|
||||
self.assertEqual(4, len(progress), progress)
|
||||
for notification in progress:
|
||||
payload = notification['payload']['nova_object.data']
|
||||
if payload['host'] == 'compute5':
|
||||
self.assertEqual(['an-image'], payload['images_failed'])
|
||||
self.assertEqual([], payload['images_cached'])
|
||||
else:
|
||||
self.assertEqual(['an-image'], payload['images_cached'])
|
||||
self.assertEqual([], payload['images_failed'])
|
||||
self.assertLessEqual(payload['index'], 4)
|
||||
self.assertGreater(payload['index'], 0)
|
||||
self.assertEqual(4, payload['total'])
|
||||
self.assertIn('conductor', notification['publisher_id'])
|
||||
|
||||
fake_notifier.wait_for_versioned_notifications(
|
||||
'aggregate.cache_images.end')
|
||||
|
||||
|
@ -17,6 +17,10 @@ from nova.tests.unit import fake_notifier
|
||||
class TestAggregateNotificationSample(
|
||||
notification_sample_base.NotificationSampleTestBase):
|
||||
|
||||
def setUp(self):
|
||||
self.flags(compute_driver='fake.FakeDriverWithCaching')
|
||||
super(TestAggregateNotificationSample, self).setUp()
|
||||
|
||||
def test_aggregate_create_delete(self):
|
||||
aggregate_req = {
|
||||
"aggregate": {
|
||||
@ -194,7 +198,7 @@ class TestAggregateNotificationSample(
|
||||
fake_notifier.wait_for_versioned_notifications(
|
||||
'aggregate.cache_images.end')
|
||||
|
||||
self.assertEqual(2, len(fake_notifier.VERSIONED_NOTIFICATIONS),
|
||||
self.assertEqual(3, len(fake_notifier.VERSIONED_NOTIFICATIONS),
|
||||
fake_notifier.VERSIONED_NOTIFICATIONS)
|
||||
self._verify_notification(
|
||||
'aggregate-cache_images-start',
|
||||
@ -203,8 +207,14 @@ class TestAggregateNotificationSample(
|
||||
'id': aggregate['id']},
|
||||
actual=fake_notifier.VERSIONED_NOTIFICATIONS[0])
|
||||
self._verify_notification(
|
||||
'aggregate-cache_images-end',
|
||||
'aggregate-cache_images-progress',
|
||||
replacements={
|
||||
'uuid': aggregate['uuid'],
|
||||
'id': aggregate['id']},
|
||||
actual=fake_notifier.VERSIONED_NOTIFICATIONS[1])
|
||||
self._verify_notification(
|
||||
'aggregate-cache_images-end',
|
||||
replacements={
|
||||
'uuid': aggregate['uuid'],
|
||||
'id': aggregate['id']},
|
||||
actual=fake_notifier.VERSIONED_NOTIFICATIONS[2])
|
||||
|
@ -3683,7 +3683,7 @@ class ConductorTaskAPITestCase(_BaseTaskTestCase, test_compute.BaseTestCase):
|
||||
transport_url='')
|
||||
fake_mapping = objects.HostMapping(cell_mapping=fake_cell)
|
||||
mock_gbh.return_value = fake_mapping
|
||||
fake_agg = objects.Aggregate(name='agg', uuid=uuids.agg,
|
||||
fake_agg = objects.Aggregate(name='agg', uuid=uuids.agg, id=1,
|
||||
hosts=['host1', 'host2', 'host3'])
|
||||
|
||||
@mock.patch.object(self.conductor_manager.compute_rpcapi,
|
||||
|
@ -366,6 +366,8 @@ class TestNotificationBase(test.NoDBTestCase):
|
||||
|
||||
|
||||
notification_object_data = {
|
||||
'AggregateCacheNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
|
||||
'AggregateCachePayload': '1.0-3f4dc002bed67d06eecb577242a43572',
|
||||
'AggregateNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
|
||||
'AggregatePayload': '1.1-1eb9adcc4440d8627de6ec37c6398746',
|
||||
'AuditPeriodPayload': '1.0-2b429dd307b8374636703b843fa3f9cb',
|
||||
@ -375,7 +377,7 @@ notification_object_data = {
|
||||
'ComputeTaskNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
|
||||
'ComputeTaskPayload': '1.0-e3d34762c14d131c98337b72e8c600e1',
|
||||
'DestinationPayload': '1.0-4ccf26318dd18c4377dada2b1e74ec2e',
|
||||
'EventType': '1.20-4e02a676d3a18cab99579cacd1c91453',
|
||||
'EventType': '1.21-6a5f57fafe478f354f66b81b4cb537ea',
|
||||
'ExceptionNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
|
||||
'ExceptionPayload': '1.1-6c43008bd81885a63bc7f7c629f0793b',
|
||||
'FlavorNotification': '1.0-a73147b93b520ff0061865849d3dfa56',
|
||||
|
Loading…
x
Reference in New Issue
Block a user