Move task_log functions to conductor
This moves the use of the following functions in compute/utils to the conductor: task_log_get() task_log_begin_task() task_log_end_task() This removes the db import from compute/utils.py Related to blueprint no-db-compute Change-Id: I6ed0563969afee78d9f2970308bc7f91e5690159
This commit is contained in:
parent
d70d2f374d
commit
896bf4f9b1
@ -3085,7 +3085,9 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
@manager.periodic_task
|
||||
def _instance_usage_audit(self, context):
|
||||
if CONF.instance_usage_audit:
|
||||
if not compute_utils.has_audit_been_run(context, self.host):
|
||||
if not compute_utils.has_audit_been_run(context,
|
||||
self.conductor_api,
|
||||
self.host):
|
||||
begin, end = utils.last_completed_audit_period()
|
||||
capi = self.conductor_api
|
||||
instances = capi.instance_get_active_by_window_joined(
|
||||
@ -3102,6 +3104,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
number_instances=num_instances))
|
||||
start_time = time.time()
|
||||
compute_utils.start_instance_usage_audit(context,
|
||||
self.conductor_api,
|
||||
begin, end,
|
||||
self.host, num_instances)
|
||||
for instance in instances:
|
||||
@ -3117,6 +3120,7 @@ class ComputeManager(manager.SchedulerDependentManager):
|
||||
instance=instance)
|
||||
errors += 1
|
||||
compute_utils.finish_instance_usage_audit(context,
|
||||
self.conductor_api,
|
||||
begin, end,
|
||||
self.host, errors,
|
||||
"Instance usage audit ran "
|
||||
|
@ -22,7 +22,6 @@ import traceback
|
||||
|
||||
from nova import block_device
|
||||
from nova.compute import instance_types
|
||||
from nova import db
|
||||
from nova import exception
|
||||
from nova.network import model as network_model
|
||||
from nova import notifications
|
||||
@ -206,24 +205,27 @@ def get_nw_info_for_instance(instance):
|
||||
return network_model.NetworkInfo.hydrate(cached_nwinfo)
|
||||
|
||||
|
||||
def has_audit_been_run(context, host, timestamp=None):
|
||||
def has_audit_been_run(context, conductor, host, timestamp=None):
|
||||
begin, end = utils.last_completed_audit_period(before=timestamp)
|
||||
task_log = db.task_log_get(context, "instance_usage_audit",
|
||||
begin, end, host)
|
||||
task_log = conductor.task_log_get(context, "instance_usage_audit",
|
||||
begin, end, host)
|
||||
if task_log:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def start_instance_usage_audit(context, begin, end, host, num_instances):
|
||||
db.task_log_begin_task(context, "instance_usage_audit", begin, end, host,
|
||||
num_instances, "Instance usage audit started...")
|
||||
def start_instance_usage_audit(context, conductor, begin, end, host,
|
||||
num_instances):
|
||||
conductor.task_log_begin_task(context, "instance_usage_audit", begin,
|
||||
end, host, num_instances,
|
||||
"Instance usage audit started...")
|
||||
|
||||
|
||||
def finish_instance_usage_audit(context, begin, end, host, errors, message):
|
||||
db.task_log_end_task(context, "instance_usage_audit", begin, end, host,
|
||||
errors, message)
|
||||
def finish_instance_usage_audit(context, conductor, begin, end, host, errors,
|
||||
message):
|
||||
conductor.task_log_end_task(context, "instance_usage_audit", begin, end,
|
||||
host, errors, message)
|
||||
|
||||
|
||||
def usage_volume_info(vol_usage):
|
||||
|
@ -296,6 +296,22 @@ class LocalAPI(object):
|
||||
def service_update(self, context, service, values):
|
||||
return self._manager.service_update(context, service, values)
|
||||
|
||||
def task_log_get(self, context, task_name, begin, end, host, state):
|
||||
return self._manager.task_log_get(context, task_name, begin, end,
|
||||
host, state)
|
||||
|
||||
def task_log_begin_task(self, context, task_name, begin, end, host,
|
||||
task_items=None, message=None):
|
||||
return self._manager.task_log_begin_task(context, task_name,
|
||||
begin, end, host,
|
||||
task_items, message)
|
||||
|
||||
def task_log_end_task(self, context, task_name, begin, end, host,
|
||||
errors, message=None):
|
||||
return self._manager.task_log_end_task(context, task_name,
|
||||
begin, end, host,
|
||||
errors, message)
|
||||
|
||||
|
||||
class API(object):
|
||||
"""Conductor API that does updates via RPC to the ConductorManager."""
|
||||
@ -570,3 +586,19 @@ class API(object):
|
||||
|
||||
def service_update(self, context, service, values):
|
||||
return self.conductor_rpcapi.service_update(context, service, values)
|
||||
|
||||
def task_log_get(self, context, task_name, begin, end, host, state):
|
||||
return self.conductor_rpcapi.task_log_get(context, task_name, begin,
|
||||
end, host, state)
|
||||
|
||||
def task_log_begin_task(self, context, task_name, begin, end, host,
|
||||
task_items=None, message=None):
|
||||
return self.conductor_rpcapi.task_log_begin_task(context, task_name,
|
||||
begin, end, host,
|
||||
task_items, message)
|
||||
|
||||
def task_log_end_task(self, context, task_name, begin, end, host,
|
||||
errors, message=None):
|
||||
return self.conductor_rpcapi.task_log_end_task(context, task_name,
|
||||
begin, end, host,
|
||||
errors, message)
|
||||
|
@ -43,7 +43,7 @@ datetime_fields = ['launched_at', 'terminated_at']
|
||||
class ConductorManager(manager.SchedulerDependentManager):
|
||||
"""Mission: TBD."""
|
||||
|
||||
RPC_API_VERSION = '1.36'
|
||||
RPC_API_VERSION = '1.37'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(ConductorManager, self).__init__(service_name='conductor',
|
||||
@ -323,3 +323,21 @@ class ConductorManager(manager.SchedulerDependentManager):
|
||||
def service_update(self, context, service, values):
|
||||
svc = self.db.service_update(context, service['id'], values)
|
||||
return jsonutils.to_primitive(svc)
|
||||
|
||||
def task_log_get(self, context, task_name, begin, end, host, state=None):
|
||||
result = self.db.task_log_get(context, task_name, begin, end, host,
|
||||
state)
|
||||
return jsonutils.to_primitive(result)
|
||||
|
||||
def task_log_begin_task(self, context, task_name, begin, end, host,
|
||||
task_items=None, message=None):
|
||||
result = self.db.task_log_begin_task(context.elevated(), task_name,
|
||||
begin, end, host, task_items,
|
||||
message)
|
||||
return jsonutils.to_primitive(result)
|
||||
|
||||
def task_log_end_task(self, context, task_name, begin, end, host,
|
||||
errors, message=None):
|
||||
result = self.db.task_log_end_task(context.elevated(), task_name,
|
||||
begin, end, host, errors, message)
|
||||
return jsonutils.to_primitive(result)
|
||||
|
@ -69,6 +69,7 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
1.34 - Added service_update
|
||||
1.35 - Added instance_get_active_by_window_joined
|
||||
1.36 - Added instance_fault_create
|
||||
1.37 - Added task_log_get, task_log_begin_task, task_log_end_task
|
||||
"""
|
||||
|
||||
BASE_RPC_API_VERSION = '1.0'
|
||||
@ -335,3 +336,22 @@ class ConductorAPI(nova.openstack.common.rpc.proxy.RpcProxy):
|
||||
service_p = jsonutils.to_primitive(service)
|
||||
msg = self.make_msg('service_update', service=service_p, values=values)
|
||||
return self.call(context, msg, version='1.34')
|
||||
|
||||
def task_log_get(self, context, task_name, begin, end, host, state=None):
|
||||
msg = self.make_msg('task_log_get', task_name=task_name,
|
||||
begin=begin, end=end, host=host, state=state)
|
||||
return self.call(context, msg, version='1.37')
|
||||
|
||||
def task_log_begin_task(self, context, task_name, begin, end, host,
|
||||
task_items=None, message=None):
|
||||
msg = self.make_msg('task_log_begin_task', task_name=task_name,
|
||||
begin=begin, end=end, host=host,
|
||||
task_items=task_items, message=message)
|
||||
return self.call(context, msg, version='1.37')
|
||||
|
||||
def task_log_end_task(self, context, task_name, begin, end, host, errors,
|
||||
message=None):
|
||||
msg = self.make_msg('task_log_end_task', task_name=task_name,
|
||||
begin=begin, end=end, host=host, errors=errors,
|
||||
message=message)
|
||||
return self.call(context, msg, version='1.37')
|
||||
|
@ -435,6 +435,34 @@ class _BaseTestCase(object):
|
||||
'fake-values')
|
||||
self.assertEqual(result, 'fake-result')
|
||||
|
||||
def test_task_log_get(self):
|
||||
self.mox.StubOutWithMock(db, 'task_log_get')
|
||||
db.task_log_get(self.context, 'task', 'begin', 'end', 'host',
|
||||
'state').AndReturn('result')
|
||||
self.mox.ReplayAll()
|
||||
result = self.conductor.task_log_get(self.context, 'task', 'begin',
|
||||
'end', 'host', 'state')
|
||||
self.assertEqual(result, 'result')
|
||||
|
||||
def test_task_log_begin_task(self):
|
||||
self.mox.StubOutWithMock(db, 'task_log_begin_task')
|
||||
db.task_log_begin_task(self.context.elevated(), 'task', 'begin',
|
||||
'end', 'host', 'items',
|
||||
'message').AndReturn('result')
|
||||
self.mox.ReplayAll()
|
||||
result = self.conductor.task_log_begin_task(
|
||||
self.context, 'task', 'begin', 'end', 'host', 'items', 'message')
|
||||
self.assertEqual(result, 'result')
|
||||
|
||||
def test_task_log_end_task(self):
|
||||
self.mox.StubOutWithMock(db, 'task_log_end_task')
|
||||
db.task_log_end_task(self.context.elevated(), 'task', 'begin', 'end',
|
||||
'host', 'errors', 'message').AndReturn('result')
|
||||
self.mox.ReplayAll()
|
||||
result = self.conductor.task_log_end_task(
|
||||
self.context, 'task', 'begin', 'end', 'host', 'errors', 'message')
|
||||
self.assertEqual(result, 'result')
|
||||
|
||||
|
||||
class ConductorTestCase(_BaseTestCase, test.TestCase):
|
||||
"""Conductor Manager Tests."""
|
||||
|
Loading…
x
Reference in New Issue
Block a user