Prepare DC queries for sqlalchemy 1.4 upversion

For subcloud scalability purposes, sqlalchemy will be upgraded from
version 1.3 to 1.4. This version change introduces some
incompatibilities from the current version:

- Session management: When calling model_query without providing a
session, a new one would be created inside a context manager. This
means that, upon exiting model_query, the query would be executed and
any new filtering after would reopen the session. Version 1.3 was more
forgiving regarding transactions and this was not an issue.
On version 1.4, sessions called after transaction ended will still
remain open [1], causing the pool to eventually overflow with sessions
stuck in "idle in transaction" state. To fix this, the session
creation inside model_query is changed to only start the context
manager, which will then be closed only when exiting the db function
itself by a decorator.

- Changed deprecated methods like the removal of count() method for
tables, joinedload_all and how values are accessed in rows [2].

- Writing in read sessions: Although sqlalchemy 1.4 still supports
  autocommit, it's being deprecated and removed in version 2.0. Since
  we also need to upversion oslo.db and newer versions disable
  autocommit by default, this commit replaces all write operations
  being done inside a read session with a write session.

Test plan:
  - PASS: Build a custom ISO with the changes and deploy a DX system
          controller and a SX subcloud. Verify the system works as
          expected.
  - PASS: Manage a subcloud and verify the sync_status is "in-sync".
  - PASS: Soak the system and verify there was no connection leak and
          no sessions stuck in "idle in transaction" state.
  - PASS: Run DC sanity and regression.
  - PASS: Run kube-rootca-update orchestration successfully.
  - PASS: Run subcloud prestage --for-sw-install successfully.
  - PASS: Create a subcloud backup and then restore it successfully.

[1]: https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#session-features-new-autobegin-behavior
[2]: https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#rowproxy-is-no-longer-a-proxy-is-now-called-row-and-behaves-like-an-enhanced-named-tuple

Story: 2011311
Task: 51731

Change-Id: I97c88d8f668e9cfc1d9c5682d43c233193a823d7
Signed-off-by: Victor Romano <victor.gluzromano@windriver.com>
This commit is contained in:
Victor Romano 2025-02-28 15:37:52 -03:00
parent 99291be851
commit 0eda87984f
4 changed files with 316 additions and 41 deletions

View File

@ -13,7 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2019-2022, 2024 Wind River Systems, Inc.
# Copyright (c) 2019-2022, 2024-2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -86,7 +86,7 @@ def get_write_connection():
def row2dict(table, row):
d = {}
for c in table.columns:
c_value = getattr(row, c.name)
c_value = row[c.name]
d[c.name] = c_value
return d

View File

@ -1,5 +1,5 @@
# Copyright (c) 2015 Ericsson AB.
# Copyright (c) 2017-2024 Wind River Systems, Inc.
# Copyright (c) 2017-2025 Wind River Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -19,9 +19,10 @@
Implementation of SQLAlchemy backend.
"""
import functools
import sys
import threading
import eventlet
from oslo_db import exception as db_exc
from oslo_db.exception import DBDuplicateEntry
from oslo_db.sqlalchemy import enginefacade
@ -35,7 +36,7 @@ from sqlalchemy import desc
from sqlalchemy import insert
from sqlalchemy import or_
from sqlalchemy.orm import exc
from sqlalchemy.orm import joinedload_all
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import load_only
from sqlalchemy.sql.expression import true
from sqlalchemy import update
@ -52,7 +53,6 @@ LOG = logging.getLogger(__name__)
_facade = None
_main_context_manager = None
_CONTEXT = threading.local()
def _get_main_context_manager():
@ -71,11 +71,13 @@ def get_session():
def read_session():
return _get_main_context_manager().reader.using(_CONTEXT)
_context = eventlet.greenthread.getcurrent()
return _get_main_context_manager().reader.using(_context)
def write_session():
return _get_main_context_manager().writer.using(_CONTEXT)
_context = eventlet.greenthread.getcurrent()
return _get_main_context_manager().writer.using(_context)
def get_backend():
@ -83,10 +85,63 @@ def get_backend():
return sys.modules[__name__]
def model_query(context, *args):
with read_session() as session:
query = session.query(*args).options(joinedload_all("*"))
return query
def db_session_cleanup(func):
"""Class decorator that automatically adds session cleanup to method"""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
_context = eventlet.greenthread.getcurrent()
exc_info = (None, None, None)
try:
return func(self, *args, **kwargs)
except Exception:
exc_info = sys.exc_info()
raise
finally:
if (
hasattr(_context, "_db_session_context")
and _context._db_session_context is not None
):
try:
_context._db_session_context.__exit__(*exc_info)
except Exception as e:
LOG.warning(f"Error closing database session: {e}")
# Clear the session
_context._db_session = None
_context._db_session_context = None
return wrapper
# TODO(vgluzrom): Put all db functions inside a class and apply db_session_cleanup
# to the class itself, so it's not necessary for the function caller to apply it
# to each function call.
def model_query(context, *args, **kwargs):
"""Query helper for simpler session usage.
If the session is already provided in the kwargs, use it. Otherwise,
try to get it from thread context. If it's not there, create a new one.
Note: If not providing a session, the calling function need to have
the db_session_cleanup decorator to clean up the session.
:param session: if present, the session to use
"""
session = kwargs.get("session")
if not session:
_context = eventlet.greenthread.getcurrent()
if hasattr(_context, "_db_session") and _context._db_session is not None:
session = _context._db_session
else:
session_context = read_session()
session = session_context.__enter__() # pylint: disable=no-member
_context._db_session = session
# Need to store the session context to call __exit__ method later
_context._db_session_context = session_context
return session.query(*args).options(joinedload("*"))
def _session(context):
@ -147,6 +202,7 @@ def require_context(f):
###################
@db_session_cleanup
@require_context
def subcloud_audits_get(context, subcloud_id):
result = (
@ -162,6 +218,7 @@ def subcloud_audits_get(context, subcloud_id):
return result
@db_session_cleanup
@require_context
def subcloud_audits_get_all(context, subcloud_ids=None):
"""Get subcloud_audits info for subclouds
@ -179,6 +236,7 @@ def subcloud_audits_get_all(context, subcloud_ids=None):
return query.all()
@db_session_cleanup
@require_context
def subcloud_audits_update_all(context, values):
with write_session() as session:
@ -188,6 +246,7 @@ def subcloud_audits_update_all(context, values):
return result
@db_session_cleanup
@require_admin_context
def subcloud_audits_create(context, subcloud_id):
with write_session() as session:
@ -197,6 +256,7 @@ def subcloud_audits_create(context, subcloud_id):
return subcloud_audits_ref
@db_session_cleanup
@require_admin_context
def subcloud_audits_update(context, subcloud_id, values):
with write_session() as session:
@ -206,6 +266,7 @@ def subcloud_audits_update(context, subcloud_id, values):
return subcloud_audits_ref
@db_session_cleanup
@require_context
def subcloud_audits_get_all_need_audit(context, last_audit_threshold):
with read_session() as session:
@ -242,6 +303,7 @@ def subcloud_audits_get_all_need_audit(context, last_audit_threshold):
# by the DB server. If server time is in UTC func.now() might work.
@db_session_cleanup
@require_context
def subcloud_audits_get_and_start_audit(context, subcloud_id):
with write_session() as session:
@ -251,6 +313,7 @@ def subcloud_audits_get_and_start_audit(context, subcloud_id):
return subcloud_audits_ref
@db_session_cleanup
@require_context
def subcloud_audits_bulk_end_audit(context, audits_finished):
"""Update the subcloud's audit end status in a bulk request
@ -286,19 +349,23 @@ def subcloud_audits_bulk_end_audit(context, audits_finished):
return session.bulk_update_mappings(models.SubcloudAudits, update_list)
@db_session_cleanup
@require_context
def subcloud_audits_bulk_update_audit_finished_at(context, subcloud_ids):
values = {"audit_finished_at": timeutils.utcnow()}
with write_session():
model_query(context, models.SubcloudAudits).filter_by(deleted=0).filter(
models.SubcloudAudits.subcloud_id.in_(subcloud_ids)
).update(values, synchronize_session="fetch")
with write_session() as session:
model_query(context, models.SubcloudAudits, session=session).filter_by(
deleted=0
).filter(models.SubcloudAudits.subcloud_id.in_(subcloud_ids)).update(
values, synchronize_session="fetch"
)
# Find and fix up subcloud audits where the audit has taken too long.
# We want to find subclouds that started an audit but never finished
# it and update the "finished at" timestamp to be the same as
# the "started at" timestamp. Returns the number of rows updated.
@db_session_cleanup
@require_context
def subcloud_audits_fix_expired_audits(
context, last_audit_threshold, trigger_audits=False
@ -330,14 +397,10 @@ def subcloud_audits_fix_expired_audits(
###################
@db_session_cleanup
@require_context
def subcloud_get(context, subcloud_id):
result = (
model_query(context, models.Subcloud)
.filter_by(deleted=0)
.filter_by(id=subcloud_id)
.first()
)
result = model_query(context, models.Subcloud).get(subcloud_id)
if not result:
raise exception.SubcloudNotFound(subcloud_id=subcloud_id)
@ -345,6 +408,7 @@ def subcloud_get(context, subcloud_id):
return result
@db_session_cleanup
@require_context
def subcloud_get_with_status(context, subcloud_id):
result = (
@ -366,6 +430,7 @@ def subcloud_get_with_status(context, subcloud_id):
return result
@db_session_cleanup
@require_context
def subcloud_get_by_name(context, name):
result = (
@ -381,6 +446,7 @@ def subcloud_get_by_name(context, name):
return result
@db_session_cleanup
@require_context
def subcloud_get_by_region_name(context, region_name):
result = (
@ -396,6 +462,7 @@ def subcloud_get_by_region_name(context, region_name):
return result
@db_session_cleanup
@require_context
def subcloud_get_by_name_or_region_name(context, name):
result = (
@ -411,11 +478,13 @@ def subcloud_get_by_name_or_region_name(context, name):
return result
@db_session_cleanup
@require_context
def subcloud_get_all(context):
return model_query(context, models.Subcloud).filter_by(deleted=0).all()
@db_session_cleanup
@require_context
def subcloud_get_all_by_group_id(context, group_id):
"""Retrieve all subclouds that belong to the specified group id"""
@ -428,6 +497,7 @@ def subcloud_get_all_by_group_id(context, group_id):
)
@db_session_cleanup
def subcloud_get_all_ordered_by_id(context):
return (
model_query(context, models.Subcloud)
@ -437,6 +507,7 @@ def subcloud_get_all_ordered_by_id(context):
)
@db_session_cleanup
@require_context
def subcloud_get_all_with_status(context):
result = (
@ -458,6 +529,7 @@ def subcloud_get_all_with_status(context):
return result
@db_session_cleanup
@require_context
def subcloud_get_all_valid_for_strategy_step_creation(
context,
@ -511,6 +583,7 @@ def subcloud_get_all_valid_for_strategy_step_creation(
return query.all()
@db_session_cleanup
@require_context
def subcloud_count_invalid_for_strategy_type(
context, endpoint_type, group_id=None, subcloud_name=None, force=False
@ -555,6 +628,7 @@ def subcloud_count_invalid_for_strategy_type(
return query.count()
@db_session_cleanup
@require_admin_context
def subcloud_create(
context,
@ -604,6 +678,7 @@ def subcloud_create(
return subcloud_ref
@db_session_cleanup
@require_admin_context
def subcloud_update(
context,
@ -702,14 +777,18 @@ def subcloud_update(
return subcloud_ref
@db_session_cleanup
@require_admin_context
def subcloud_bulk_update_by_ids(context, subcloud_ids, update_form):
with write_session():
model_query(context, models.Subcloud).filter_by(deleted=0).filter(
models.Subcloud.id.in_(subcloud_ids)
).update(update_form, synchronize_session="fetch")
with write_session() as session:
model_query(context, models.Subcloud, session=session).filter_by(
deleted=0
).filter(models.Subcloud.id.in_(subcloud_ids)).update(
update_form, synchronize_session="fetch"
)
@db_session_cleanup
@require_admin_context
def subcloud_destroy(context, subcloud_id):
with write_session() as session:
@ -720,6 +799,7 @@ def subcloud_destroy(context, subcloud_id):
##########################
@db_session_cleanup
@require_context
def subcloud_status_get(context, subcloud_id, endpoint_type):
result = (
@ -738,6 +818,7 @@ def subcloud_status_get(context, subcloud_id, endpoint_type):
return result
@db_session_cleanup
@require_context
def subcloud_status_get_all(context, subcloud_id):
return (
@ -749,6 +830,7 @@ def subcloud_status_get_all(context, subcloud_id):
)
@db_session_cleanup
@require_context
def _subcloud_status_get_by_endpoint_types(context, subcloud_id, endpoint_types):
return (
@ -760,6 +842,7 @@ def _subcloud_status_get_by_endpoint_types(context, subcloud_id, endpoint_types)
)
@db_session_cleanup
@require_context
def subcloud_status_get_all_by_name(context, name):
return (
@ -771,6 +854,7 @@ def subcloud_status_get_all_by_name(context, name):
)
@db_session_cleanup
@require_admin_context
def subcloud_status_create(context, subcloud_id, endpoint_type):
with write_session() as session:
@ -782,6 +866,7 @@ def subcloud_status_create(context, subcloud_id, endpoint_type):
return subcloud_status_ref
@db_session_cleanup
@require_admin_context
def subcloud_status_create_all(context, subcloud_id):
with write_session() as session:
@ -793,6 +878,7 @@ def subcloud_status_create_all(context, subcloud_id):
session.add(subcloud_status_ref)
@db_session_cleanup
@require_admin_context
def subcloud_status_delete(context, subcloud_id, endpoint_type):
with write_session() as session:
@ -800,6 +886,7 @@ def subcloud_status_delete(context, subcloud_id, endpoint_type):
session.delete(subcloud_status_ref)
@db_session_cleanup
@require_admin_context
def subcloud_status_update(context, subcloud_id, endpoint_type, sync_status):
with write_session() as session:
@ -809,6 +896,7 @@ def subcloud_status_update(context, subcloud_id, endpoint_type, sync_status):
return subcloud_status_ref
@db_session_cleanup
@require_admin_context
def subcloud_status_update_endpoints(
context, subcloud_id, endpoint_type_list, sync_status
@ -833,6 +921,7 @@ def subcloud_status_update_endpoints(
return result
@db_session_cleanup
@require_admin_context
def subcloud_status_bulk_update_endpoints(context, subcloud_id, endpoint_list):
"""Update the status of the specified endpoints for a subcloud
@ -877,6 +966,7 @@ def subcloud_status_bulk_update_endpoints(context, subcloud_id, endpoint_list):
return result
@db_session_cleanup
@require_admin_context
def subcloud_status_destroy_all(context, subcloud_id):
with write_session() as session:
@ -893,6 +983,7 @@ def subcloud_status_destroy_all(context, subcloud_id):
###################
@db_session_cleanup
@require_context
def sw_update_strategy_get(context, update_type=None):
query = model_query(context, models.SwUpdateStrategy).filter_by(deleted=0)
@ -905,6 +996,7 @@ def sw_update_strategy_get(context, update_type=None):
return result
@db_session_cleanup
@require_admin_context
def sw_update_strategy_create(
context,
@ -928,6 +1020,7 @@ def sw_update_strategy_create(
return sw_update_strategy_ref
@db_session_cleanup
@require_admin_context
def sw_update_strategy_update(
context, state=None, update_type=None, additional_args=None
@ -950,6 +1043,7 @@ def sw_update_strategy_update(
return sw_update_strategy_ref
@db_session_cleanup
@require_admin_context
def sw_update_strategy_destroy(context, update_type=None):
with write_session() as session:
@ -962,6 +1056,7 @@ def sw_update_strategy_destroy(context, update_type=None):
##########################
@db_session_cleanup
@require_context
def sw_update_opts_get(context, subcloud_id):
result = (
@ -975,6 +1070,7 @@ def sw_update_opts_get(context, subcloud_id):
return result
@db_session_cleanup
@require_context
def sw_update_opts_get_all_plus_subcloud_info(context):
result = (
@ -992,6 +1088,7 @@ def sw_update_opts_get_all_plus_subcloud_info(context):
return result
@db_session_cleanup
@require_admin_context
def sw_update_opts_create(
context,
@ -1014,6 +1111,7 @@ def sw_update_opts_create(
return sw_update_opts_ref
@db_session_cleanup
@require_admin_context
def sw_update_opts_update(
context,
@ -1040,6 +1138,7 @@ def sw_update_opts_update(
return sw_update_opts_ref
@db_session_cleanup
@require_admin_context
def sw_update_opts_destroy(context, subcloud_id):
with write_session() as session:
@ -1050,6 +1149,7 @@ def sw_update_opts_destroy(context, subcloud_id):
##########################
@db_session_cleanup
@require_context
def sw_update_opts_default_get(context):
result = (
@ -1060,6 +1160,7 @@ def sw_update_opts_default_get(context):
return result
@db_session_cleanup
@require_admin_context
def sw_update_opts_default_create(
context,
@ -1081,6 +1182,7 @@ def sw_update_opts_default_create(
return sw_update_opts_default_ref
@db_session_cleanup
@require_admin_context
def sw_update_opts_default_update(
context,
@ -1106,6 +1208,7 @@ def sw_update_opts_default_update(
return sw_update_opts_default_ref
@db_session_cleanup
@require_admin_context
def sw_update_opts_default_destroy(context):
with write_session() as session:
@ -1116,6 +1219,7 @@ def sw_update_opts_default_destroy(context):
##########################
# system peer
##########################
@db_session_cleanup
@require_context
def system_peer_get(context, peer_id):
try:
@ -1135,6 +1239,7 @@ def system_peer_get(context, peer_id):
return result
@db_session_cleanup
@require_context
def system_peer_get_by_name(context, name):
try:
@ -1155,6 +1260,7 @@ def system_peer_get_by_name(context, name):
return result
@db_session_cleanup
@require_context
def system_peer_get_by_uuid(context, uuid):
try:
@ -1175,6 +1281,7 @@ def system_peer_get_by_uuid(context, uuid):
return result
@db_session_cleanup
@require_context
def system_peer_get_all(context):
result = (
@ -1188,6 +1295,7 @@ def system_peer_get_all(context):
# This method returns all subcloud peer groups for a particular system peer
@db_session_cleanup
@require_context
def peer_group_get_for_system_peer(context, peer_id):
return (
@ -1203,6 +1311,7 @@ def peer_group_get_for_system_peer(context, peer_id):
)
@db_session_cleanup
@require_admin_context
def system_peer_create(
context,
@ -1237,6 +1346,7 @@ def system_peer_create(
return system_peer_ref
@db_session_cleanup
@require_admin_context
def system_peer_update(
context,
@ -1286,6 +1396,7 @@ def system_peer_update(
return system_peer_ref
@db_session_cleanup
@require_admin_context
def system_peer_destroy(context, peer_id):
with write_session() as session:
@ -1296,6 +1407,7 @@ def system_peer_destroy(context, peer_id):
##########################
# subcloud group
##########################
@db_session_cleanup
@require_context
def subcloud_group_get(context, group_id):
try:
@ -1315,6 +1427,7 @@ def subcloud_group_get(context, group_id):
return result
@db_session_cleanup
@require_context
def subcloud_group_get_by_name(context, name):
try:
@ -1336,6 +1449,7 @@ def subcloud_group_get_by_name(context, name):
# This method returns all subclouds for a particular subcloud group
@db_session_cleanup
@require_context
def subcloud_get_for_group(context, group_id):
return (
@ -1347,6 +1461,7 @@ def subcloud_get_for_group(context, group_id):
)
@db_session_cleanup
@require_context
def subcloud_group_get_all(context):
result = (
@ -1359,6 +1474,7 @@ def subcloud_group_get_all(context):
return result
@db_session_cleanup
@require_admin_context
def subcloud_group_create(
context, name, description, update_apply_type, max_parallel_subclouds
@ -1373,6 +1489,7 @@ def subcloud_group_create(
return subcloud_group_ref
@db_session_cleanup
@require_admin_context
def subcloud_group_update(
context,
@ -1402,6 +1519,7 @@ def subcloud_group_update(
return subcloud_group_ref
@db_session_cleanup
@require_admin_context
def subcloud_group_destroy(context, group_id):
with write_session() as session:
@ -1446,6 +1564,7 @@ def initialize_subcloud_group_default(engine):
##########################
# subcloud peer group
##########################
@db_session_cleanup
@require_context
def subcloud_peer_group_get(context, group_id):
try:
@ -1465,6 +1584,7 @@ def subcloud_peer_group_get(context, group_id):
return result
@db_session_cleanup
@require_context
def subcloud_get_for_peer_group(context, peer_group_id):
"""Get all subclouds for a subcloud peer group.
@ -1481,6 +1601,7 @@ def subcloud_get_for_peer_group(context, peer_group_id):
)
@db_session_cleanup
@require_context
def subcloud_peer_group_get_all(context):
result = (
@ -1493,6 +1614,7 @@ def subcloud_peer_group_get_all(context):
return result
@db_session_cleanup
@require_context
def subcloud_peer_group_get_by_name(context, name):
try:
@ -1513,6 +1635,7 @@ def subcloud_peer_group_get_by_name(context, name):
return result
@db_session_cleanup
@require_context
def subcloud_peer_group_get_by_leader_id(context, system_leader_id):
result = (
@ -1526,6 +1649,7 @@ def subcloud_peer_group_get_by_leader_id(context, system_leader_id):
return result
@db_session_cleanup
@require_admin_context
def subcloud_peer_group_create(
context,
@ -1550,6 +1674,7 @@ def subcloud_peer_group_create(
return subcloud_peer_group_ref
@db_session_cleanup
@require_admin_context
def subcloud_peer_group_destroy(context, group_id):
with write_session() as session:
@ -1557,6 +1682,7 @@ def subcloud_peer_group_destroy(context, group_id):
session.delete(subcloud_peer_group_ref)
@db_session_cleanup
@require_admin_context
def subcloud_peer_group_update(
context,
@ -1598,6 +1724,7 @@ def subcloud_peer_group_update(
##########################
# peer group association
##########################
@db_session_cleanup
@require_admin_context
def peer_group_association_create(
context,
@ -1620,6 +1747,7 @@ def peer_group_association_create(
return peer_group_association_ref
@db_session_cleanup
@require_admin_context
def peer_group_association_update(
context, associate_id, peer_group_priority=None, sync_status=None, sync_message=None
@ -1639,6 +1767,7 @@ def peer_group_association_update(
return association_ref
@db_session_cleanup
@require_admin_context
def peer_group_association_destroy(context, association_id):
with write_session() as session:
@ -1646,6 +1775,7 @@ def peer_group_association_destroy(context, association_id):
session.delete(association_ref)
@db_session_cleanup
@require_context
def peer_group_association_get(context, association_id) -> models.PeerGroupAssociation:
try:
@ -1665,6 +1795,7 @@ def peer_group_association_get(context, association_id) -> models.PeerGroupAssoc
return result
@db_session_cleanup
@require_context
def peer_group_association_get_all(context) -> list[models.PeerGroupAssociation]:
result = (
@ -1679,6 +1810,7 @@ def peer_group_association_get_all(context) -> list[models.PeerGroupAssociation]
# Each combination of 'peer_group_id' and 'system_peer_id' is unique
# and appears only once in the entries.
@db_session_cleanup
@require_context
def peer_group_association_get_by_peer_group_and_system_peer_id(
context, peer_group_id, system_peer_id
@ -1704,6 +1836,7 @@ def peer_group_association_get_by_peer_group_and_system_peer_id(
return result
@db_session_cleanup
@require_context
def peer_group_association_get_by_peer_group_id(
context, peer_group_id
@ -1719,6 +1852,7 @@ def peer_group_association_get_by_peer_group_id(
return result
@db_session_cleanup
@require_context
def peer_group_association_get_by_system_peer_id(
context, system_peer_id
@ -1737,6 +1871,7 @@ def peer_group_association_get_by_system_peer_id(
##########################
@db_session_cleanup
@require_context
def strategy_step_get(context, subcloud_id):
result = (
@ -1752,6 +1887,7 @@ def strategy_step_get(context, subcloud_id):
return result
@db_session_cleanup
@require_context
def strategy_step_get_by_name(context, name):
result = (
@ -1768,6 +1904,7 @@ def strategy_step_get_by_name(context, name):
return result
@db_session_cleanup
@require_context
def strategy_step_get_all(context):
result = (
@ -1780,6 +1917,7 @@ def strategy_step_get_all(context):
return result
@db_session_cleanup
@require_admin_context
def strategy_step_bulk_create(context, subcloud_ids, stage, state, details):
"""Creates the strategy step for a list of subclouds
@ -1807,6 +1945,7 @@ def strategy_step_bulk_create(context, subcloud_ids, stage, state, details):
return session.execute(insert(models.StrategyStep), strategy_steps)
@db_session_cleanup
@require_admin_context
def strategy_step_create(context, subcloud_id, stage, state, details):
with write_session() as session:
@ -1819,6 +1958,7 @@ def strategy_step_create(context, subcloud_id, stage, state, details):
return strategy_step_ref
@db_session_cleanup
@require_admin_context
def strategy_step_update(
context,
@ -1845,6 +1985,7 @@ def strategy_step_update(
return strategy_step_ref
@db_session_cleanup
@require_admin_context
def strategy_step_update_all(context, filters, values):
"""Updates all strategy steps
@ -1865,6 +2006,7 @@ def strategy_step_update_all(context, filters, values):
query.update(values)
@db_session_cleanup
@require_admin_context
def strategy_step_destroy_all(context):
with write_session() as session:
@ -1918,6 +2060,7 @@ def add_identity_filter(query, value, use_name=None):
return query.filter_by(name=value)
@db_session_cleanup
@require_context
def _subcloud_alarms_get(context, name):
query = model_query(context, models.SubcloudAlarmSummary).filter_by(deleted=0)
@ -1933,11 +2076,13 @@ def _subcloud_alarms_get(context, name):
)
@db_session_cleanup
@require_context
def subcloud_alarms_get(context, name):
return _subcloud_alarms_get(context, name)
@db_session_cleanup
@require_context
def subcloud_alarms_get_all(context, name=None):
query = model_query(context, models.SubcloudAlarmSummary).filter_by(deleted=0)
@ -1948,6 +2093,7 @@ def subcloud_alarms_get_all(context, name=None):
return query.order_by(desc(models.SubcloudAlarmSummary.id)).all()
@db_session_cleanup
@require_admin_context
def subcloud_alarms_create(context, name, values):
with write_session() as session:
@ -1963,6 +2109,7 @@ def subcloud_alarms_create(context, name, values):
return result
@db_session_cleanup
@require_admin_context
def subcloud_alarms_update(context, name, values):
with write_session() as session:
@ -1972,12 +2119,14 @@ def subcloud_alarms_update(context, name, values):
return result
@db_session_cleanup
@require_admin_context
def subcloud_alarms_delete(context, name):
with write_session() as session:
session.query(models.SubcloudAlarmSummary).filter_by(name=name).delete()
@db_session_cleanup
@require_admin_context
def subcloud_rename_alarms(context, subcloud_name, new_name):
with write_session() as session:

View File

@ -1,5 +1,5 @@
# Copyright (c) 2015 Ericsson AB.
# Copyright (c) 2017-2021, 2023-2024 Wind River Systems, Inc.
# Copyright (c) 2017-2021, 2023-2025 Wind River Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -21,9 +21,10 @@ Implementation of SQLAlchemy backend.
"""
import datetime
import functools
import sys
import threading
import eventlet
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import enginefacade
from oslo_log import log as logging
@ -36,7 +37,7 @@ from sqlalchemy import desc
from sqlalchemy import or_
from sqlalchemy.orm.exc import MultipleResultsFound
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm import joinedload_all
from sqlalchemy.orm import joinedload
from sqlalchemy import select
from sqlalchemy import update
@ -51,7 +52,6 @@ LOG = logging.getLogger(__name__)
_facade = None
_main_context_manager = None
_CONTEXT = threading.local()
def _get_main_context_manager():
@ -70,11 +70,13 @@ def get_session():
def read_session():
return _get_main_context_manager().reader.using(_CONTEXT)
_context = eventlet.greenthread.getcurrent()
return _get_main_context_manager().reader.using(_context)
def write_session():
return _get_main_context_manager().writer.using(_CONTEXT)
_context = eventlet.greenthread.getcurrent()
return _get_main_context_manager().writer.using(_context)
_DEFAULT_QUOTA_NAME = "default"
@ -85,13 +87,63 @@ def get_backend():
return sys.modules[__name__]
def db_session_cleanup(func):
"""Class decorator that automatically adds session cleanup to method"""
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
_context = eventlet.greenthread.getcurrent()
exc_info = (None, None, None)
try:
return func(self, *args, **kwargs)
except Exception:
exc_info = sys.exc_info()
raise
finally:
if (
hasattr(_context, "_db_session_context")
and _context._db_session_context is not None
):
try:
_context._db_session_context.__exit__(*exc_info)
except Exception as e:
LOG.warning(f"Error closing database session: {e}")
# Clear the session
_context._db_session = None
_context._db_session_context = None
return wrapper
# TODO(vgluzrom): Put all db functions inside a class and apply db_session_cleanup
# to the class itself, so it's not necessary for the function caller to apply it
# to each function call.
def model_query(context, *args, **kwargs):
"""Query helper for simpler session usage.
If the session is already provided in the kwargs, use it. Otherwise,
try to get it from thread context. If it's not there, create a new one.
Note: If not providing a session, the calling function need to have
the db_session_cleanup decorator to clean up the session.
:param session: if present, the session to use
"""
session = kwargs.get("session")
if session:
return session.query(*args).options(joinedload_all("*"))
else:
with read_session() as session:
return session.query(*args).options(joinedload_all("*"))
if not session:
_context = eventlet.greenthread.getcurrent()
if hasattr(_context, "_db_session") and _context._db_session is not None:
session = _context._db_session
else:
session_context = read_session()
session = session_context.__enter__() # pylint: disable=no-member
_context._db_session = session
# Need to store the session context to call __exit__ method later
_context._db_session_context = session_context
return session.query(*args).options(joinedload("*"))
def _session(context):
@ -152,10 +204,11 @@ def require_context(f):
###################
@db_session_cleanup
@require_context
def _quota_get(context, project_id, resource, session=None):
result = (
model_query(context, models.Quota)
model_query(context, models.Quota, session=session)
.filter_by(project_id=project_id)
.filter_by(resource=resource)
.first()
@ -167,11 +220,13 @@ def _quota_get(context, project_id, resource, session=None):
return result
@db_session_cleanup
@require_context
def quota_get(context, project_id, resource):
return _quota_get(context, project_id, resource)
@db_session_cleanup
@require_context
def quota_get_all_by_project(context, project_id):
rows = model_query(context, models.Quota).filter_by(project_id=project_id).all()
@ -181,6 +236,7 @@ def quota_get_all_by_project(context, project_id):
return result
@db_session_cleanup
@require_admin_context
def quota_create(context, project_id, resource, limit):
with write_session() as session:
@ -192,6 +248,7 @@ def quota_create(context, project_id, resource, limit):
return quota_ref
@db_session_cleanup
@require_admin_context
def quota_update(context, project_id, resource, limit):
with write_session() as session:
@ -203,6 +260,7 @@ def quota_update(context, project_id, resource, limit):
return quota_ref
@db_session_cleanup
@require_admin_context
def quota_destroy(context, project_id, resource):
with write_session() as session:
@ -212,12 +270,15 @@ def quota_destroy(context, project_id, resource):
session.delete(quota_ref)
@db_session_cleanup
@require_admin_context
def quota_destroy_all(context, project_id):
with write_session() as session:
quotas = (
model_query(context, models.Quota).filter_by(project_id=project_id).all()
model_query(context, models.Quota, session=session)
.filter_by(project_id=project_id)
.all()
)
if not quotas:
@ -230,6 +291,7 @@ def quota_destroy_all(context, project_id):
##########################
@db_session_cleanup
@require_context
def _quota_class_get(context, class_name, resource):
result = (
@ -246,16 +308,19 @@ def _quota_class_get(context, class_name, resource):
return result
@db_session_cleanup
@require_context
def quota_class_get(context, class_name, resource):
return _quota_class_get(context, class_name, resource)
@db_session_cleanup
@require_context
def quota_class_get_default(context):
return quota_class_get_all_by_name(context, _DEFAULT_QUOTA_NAME)
@db_session_cleanup
@require_context
def quota_class_get_all_by_name(context, class_name):
rows = (
@ -272,6 +337,7 @@ def quota_class_get_all_by_name(context, class_name):
return result
@db_session_cleanup
@require_admin_context
def quota_class_create(context, class_name, resource, limit):
with write_session() as session:
@ -283,6 +349,7 @@ def quota_class_create(context, class_name, resource, limit):
return quota_class_ref
@db_session_cleanup
@require_admin_context
def quota_class_update(context, class_name, resource, limit):
with write_session() as session:
@ -300,6 +367,7 @@ def quota_class_update(context, class_name, resource, limit):
return quota_class_ref
@db_session_cleanup
@require_admin_context
def quota_class_destroy_all(context, class_name):
with write_session() as session:
@ -326,6 +394,7 @@ def db_version(engine):
return migration.db_version(engine)
@db_session_cleanup
def service_create(context, service_id, host=None, binary=None, topic=None):
with write_session() as session:
time_now = timeutils.utcnow()
@ -341,6 +410,7 @@ def service_create(context, service_id, host=None, binary=None, topic=None):
return svc
@db_session_cleanup
def service_update(context, service_id, values=None):
with write_session() as session:
service = session.query(models.Service).get(service_id)
@ -356,6 +426,7 @@ def service_update(context, service_id, values=None):
return service
@db_session_cleanup
def service_delete(context, service_id):
with write_session() as session:
session.query(models.Service).filter_by(id=service_id).delete(
@ -363,10 +434,12 @@ def service_delete(context, service_id):
)
@db_session_cleanup
def service_get(context, service_id):
return model_query(context, models.Service).get(service_id)
@db_session_cleanup
def service_get_all(context):
return model_query(context, models.Service).all()
@ -425,6 +498,7 @@ def add_filter_by_many_identities(query, model, values):
)
@db_session_cleanup
@require_context
def _subcloud_get(context, region_id, session=None):
query = model_query(context, models.Subcloud, session=session).filter_by(deleted=0)
@ -440,11 +514,13 @@ def _subcloud_get(context, region_id, session=None):
)
@db_session_cleanup
@require_context
def subcloud_get(context, region_id):
return _subcloud_get(context, region_id)
@db_session_cleanup
@require_context
def subcloud_get_all(
context,
@ -466,6 +542,7 @@ def subcloud_get_all(
return query.all()
@db_session_cleanup
@require_context
def subcloud_capabilities_get_all(
context,
@ -493,6 +570,7 @@ def subcloud_capabilities_get_all(
}
@db_session_cleanup
@require_context
def subcloud_sync_update_all_to_in_progress(
context, management_state, availability_status, initial_sync_state, sync_requests
@ -542,6 +620,7 @@ def subcloud_sync_update_all_to_in_progress(
return updated_rows
@db_session_cleanup
@require_context
def subcloud_audit_update_last_audit_time(context, subcloud_name):
"""Updates the last audit time for all rows of a subcloud"""
@ -552,6 +631,7 @@ def subcloud_audit_update_last_audit_time(context, subcloud_name):
).update({models.SubcloudSync.last_audit_time: timeutils.utcnow()})
@db_session_cleanup
@require_context
def subcloud_audit_update_all_to_in_progress(
context, management_state, availability_status, initial_sync_state, audit_interval
@ -616,6 +696,7 @@ def subcloud_audit_update_all_to_in_progress(
return updated_rows
@db_session_cleanup
@require_admin_context
def subcloud_create(context, region_name, values):
with write_session() as session:
@ -631,6 +712,7 @@ def subcloud_create(context, region_name, values):
return result
@db_session_cleanup
@require_admin_context
def subcloud_update(context, region_name, values):
with write_session() as session:
@ -640,6 +722,7 @@ def subcloud_update(context, region_name, values):
return result
@db_session_cleanup
@require_admin_context
def subcloud_delete(context, region_name):
with write_session() as session:
@ -656,6 +739,7 @@ def subcloud_delete(context, region_name):
raise exception.SubcloudNotFound(region_name=region_name)
@db_session_cleanup
@require_admin_context
def subcloud_update_initial_state(
context, region_name, pre_initial_sync_state, initial_sync_state
@ -670,6 +754,7 @@ def subcloud_update_initial_state(
return result
@db_session_cleanup
@require_admin_context
def subcloud_update_all_initial_state(
context, pre_initial_sync_state, initial_sync_state
@ -684,6 +769,7 @@ def subcloud_update_all_initial_state(
return updated_count
@db_session_cleanup
@require_context
def _resource_get(context, resource_type, master_id, session):
query = model_query(context, models.Resource, session=session).filter_by(deleted=0)
@ -702,12 +788,14 @@ def _resource_get(context, resource_type, master_id, session):
)
@db_session_cleanup
@require_context
def resource_get_by_type_and_master_id(context, resource_type, master_id):
with read_session() as session:
return _resource_get(context, resource_type, master_id, session)
@db_session_cleanup
@require_context
def resource_get_by_id(context, resource_id, session=None):
query = model_query(context, models.Resource, session=session).filter_by(deleted=0)
@ -718,6 +806,7 @@ def resource_get_by_id(context, resource_id, session=None):
raise exception.ResourceNotFound(id=resource_id)
@db_session_cleanup
@require_context
def resource_get_all(context, resource_type=None):
query = model_query(context, models.Resource).filter_by(deleted=0)
@ -728,6 +817,7 @@ def resource_get_all(context, resource_type=None):
return query.all()
@db_session_cleanup
@require_admin_context
def resource_create(context, resource_type, values):
with write_session() as session:
@ -740,6 +830,7 @@ def resource_create(context, resource_type, values):
return result
@db_session_cleanup
@require_admin_context
def resource_update(context, resource_id, values):
with write_session() as session:
@ -749,6 +840,7 @@ def resource_update(context, resource_id, values):
return result
@db_session_cleanup
@require_admin_context
def resource_delete(context, resource_type, master_id):
with write_session() as session:
@ -773,6 +865,7 @@ def add_subcloud_resource_filter_by_subcloud(query, value):
return query.filter(models.Subcloud.uuid == value)
@db_session_cleanup
@require_context
def _subcloud_resource_get(context, subcloud_resource_id, session=None):
query = model_query(context, models.SubcloudResource, session=session).filter_by(
@ -785,11 +878,13 @@ def _subcloud_resource_get(context, subcloud_resource_id, session=None):
raise exception.SubcloudResourceNotFound(resource=subcloud_resource_id)
@db_session_cleanup
@require_context
def subcloud_resource_get(context, subcloud_resource_id):
return _subcloud_resource_get(context, subcloud_resource_id)
@db_session_cleanup
@require_context
def subcloud_resources_get_by_subcloud(context, subcloud_id):
query = model_query(context, models.SubcloudResource).filter_by(deleted=0)
@ -803,6 +898,7 @@ def subcloud_resources_get_by_subcloud(context, subcloud_id):
return query.all()
@db_session_cleanup
@require_context
def subcloud_resources_get_by_resource(context, resource_id):
# query by resource id or uuid, not resource master uuid.
@ -817,11 +913,13 @@ def subcloud_resources_get_by_resource(context, resource_id):
return query.all()
@db_session_cleanup
def subcloud_resources_get_all(context):
query = model_query(context, models.SubcloudResource).filter_by(deleted=0)
return query.all()
@db_session_cleanup
@require_context
def subcloud_resource_get_by_resource_and_subcloud(context, resource_id, subcloud_id):
query = (
@ -843,6 +941,7 @@ def subcloud_resource_get_by_resource_and_subcloud(context, resource_id, subclou
)
@db_session_cleanup
@require_admin_context
def subcloud_resource_create(context, subcloud_id, resource_id, values):
with write_session() as session:
@ -861,6 +960,7 @@ def subcloud_resource_create(context, subcloud_id, resource_id, values):
return result
@db_session_cleanup
@require_admin_context
def subcloud_resource_update(context, subcloud_resource_id, values):
with write_session() as session:
@ -870,6 +970,7 @@ def subcloud_resource_update(context, subcloud_resource_id, values):
return result
@db_session_cleanup
@require_admin_context
def subcloud_resource_delete(context, subcloud_resource_id):
with write_session() as session:
@ -889,6 +990,7 @@ def add_orch_job_filter_by_resource(query, value):
return query.filter(models.OrchJob.uuid == value)
@db_session_cleanup
@require_context
def _orch_job_get(context, orch_job_id, session=None):
query = model_query(context, models.OrchJob, session=session).filter_by(deleted=0)
@ -899,11 +1001,13 @@ def _orch_job_get(context, orch_job_id, session=None):
raise exception.OrchJobNotFound(orch_job=orch_job_id)
@db_session_cleanup
@require_context
def orch_job_get(context, orch_job_id):
return _orch_job_get(context, orch_job_id)
@db_session_cleanup
@require_context
def orch_job_get_all(context, resource_id=None):
query = model_query(context, models.OrchJob).filter_by(deleted=0)
@ -917,6 +1021,7 @@ def orch_job_get_all(context, resource_id=None):
return query.all()
@db_session_cleanup
@require_admin_context
def orch_job_create(context, resource_id, endpoint_type, operation_type, values):
with write_session() as session:
@ -938,6 +1043,7 @@ def orch_job_create(context, resource_id, endpoint_type, operation_type, values)
return result
@db_session_cleanup
@require_admin_context
def orch_job_update(context, orch_job_id, values):
with write_session() as session:
@ -947,6 +1053,7 @@ def orch_job_update(context, orch_job_id, values):
return result
@db_session_cleanup
@require_admin_context
def orch_job_delete(context, orch_job_id):
with write_session() as session:
@ -966,6 +1073,7 @@ def add_orch_request_filter_by_resource(query, value):
return query.filter(models.OrchRequest.uuid == value)
@db_session_cleanup
@require_context
def _orch_request_get(context, orch_request_id, session=None):
query = model_query(context, models.OrchRequest, session=session).filter_by(
@ -978,11 +1086,13 @@ def _orch_request_get(context, orch_request_id, session=None):
raise exception.OrchRequestNotFound(orch_request=orch_request_id)
@db_session_cleanup
@require_context
def orch_request_get(context, orch_request_id):
return _orch_request_get(context, orch_request_id)
@db_session_cleanup
@require_context
def orch_request_get_most_recent_failed_request(context):
query = (
@ -997,6 +1107,7 @@ def orch_request_get_most_recent_failed_request(context):
return None
@db_session_cleanup
@require_context
def orch_request_get_all(context, orch_job_id=None):
query = model_query(context, models.OrchRequest).filter_by(deleted=0)
@ -1010,6 +1121,7 @@ def orch_request_get_all(context, orch_job_id=None):
return query.all()
@db_session_cleanup
@require_context
def orch_request_get_by_attrs(
context, endpoint_type, resource_type=None, target_region_name=None, states=None
@ -1047,6 +1159,7 @@ def orch_request_get_by_attrs(
return query
@db_session_cleanup
@require_admin_context
def orch_request_create(context, orch_job_id, target_region_name, values):
with write_session() as session:
@ -1065,6 +1178,7 @@ def orch_request_create(context, orch_job_id, target_region_name, values):
return result
@db_session_cleanup
def orch_request_create_bulk(context, orch_requests):
for request in orch_requests:
if "orch_job_id" not in request:
@ -1082,6 +1196,7 @@ def orch_request_create_bulk(context, orch_requests):
session.bulk_insert_mappings(models.OrchRequest, orch_requests)
@db_session_cleanup
@require_admin_context
def orch_request_update(context, orch_request_id, values):
with write_session() as session:
@ -1091,6 +1206,7 @@ def orch_request_update(context, orch_request_id, values):
return result
@db_session_cleanup
@require_admin_context
def orch_request_destroy(context, orch_request_id):
with write_session() as session:
@ -1103,6 +1219,7 @@ def orch_request_destroy(context, orch_request_id):
session.delete(orch_request_ref)
@db_session_cleanup
@require_admin_context
def orch_request_delete_by_subcloud(context, region_name):
"""Delete all orch_request entries for a given subcloud.
@ -1116,6 +1233,7 @@ def orch_request_delete_by_subcloud(context, region_name):
).delete()
@db_session_cleanup
@require_admin_context
def orch_request_delete_previous_failed_requests(context, delete_timestamp):
"""Soft delete orch_request entries.
@ -1136,6 +1254,7 @@ def orch_request_delete_previous_failed_requests(context, delete_timestamp):
LOG.info("%d previously failed sync requests soft deleted", count)
@db_session_cleanup
@require_admin_context
def purge_deleted_records(context, age_in_days):
deleted_age = timeutils.utcnow() - datetime.timedelta(days=age_in_days)
@ -1182,6 +1301,7 @@ def purge_deleted_records(context, age_in_days):
LOG.info("%d records were purged from resource table.", count)
@db_session_cleanup
def _subcloud_sync_get(context, subcloud_name, endpoint_type, session=None):
query = (
model_query(context, models.SubcloudSync, session=session)
@ -1202,10 +1322,12 @@ def _subcloud_sync_get(context, subcloud_name, endpoint_type, session=None):
raise exception.InvalidParameterValue(err=err)
@db_session_cleanup
def subcloud_sync_get(context, subcloud_name, endpoint_type):
return _subcloud_sync_get(context, subcloud_name, endpoint_type)
@db_session_cleanup
def subcloud_sync_create(context, subcloud_name, endpoint_type, values):
with write_session() as session:
result = models.SubcloudSync()
@ -1221,6 +1343,7 @@ def subcloud_sync_create(context, subcloud_name, endpoint_type, values):
return result
@db_session_cleanup
def subcloud_sync_update(context, subcloud_name, endpoint_type, values):
with write_session() as session:
result = _subcloud_sync_get(context, subcloud_name, endpoint_type, session)
@ -1229,6 +1352,7 @@ def subcloud_sync_update(context, subcloud_name, endpoint_type, values):
return result
@db_session_cleanup
def subcloud_sync_update_all_except_in_progress(
context, management_state, endpoint_type, values
):
@ -1254,6 +1378,7 @@ def subcloud_sync_update_all_except_in_progress(
return result.rowcount
@db_session_cleanup
def subcloud_sync_delete(context, subcloud_name, endpoint_type):
with write_session() as session:
results = (

View File

@ -1,4 +1,4 @@
# Copyright (c) 2017-2018, 2024 Wind River Inc.
# Copyright (c) 2017-2018, 2024-2025 Wind River Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -109,7 +109,8 @@ def upgrade(migrate_engine):
raise
rows = (
quota_classes.count() # pylint: disable=no-value-for-parameter
sqlalchemy.select([sqlalchemy.func.count()])
.select_from(quota_classes)
.where(quota_classes.c.class_name == "default")
.execute()
.scalar()