Merge "dcorch-engine endpoint type handling improvements"
This commit is contained in:
commit
67029e0d75
@ -138,9 +138,17 @@ class GenericSyncManager(object):
|
|||||||
for sc in subclouds:
|
for sc in subclouds:
|
||||||
if sc.region_name in self.sync_objs.keys():
|
if sc.region_name in self.sync_objs.keys():
|
||||||
sc_names.append(sc.region_name)
|
sc_names.append(sc.region_name)
|
||||||
for e in self.sync_objs[sc.region_name].keys():
|
for ept in self.sync_objs[sc.region_name].keys():
|
||||||
self.sync_subcloud(self.context, engine_id, sc.region_name,
|
try:
|
||||||
e, 'sync')
|
self.sync_subcloud(self.context, engine_id, sc.region_name,
|
||||||
|
ept, 'sync')
|
||||||
|
except exceptions.SubcloudSyncNotFound:
|
||||||
|
# The endpoint in subcloud_sync has been removed
|
||||||
|
LOG.info("Engine id:(%s/%s) SubcloudSyncNotFound "
|
||||||
|
"remove from sync_obj endpoint_type %s" %
|
||||||
|
(engine_id, sc.region_name, ept))
|
||||||
|
self.sync_objs[sc.region_name].pop(ept, None)
|
||||||
|
|
||||||
LOG.debug('Engine id:(%s) Waiting for sync_subclouds %s to complete.'
|
LOG.debug('Engine id:(%s) Waiting for sync_subclouds %s to complete.'
|
||||||
% (engine_id, sc_names))
|
% (engine_id, sc_names))
|
||||||
for thread in self.subcloud_threads:
|
for thread in self.subcloud_threads:
|
||||||
@ -176,6 +184,7 @@ class GenericSyncManager(object):
|
|||||||
# precheck if the sync_state is still started
|
# precheck if the sync_state is still started
|
||||||
subcloud_sync = db_api.subcloud_sync_get(context, subcloud_name,
|
subcloud_sync = db_api.subcloud_sync_get(context, subcloud_name,
|
||||||
endpoint_type)
|
endpoint_type)
|
||||||
|
|
||||||
if subcloud_sync.sync_request in [dco_consts.SYNC_STATUS_REQUESTED,
|
if subcloud_sync.sync_request in [dco_consts.SYNC_STATUS_REQUESTED,
|
||||||
dco_consts.SYNC_STATUS_FAILED]:
|
dco_consts.SYNC_STATUS_FAILED]:
|
||||||
self.mutex_start_thread(
|
self.mutex_start_thread(
|
||||||
@ -368,6 +377,7 @@ class GenericSyncManager(object):
|
|||||||
if capabilities.get('endpoint_types') is None:
|
if capabilities.get('endpoint_types') is None:
|
||||||
# assign back if 'endpoint_types' is not in capabilities
|
# assign back if 'endpoint_types' is not in capabilities
|
||||||
capabilities['endpoint_types'] = c_endpoint_type_list
|
capabilities['endpoint_types'] = c_endpoint_type_list
|
||||||
|
sc.capabilities = capabilities
|
||||||
sc.save()
|
sc.save()
|
||||||
|
|
||||||
# Create objects for the endpoint types
|
# Create objects for the endpoint types
|
||||||
@ -389,8 +399,9 @@ class GenericSyncManager(object):
|
|||||||
|
|
||||||
# skip creation if a sync_obj of this endpoint type already
|
# skip creation if a sync_obj of this endpoint type already
|
||||||
# exists
|
# exists
|
||||||
if not self.sync_objs[subcloud_name].get(
|
sync_obj = self.sync_objs[subcloud_name].get(
|
||||||
endpoint_type == endpoint_type):
|
endpoint_type == endpoint_type)
|
||||||
|
if not sync_obj:
|
||||||
LOG.info("add_subcloud_sync_endpoint_type "
|
LOG.info("add_subcloud_sync_endpoint_type "
|
||||||
"subcloud_name=%s, sync_obj add=%s" %
|
"subcloud_name=%s, sync_obj add=%s" %
|
||||||
(subcloud_name, endpoint_type))
|
(subcloud_name, endpoint_type))
|
||||||
@ -441,6 +452,7 @@ class GenericSyncManager(object):
|
|||||||
for endpoint_type in endpoint_type_list:
|
for endpoint_type in endpoint_type_list:
|
||||||
if endpoint_type in c_endpoint_type_list:
|
if endpoint_type in c_endpoint_type_list:
|
||||||
c_endpoint_type_list.remove(endpoint_type)
|
c_endpoint_type_list.remove(endpoint_type)
|
||||||
|
sc.capabilities = capabilities
|
||||||
sc.save()
|
sc.save()
|
||||||
|
|
||||||
def update_subcloud_version(self, context, subcloud_name, sw_version):
|
def update_subcloud_version(self, context, subcloud_name, sw_version):
|
||||||
|
@ -1,214 +0,0 @@
|
|||||||
# Copyright 2017-2020 Wind River
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
||||||
# implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
import threading
|
|
||||||
|
|
||||||
|
|
||||||
from dccommon import consts as dccommon_consts
|
|
||||||
from dcmanager.common import consts as dcm_consts
|
|
||||||
from dcorch.common import consts as dco_consts
|
|
||||||
from dcorch.common import context as dc_context
|
|
||||||
from dcorch.engine.sync_services.identity import IdentitySyncThread
|
|
||||||
from dcorch.engine.sync_services.sysinv import SysinvSyncThread
|
|
||||||
from dcorch.objects.subcloud import Subcloud
|
|
||||||
from oslo_log import log as logging
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
|
||||||
|
|
||||||
# sync thread endpoint type and subclass mappings
|
|
||||||
syncthread_subclass_map = {
|
|
||||||
dco_consts.ENDPOINT_TYPE_PLATFORM: SysinvSyncThread,
|
|
||||||
dco_consts.ENDPOINT_TYPE_IDENTITY: IdentitySyncThread,
|
|
||||||
dccommon_consts.ENDPOINT_TYPE_IDENTITY_OS: IdentitySyncThread
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
class SubCloudEngine(object):
|
|
||||||
def __init__(self, context=None, name=None,
|
|
||||||
version=None, subcloud=None):
|
|
||||||
"""Init a subcloud
|
|
||||||
|
|
||||||
:param context: operational context
|
|
||||||
:param name: subcloud name, currently must match region name
|
|
||||||
:param version: TiC software version running in subcloud
|
|
||||||
:param subcloud: an embedded Subcloud object, persisted to DB
|
|
||||||
"""
|
|
||||||
super(SubCloudEngine, self).__init__()
|
|
||||||
if subcloud is not None:
|
|
||||||
self.subcloud = subcloud
|
|
||||||
else:
|
|
||||||
capabilities = {}
|
|
||||||
endpoint_type_list = dco_consts.SYNC_ENDPOINT_TYPES_LIST[:]
|
|
||||||
capabilities.update({'endpoint_types': endpoint_type_list})
|
|
||||||
self.subcloud = Subcloud(
|
|
||||||
context, region_name=name, software_version=version,
|
|
||||||
capabilities=capabilities)
|
|
||||||
self.subcloud.create()
|
|
||||||
self.lock = threading.Lock() # protects the status
|
|
||||||
self.capabilities_lock = threading.Lock() # protects capabilities
|
|
||||||
self.sync_threads_lock = threading.Lock() # protects sync_threads
|
|
||||||
self.sync_threads = [] # the individual SyncThread objects
|
|
||||||
self.context = context if context else dc_context.get_admin_context()
|
|
||||||
self.name = name
|
|
||||||
|
|
||||||
def set_version(self, version):
|
|
||||||
subcloud = Subcloud.get_by_name(self.context, self.name)
|
|
||||||
# todo make sure we only increase the version
|
|
||||||
subcloud.software_version = version
|
|
||||||
subcloud.save()
|
|
||||||
|
|
||||||
def is_managed(self):
|
|
||||||
# is this subcloud managed
|
|
||||||
subcloud = Subcloud.get_by_name(self.context, self.name)
|
|
||||||
return subcloud.management_state == dcm_consts.MANAGEMENT_MANAGED
|
|
||||||
|
|
||||||
def is_enabled(self):
|
|
||||||
# is this subcloud enabled
|
|
||||||
subcloud = Subcloud.get_by_name(self.context, self.name)
|
|
||||||
# We only enable syncing if the subcloud is online and the initial
|
|
||||||
# sync has completed.
|
|
||||||
if subcloud.availability_status == dcm_consts.AVAILABILITY_ONLINE and \
|
|
||||||
subcloud.initial_sync_state == \
|
|
||||||
dco_consts.INITIAL_SYNC_STATE_COMPLETED:
|
|
||||||
return True
|
|
||||||
else:
|
|
||||||
return False
|
|
||||||
|
|
||||||
def is_ready(self):
|
|
||||||
# is this subcloud ready for synchronization
|
|
||||||
return self.is_managed() and self.is_enabled()
|
|
||||||
|
|
||||||
def state_matches(self, management_state=None, availability_status=None,
|
|
||||||
initial_sync_state=None):
|
|
||||||
# compare subcloud states
|
|
||||||
match = True
|
|
||||||
subcloud = Subcloud.get_by_name(self.context, self.name)
|
|
||||||
if management_state is not None:
|
|
||||||
if subcloud.management_state != management_state:
|
|
||||||
match = False
|
|
||||||
if match and availability_status is not None:
|
|
||||||
if subcloud.availability_status != availability_status:
|
|
||||||
match = False
|
|
||||||
if match and initial_sync_state is not None:
|
|
||||||
if subcloud.initial_sync_state != initial_sync_state:
|
|
||||||
match = False
|
|
||||||
return match
|
|
||||||
|
|
||||||
def update_state(self, management_state=None, availability_status=None,
|
|
||||||
initial_sync_state=None):
|
|
||||||
subcloud = Subcloud.get_by_name(self.context, self.name)
|
|
||||||
if management_state is not None:
|
|
||||||
subcloud.management_state = management_state
|
|
||||||
if availability_status is not None:
|
|
||||||
subcloud.availability_status = availability_status
|
|
||||||
if initial_sync_state is not None:
|
|
||||||
subcloud.initial_sync_state = initial_sync_state
|
|
||||||
subcloud.save()
|
|
||||||
|
|
||||||
def enable(self):
|
|
||||||
# enable syncing for this subcloud
|
|
||||||
for thread in self.sync_threads:
|
|
||||||
thread.enable()
|
|
||||||
|
|
||||||
def wake(self, endpoint_type):
|
|
||||||
# wake specific endpoint type sync thread to process work
|
|
||||||
if self.is_enabled():
|
|
||||||
for thread in self.sync_threads:
|
|
||||||
if thread.endpoint_type == endpoint_type:
|
|
||||||
thread.wake()
|
|
||||||
|
|
||||||
def disable(self):
|
|
||||||
# nothing to do here at the moment
|
|
||||||
pass
|
|
||||||
|
|
||||||
def delete(self):
|
|
||||||
# first update the state of the subcloud
|
|
||||||
self.update_state(management_state=dcm_consts.MANAGEMENT_UNMANAGED,
|
|
||||||
availability_status=dcm_consts.AVAILABILITY_OFFLINE)
|
|
||||||
# shutdown, optionally deleting queued work
|
|
||||||
while self.sync_threads:
|
|
||||||
thread = self.sync_threads.pop()
|
|
||||||
thread.shutdown()
|
|
||||||
# delete this subcloud
|
|
||||||
Subcloud.delete_subcloud_by_name(self.context, self.name)
|
|
||||||
|
|
||||||
def initial_sync(self):
|
|
||||||
# initial synchronization of the subcloud
|
|
||||||
for thread in self.sync_threads:
|
|
||||||
thread.initial_sync()
|
|
||||||
|
|
||||||
def run_sync_audit(self, engine_id):
|
|
||||||
# run periodic sync audit on all threads in this subcloud
|
|
||||||
for thread in self.sync_threads:
|
|
||||||
thread.run_sync_audit(engine_id)
|
|
||||||
|
|
||||||
def add_sync_endpoint_type(self, endpoint_type_list):
|
|
||||||
# add the endpoint types into subcloud capabilities
|
|
||||||
with self.capabilities_lock:
|
|
||||||
capabilities = self.subcloud.capabilities
|
|
||||||
c_endpoint_type_list = capabilities.get('endpoint_types', [])
|
|
||||||
|
|
||||||
if endpoint_type_list:
|
|
||||||
for endpoint_type in endpoint_type_list:
|
|
||||||
if endpoint_type not in c_endpoint_type_list:
|
|
||||||
c_endpoint_type_list.append(endpoint_type)
|
|
||||||
if capabilities.get('endpoint_types') is None:
|
|
||||||
# assign back if 'endpoint_types' is not in capabilities
|
|
||||||
capabilities['endpoint_types'] = c_endpoint_type_list
|
|
||||||
self.subcloud.save()
|
|
||||||
|
|
||||||
# Start threads for the endpoint types
|
|
||||||
if endpoint_type_list:
|
|
||||||
with self.sync_threads_lock:
|
|
||||||
for endpoint_type in endpoint_type_list:
|
|
||||||
# skip creation if a thread of this endpoint type already
|
|
||||||
# exists
|
|
||||||
endpoint_thread_exist = False
|
|
||||||
for exist_thread in self.sync_threads:
|
|
||||||
if endpoint_type == exist_thread.endpoint_type:
|
|
||||||
endpoint_thread_exist = True
|
|
||||||
break
|
|
||||||
if endpoint_thread_exist:
|
|
||||||
continue
|
|
||||||
|
|
||||||
thread = syncthread_subclass_map[endpoint_type](
|
|
||||||
self, endpoint_type=endpoint_type)
|
|
||||||
|
|
||||||
self.sync_threads.append(thread)
|
|
||||||
thread.start()
|
|
||||||
if self.is_ready():
|
|
||||||
thread.enable()
|
|
||||||
thread.initial_sync()
|
|
||||||
|
|
||||||
def remove_sync_endpoint_type(self, endpoint_type_list):
|
|
||||||
# Stop threads for endpoint types to be removed
|
|
||||||
if endpoint_type_list:
|
|
||||||
with self.sync_threads_lock:
|
|
||||||
for endpoint_type in endpoint_type_list:
|
|
||||||
for thread in self.sync_threads:
|
|
||||||
if thread.endpoint_type == endpoint_type:
|
|
||||||
self.sync_threads.remove(thread)
|
|
||||||
thread.shutdown()
|
|
||||||
|
|
||||||
# remove the endpoint types from subcloud capabilities
|
|
||||||
with self.capabilities_lock:
|
|
||||||
capabilities = self.subcloud.capabilities
|
|
||||||
c_endpoint_type_list = capabilities.get('endpoint_types', [])
|
|
||||||
|
|
||||||
if endpoint_type_list and c_endpoint_type_list:
|
|
||||||
for endpoint_type in endpoint_type_list:
|
|
||||||
if endpoint_type in c_endpoint_type_list:
|
|
||||||
c_endpoint_type_list.remove(endpoint_type)
|
|
||||||
self.subcloud.save()
|
|
@ -279,9 +279,9 @@ class SyncThread(object):
|
|||||||
subcloud_sync.sync_status_report_time, timeutils.utcnow())
|
subcloud_sync.sync_status_report_time, timeutils.utcnow())
|
||||||
if delta < 3600:
|
if delta < 3600:
|
||||||
if subcloud_sync.sync_status_reported == sync_status:
|
if subcloud_sync.sync_status_reported == sync_status:
|
||||||
LOG.info("skip set_sync_status sync_status_reported=%s, sync_status=%s " %
|
LOG.debug("skip set_sync_status sync_status_reported=%s, sync_status=%s " %
|
||||||
(subcloud_sync.sync_status_reported, sync_status, ),
|
(subcloud_sync.sync_status_reported, sync_status, ),
|
||||||
extra=self.log_extra)
|
extra=self.log_extra)
|
||||||
return
|
return
|
||||||
|
|
||||||
LOG.info("{}: set_sync_status {}".format(self.subcloud_name, sync_status),
|
LOG.info("{}: set_sync_status {}".format(self.subcloud_name, sync_status),
|
||||||
@ -351,7 +351,16 @@ class SyncThread(object):
|
|||||||
# we were processing work for it.
|
# we were processing work for it.
|
||||||
raise exceptions.EndpointNotReachable()
|
raise exceptions.EndpointNotReachable()
|
||||||
request.state = consts.ORCH_REQUEST_STATE_IN_PROGRESS
|
request.state = consts.ORCH_REQUEST_STATE_IN_PROGRESS
|
||||||
request.save() # save to DB
|
try:
|
||||||
|
request.save() # save to DB
|
||||||
|
except exceptions.OrchRequestNotFound:
|
||||||
|
# This case is handled in loop below, but should also be
|
||||||
|
# handled here as well.
|
||||||
|
LOG.info("Orch request already deleted request uuid=%s state=%s" %
|
||||||
|
(request.uuid, request.state),
|
||||||
|
extra=self.log_extra)
|
||||||
|
continue
|
||||||
|
|
||||||
retry_count = 0
|
retry_count = 0
|
||||||
while retry_count < self.MAX_RETRY:
|
while retry_count < self.MAX_RETRY:
|
||||||
try:
|
try:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user