DCorch API Proxy enhancements for scalability

This commit updates two functionalities of dcorch-api-proxy,
notify() and enqueue_work(), to ensure that shared resource
changes are synced in a timely manner after the dcorch redesign.

The notify() method is updated to reach the new EngineWorkerService
through an RPC call, using EngineWorkerClient instead of EngineClient.
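
For reference, below is a minimal, hypothetical sketch of the
fire-and-forget RPC pattern this relies on, written against plain
oslo.messaging; the topic name and transport wiring are illustrative
assumptions, not the actual dcorch configuration:

    import oslo_messaging
    from oslo_config import cfg

    # A cast()-based client: the message is dispatched to the engine
    # worker and the caller returns immediately, without waiting for
    # a reply, so the proxied API request is not blocked.
    transport = oslo_messaging.get_rpc_transport(cfg.CONF)
    target = oslo_messaging.Target(topic='dcorch-engine-worker',
                                   version='1.0')  # illustrative topic
    client = oslo_messaging.RPCClient(transport, target)

    def sync_request(ctxt, endpoint_type):
        # Fire-and-forget notification that there is sync work to do.
        client.cast(ctxt, 'sync_request', endpoint_type=endpoint_type)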

For a high number of subclouds, the enqueue_work() method in utils.py
significantly delayed the execution of a dcorch proxy command, because
it created the objects in the orch_request table one by one, inside a
for loop over the subclouds. This logic has been changed to use
SQLAlchemy's bulk_insert_mappings method, which inserts all the
requests in a single session. This optimization reduces the execution
time from several minutes to near-instantaneous.
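
As a self-contained illustration of the difference (the model below is
a simplified stand-in for the real orch_request schema, not the actual
dcorch model), bulk_insert_mappings batches all rows into a single
executemany-style INSERT instead of flushing one ORM object per
subcloud:

    import uuid

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import declarative_base, sessionmaker

    Base = declarative_base()

    class OrchRequest(Base):  # simplified stand-in model
        __tablename__ = 'orch_request'
        id = Column(Integer, primary_key=True)
        uuid = Column(String(36))
        state = Column(String(32))
        target_region_name = Column(String(255))
        orch_job_id = Column(Integer)

    engine = create_engine('sqlite://')  # in-memory DB for the sketch
    Base.metadata.create_all(engine)
    Session = sessionmaker(bind=engine)

    # One dictionary per subcloud, then a single bulk insert, instead
    # of creating and saving one ORM object per iteration.
    mappings = [
        {'uuid': str(uuid.uuid4()),
         'state': 'queued',
         'target_region_name': 'subcloud%d' % i,
         'orch_job_id': 1}
        for i in range(1000)
    ]
    with Session() as session:
        session.bulk_insert_mappings(OrchRequest, mappings)
        session.commit()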

Test Plan:
PASS: Run a command with '--os-region-name SystemController' on a
large-scale lab. Verify that:
   - The proxy creates the orchestration requests for each subcloud
     in the `orch_request` table.
   - The proxy updates the subcloud_sync table for the managed
     subclouds, setting `sync_request` to 'requested'.
   - The changes are synchronized to all the subclouds by the
     `sync_job`.
   - The delay for completing the proxy command is significantly
     reduced.

Story: 2011106
Task: 50308

Change-Id: I56d37df8ab78ddbe72f7670555c7a01bcb3d2917
Signed-off-by: Enzo Candotti <enzo.candotti@windriver.com>

@@ -66,6 +66,7 @@ class APIController(Middleware):
         super(APIController, self).__init__(app)
         self.ctxt = k_context.get_admin_context()
         self._default_dispatcher = APIDispatcher(app)
+        self.rpc_worker_client = rpc_client.EngineWorkerClient()
         self.rpc_client = rpc_client.EngineClient()
         self.response_hander_map = {}
         self.sync_endpoint = proxy_utils.get_sync_endpoint(CONF)
@@ -91,7 +92,7 @@ class APIController(Middleware):
         return construct_url(environ)
 
     def notify(self, environ, endpoint_type):
-        self.rpc_client.sync_request(self.ctxt, endpoint_type)
+        self.rpc_worker_client.sync_request(self.ctxt, endpoint_type)
 
     def process_request(self, req):
         return self._default_dispatcher

@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import itertools
+import uuid
 
 from oslo_db import exception as oslo_db_exception
 from oslo_log import log as logging
@@ -23,8 +24,8 @@ import six.moves
 from dccommon import consts as dccommon_consts
 from dcorch.common import consts
 from dcorch.common import exceptions
+from dcorch.db import api as db_api
 from dcorch.objects import orchjob
-from dcorch.objects import orchrequest
 from dcorch.objects import resource
 from dcorch.objects import subcloud as subcloud_obj
@@ -182,13 +183,25 @@ def enqueue_work(context, endpoint_type,
         subclouds = [subcloud]
     else:
         subclouds = subcloud_obj.SubcloudList.get_all(context)
+    orch_requests = []
     for sc in subclouds:
-        orch_req = orchrequest.OrchRequest(
-            context=context, state=consts.ORCH_REQUEST_QUEUED,
-            target_region_name=sc.region_name,
-            # pylint: disable-next=no-member
-            orch_job_id=orch_job.id)
-        orch_req.create()
-    LOG.info("Work order created for {}:{}/{}/{}/{}".format(
-        # pylint: disable-next=no-member
-        subcloud, rsrc.id, resource_type, source_resource_id, operation_type))
+        # Create a dictionary for each orchestration request with a unique UUID,
+        # state = 'queued', the target region name, and the orch_job ID
+        orch_request = {
+            'uuid': str(uuid.uuid4()),
+            'state': consts.ORCH_REQUEST_QUEUED,
+            'target_region_name': sc.region_name,
+            # pylint: disable-next=no-member
+            'orch_job_id': orch_job.id,
+        }
+        orch_requests.append(orch_request)
+    # Use the bulk_insert_mappings method to insert all orchestration requests
+    # in a single session
+    db_api.orch_request_create_bulk(context, orch_requests)
+    LOG.info(
+        f"Work order created for {len(subclouds)} subclouds for resource "
+        # pylint: disable-next=no-member
+        f"{rsrc.id}/{resource_type}/{source_resource_id}/{operation_type}"
+    )

@@ -339,6 +339,10 @@ def orch_request_create(context, orch_job_id, target_region_name, values):
                                     target_region_name, values)
 
 
+def orch_request_create_bulk(context, orch_requests):
+    return IMPL.orch_request_create_bulk(context, orch_requests)
+
+
 def orch_request_update(context, orch_request_id, values):
     return IMPL.orch_request_update(context, orch_request_id, values)

@@ -981,6 +981,21 @@ def orch_request_create(context, orch_job_id, target_region_name, values):
     return result
 
 
+def orch_request_create_bulk(context, orch_requests):
+    for request in orch_requests:
+        if 'orch_job_id' not in request:
+            raise exception.ObjectActionError(
+                action="create_bulk",
+                reason="cannot create an OrchRequest object without an "
+                       "orch_job_id set")
+        if 'target_region_name' not in request:
+            raise exception.ObjectActionError(
+                action="create_bulk",
+                reason="cannot create an OrchRequest object without a "
+                       "target_region_name set")
+    with write_session() as session:
+        session.bulk_insert_mappings(models.OrchRequest, orch_requests)
+
+
 @require_admin_context
 def orch_request_update(context, orch_request_id, values):
     with write_session() as session:

@@ -125,13 +125,6 @@ class GenericSyncManager(object):
         else:
             LOG.debug("No eligible subclouds for audit.")
 
-    def sync_request(self, ctxt, endpoint_type):
-        # Someone has enqueued a sync job. set the endpoint sync_request to
-        # requested
-        db_api.subcloud_sync_update_all(
-            ctxt, dccommon_consts.MANAGEMENT_MANAGED, endpoint_type,
-            values={'sync_request': dco_consts.SYNC_STATUS_REQUESTED})
-
     def _send_chunk(self, rpc_method, subcloud_sync_chunk):
         try:
             rpc_method(self.context, subcloud_sync_chunk)

@@ -362,3 +362,10 @@ class GenericSyncWorkerManager(object):
             db_api.subcloud_sync_update(
                 self.context, sc_region_name, ept,
                 values={'audit_status': dco_consts.AUDIT_STATUS_FAILED})
+
+    def sync_request(self, ctxt, endpoint_type):
+        # Someone has enqueued a sync job. set the endpoint sync_request to
+        # requested
+        db_api.subcloud_sync_update_all(
+            ctxt, dccommon_consts.MANAGEMENT_MANAGED, endpoint_type,
+            values={'sync_request': dco_consts.SYNC_STATUS_REQUESTED})

@@ -118,13 +118,6 @@ class EngineService(service.Service):
         self.ism.init_actions()
         self.TG.start(self.ism.initial_sync_thread)
 
-    @request_context
-    # The sync job info has been written to the DB, alert the sync engine
-    # that there is work to do.
-    # TODO(lzhu1): add authentication since ctxt not actually needed later
-    def sync_request(self, ctxt, endpoint_type):
-        self.gsm.sync_request(ctxt, endpoint_type)
-
     def periodic_balance_all(self):
         # Automated Quota Sync for all the keystone projects
         LOG.info("Periodic quota sync job started at: %s",
@@ -354,3 +347,10 @@ class EngineWorkerService(service.Service):
         # Terminate the engine process
         LOG.info("All threads were gone, terminating engine-worker")
         super(EngineWorkerService, self).stop()
+
+    @request_context
+    # The sync job info has been written to the DB, alert the sync engine
+    # that there is work to do.
+    # TODO(lzhu1): add authentication since ctxt not actually needed later
+    def sync_request(self, ctxt, endpoint_type):
+        self.gswm.sync_request(ctxt, endpoint_type)

@@ -53,12 +53,6 @@ class EngineClient(object):
         client = self._client
         return client.cast(ctxt, method, **kwargs)
 
-    # The sync job info has been written to the DB, alert the sync engine
-    # that there is work to do.
-    def sync_request(self, ctxt, endpoint_type):
-        return self.cast(
-            ctxt, self.make_msg('sync_request', endpoint_type=endpoint_type))
-
     def get_usage_for_project_and_user(self, ctxt, endpoint_type,
                                        project_id, user_id=None):
         return self.call(ctxt, self.make_msg('get_usage_for_project_and_user',
@@ -191,3 +185,9 @@ class EngineWorkerClient(object):
                 management_ip=management_ip,
             ),
         )
+
+    # The sync job info has been written to the DB, alert the sync engine
+    # that there is work to do.
+    def sync_request(self, ctxt, endpoint_type):
+        return self.cast(
+            ctxt, self.make_msg('sync_request', endpoint_type=endpoint_type))

@@ -240,39 +240,3 @@ class TestGenericSyncManager(base.OrchestratorTestCase):
             dccommon_consts.ENDPOINT_TYPE_PLATFORM)
         self.assertEqual(consts.AUDIT_STATUS_IN_PROGRESS,
                          subcloud_sync_platform.audit_status)
-
-    def test_sync_request(self):
-        subcloud1 = utils.create_subcloud_static(
-            self.ctx,
-            name='subcloud1',
-            management_state=dccommon_consts.MANAGEMENT_MANAGED,
-            initial_sync_state=consts.INITIAL_SYNC_STATE_NONE)
-        utils.create_subcloud_sync_static(
-            self.ctx,
-            subcloud1.region_name,
-            dccommon_consts.ENDPOINT_TYPE_IDENTITY,
-            subcloud_id=subcloud1.id)
-
-        subcloud2 = utils.create_subcloud_static(
-            self.ctx,
-            name='subcloud2',
-            management_state=dccommon_consts.MANAGEMENT_MANAGED,
-            initial_sync_state=consts.INITIAL_SYNC_STATE_FAILED)
-        utils.create_subcloud_sync_static(
-            self.ctx,
-            subcloud2.region_name,
-            dccommon_consts.ENDPOINT_TYPE_IDENTITY,
-            subcloud_id=subcloud2.id)
-
-        gsm = generic_sync_manager.GenericSyncManager()
-        gsm.sync_request(self.ctx, dccommon_consts.ENDPOINT_TYPE_IDENTITY)
-
-        # Verify the sync_request of the subclouds were updated to requested
-        subcloud_sync = db_api.subcloud_sync_get(
-            self.ctx, 'subcloud1', dccommon_consts.ENDPOINT_TYPE_IDENTITY)
-        self.assertEqual(consts.SYNC_STATUS_REQUESTED,
-                         subcloud_sync.sync_request)
-        subcloud_sync = db_api.subcloud_sync_get(
-            self.ctx, 'subcloud2', dccommon_consts.ENDPOINT_TYPE_IDENTITY)
-        self.assertEqual(consts.SYNC_STATUS_REQUESTED,
-                         subcloud_sync.sync_request)

@@ -10,6 +10,7 @@ from oslo_utils import uuidutils
 from dccommon import consts as dccommon_consts
 from dcorch.common import consts
+from dcorch.db.sqlalchemy import api as db_api
 from dcorch.engine import generic_sync_worker_manager
 from dcorch.tests import base
 from dcorch.tests import utils
@@ -129,3 +130,38 @@ class TestGenericSyncWorkerManager(base.OrchestratorTestCase):
             subcloud_name,
             endpoint_type,
             mock.ANY)
+
+    def test_sync_request(self):
+        subcloud1 = utils.create_subcloud_static(
+            self.ctx,
+            name='subcloud1',
+            management_state=dccommon_consts.MANAGEMENT_MANAGED,
+            initial_sync_state=consts.INITIAL_SYNC_STATE_NONE)
+        utils.create_subcloud_sync_static(
+            self.ctx,
+            subcloud1.region_name,
+            dccommon_consts.ENDPOINT_TYPE_IDENTITY,
+            subcloud_id=subcloud1.id)
+
+        subcloud2 = utils.create_subcloud_static(
+            self.ctx,
+            name='subcloud2',
+            management_state=dccommon_consts.MANAGEMENT_MANAGED,
+            initial_sync_state=consts.INITIAL_SYNC_STATE_FAILED)
+        utils.create_subcloud_sync_static(
+            self.ctx,
+            subcloud2.region_name,
+            dccommon_consts.ENDPOINT_TYPE_IDENTITY,
+            subcloud_id=subcloud2.id)
+
+        self.gswm.sync_request(self.ctx, dccommon_consts.ENDPOINT_TYPE_IDENTITY)
+
+        # Verify the sync_request of the subclouds were updated to requested
+        subcloud_sync = db_api.subcloud_sync_get(
+            self.ctx, 'subcloud1', dccommon_consts.ENDPOINT_TYPE_IDENTITY)
+        self.assertEqual(consts.SYNC_STATUS_REQUESTED,
+                         subcloud_sync.sync_request)
+        subcloud_sync = db_api.subcloud_sync_get(
+            self.ctx, 'subcloud2', dccommon_consts.ENDPOINT_TYPE_IDENTITY)
+        self.assertEqual(consts.SYNC_STATUS_REQUESTED,
+                         subcloud_sync.sync_request)