Merge "Fix migration 112 to use live_data_migration API"

commit c1bee6c204
@@ -205,7 +205,10 @@ class HostCommands(object):
 class DbCommands(object):
     """Class for managing the database."""
 
-    online_migrations = ()
+    online_migrations = (
+        # Added in Queens
+        db.service_uuids_online_data_migration,
+    )
 
     def __init__(self):
         pass
@@ -250,13 +253,13 @@ class DbCommands(object):
                     "logs for more details."))
             sys.exit(1)
 
-    def _run_migration(self, ctxt, max_count, ignore_state):
+    def _run_migration(self, ctxt, max_count):
         ran = 0
         migrations = {}
         for migration_meth in self.online_migrations:
             count = max_count - ran
             try:
-                found, done = migration_meth(ctxt, count, ignore_state)
+                found, done = migration_meth(ctxt, count)
             except Exception:
                 print(_("Error attempting to run %(method)s") %
                       {'method': migration_meth.__name__})
@@ -283,11 +286,7 @@ class DbCommands(object):
 
     @args('--max_count', metavar='<number>', dest='max_count', type=int,
           help='Maximum number of objects to consider.')
-    @args('--ignore_state', action='store_true', dest='ignore_state',
-          help='Force records to migrate even if another operation is '
-               'performed on them. This may be dangerous, please refer to '
-               'release notes for more information.')
-    def online_data_migrations(self, max_count=None, ignore_state=False):
+    def online_data_migrations(self, max_count=None):
         """Perform online data migrations for the release in batches."""
         ctxt = context.get_admin_context()
         if max_count is not None:
@@ -303,19 +302,18 @@ class DbCommands(object):
         ran = None
         migration_info = {}
         while ran is None or ran != 0:
-            migrations = self._run_migration(ctxt, max_count, ignore_state)
+            migrations = self._run_migration(ctxt, max_count)
             migration_info.update(migrations)
             ran = sum([done for found, done, remaining in migrations.values()])
             if not unlimited:
                 break
         headers = ["{}".format(_('Migration')),
-                   "{}".format(_('Found')),
-                   "{}".format(_('Done')),
-                   "{}".format(_('Remaining'))]
+                   "{}".format(_('Total Needed')),
+                   "{}".format(_('Completed')), ]
         t = prettytable.PrettyTable(headers)
         for name in sorted(migration_info.keys()):
             info = migration_info[name]
-            t.add_row([name, info[0], info[1], info[2]])
+            t.add_row([name, info[0], info[1]])
         print(t)
 
         sys.exit(1 if ran else 0)
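
Commentary, not part of the diff: after this change each entry in DbCommands.online_migrations is a callable with the signature (context, max_count) that returns a (total_needed, completed) pair, and online_data_migrations keeps calling _run_migration until a pass completes nothing. A minimal sketch of that contract, using a hypothetical stand-in migration rather than real cinder code:

    # Hypothetical stand-in; only the (context, max_count) -> (found, done)
    # contract mirrors the code above.
    _pending = {'rows': 5}

    def fake_migration(context, max_count):
        found = _pending['rows']            # rows still needing migration
        done = min(found, max_count)        # migrate at most max_count now
        _pending['rows'] -= done
        return found, done

    ran = None
    while ran is None or ran != 0:          # same loop shape as the command
        found, done = fake_migration(None, 2)
        ran = done
    # Three passes migrate 2 + 2 + 1 rows; the fourth returns (0, 0) and stops.
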
@@ -93,6 +93,10 @@ def dispose_engine():
 ###################
 
 
+def service_uuids_online_data_migration(context, max_count):
+    return IMPL.service_uuids_online_data_migration(context, max_count)
+
+
 def service_destroy(context, service_id):
     """Destroy the service or raise if it does not exist."""
     return IMPL.service_destroy(context, service_id)
@@ -142,6 +146,14 @@ def service_update(context, service_id, values):
     return IMPL.service_update(context, service_id, values)
 
 
+def service_get_by_uuid(context, service_uuid):
+    """Get a service by its uuid.
+
+    Return Service ref or raise if it does not exist.
+    """
+    return IMPL.service_get_by_uuid(context, service_uuid)
+
+
 ###############
 
 
@@ -32,6 +32,7 @@ import uuid
 from oslo_config import cfg
 from oslo_db import exception as db_exc
 from oslo_db import options
+from oslo_db.sqlalchemy import enginefacade
 from oslo_db.sqlalchemy import session as db_session
 from oslo_log import log as logging
 from oslo_utils import importutils
@@ -555,6 +556,16 @@ def service_get_all(context, backend_match_level=None, **filters):
     return [] if not query else query.all()
 
 
+@require_admin_context
+def service_get_by_uuid(context, service_uuid):
+    query = model_query(context, models.Service).filter_by(uuid=service_uuid)
+    result = query.first()
+    if not result:
+        raise exception.ServiceNotFound(service_id=service_uuid)
+
+    return result
+
+
 @require_admin_context
 def service_create(context, values):
     if not values.get('uuid'):
@@ -585,6 +596,27 @@ def service_update(context, service_id, values):
         raise exception.ServiceNotFound(service_id=service_id)
 
 
+@enginefacade.writer
+def service_uuids_online_data_migration(context, max_count):
+    from cinder.objects import service
+
+    total = 0
+    updated = 0
+
+    db_services = model_query(context, models.Service).filter_by(
+        uuid=None).limit(max_count)
+    for db_service in db_services:
+        total += 1
+
+        # The conversion in the Service object code
+        # will generate a UUID and save it for us.
+        service_obj = service.Service._from_db_object(
+            context, service.Service(), db_service)
+        if 'uuid' in service_obj:
+            updated += 1
+    return total, updated
+
+
 ###################
 
 
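
Usage note (an assumption, not shown in the diff): the new helper is meant to be driven in batches until it reports nothing left to backfill, which is what the cinder-manage loop above does. A rough sketch, where ctxt is an assumed admin context:

    # Sketch only: `ctxt` is an assumed admin RequestContext; in practice the
    # online_data_migrations command performs this batching.
    found, done = service_uuids_online_data_migration(ctxt, max_count=50)
    while done:
        found, done = service_uuids_online_data_migration(ctxt, max_count=50)
    # `found` is how many services still lacked a uuid when the batch started,
    # `done` how many this batch backfilled; a zero `done` ends the loop.
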
@@ -605,6 +637,7 @@ def is_backend_frozen(context, host, cluster_name):
 
 ###################
 
+
 def _cluster_query(context, is_up=None, get_services=False,
                    services_summary=False, read_deleted='no',
                    name_match_level=None, name=None, session=None, **filters):
@@ -10,9 +10,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import six
-import uuid
-
 from sqlalchemy import Column
 from sqlalchemy.engine.reflection import Inspector
 from sqlalchemy import Index
@@ -33,9 +30,3 @@ def upgrade(migrate_engine):
     if uuid_index_name not in (i['name'] for i in indexes):
         services = Table('services', meta, autoload=True)
         Index(uuid_index_name, services.c.uuid, unique=True).create()
-
-        service_list = list(services.select().execute())
-        for s in service_list:
-            if not s.uuid:
-                services.update().where(services.c.id == s.id).\
-                    values(uuid=six.text_type(uuid.uuid4())).execute()
@@ -135,6 +135,7 @@ OBJ_VERSIONS.add('1.24', {'LogLevel': '1.0', 'LogLevelList': '1.0'})
 OBJ_VERSIONS.add('1.25', {'Group': '1.2'})
 OBJ_VERSIONS.add('1.26', {'Snapshot': '1.5'})
 OBJ_VERSIONS.add('1.27', {'Backup': '1.5', 'BackupImport': '1.5'})
+OBJ_VERSIONS.add('1.28', {'Service': '1.5'})
 
 
 class CinderObjectRegistry(base.VersionedObjectRegistry):
@@ -12,6 +12,8 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+from oslo_log import log as logging
+from oslo_utils import uuidutils
 from oslo_utils import versionutils
 from oslo_versionedobjects import fields
 
@@ -24,6 +26,9 @@ from cinder.objects import fields as c_fields
 from cinder import utils
 
 
+LOG = logging.getLogger(__name__)
+
+
 @base.CinderObjectRegistry.register
 class Service(base.CinderPersistentObject, base.CinderObject,
               base.CinderObjectDictCompat, base.CinderComparableObject,
@@ -33,7 +38,8 @@ class Service(base.CinderPersistentObject, base.CinderObject,
     # Version 1.2: Add get_minimum_rpc_version() and get_minimum_obj_version()
     # Version 1.3: Add replication fields
     # Version 1.4: Add cluster fields
-    VERSION = '1.4'
+    # Version 1.5: Add UUID field
+    VERSION = '1.5'
 
     OPTIONAL_FIELDS = ('cluster',)
 
@@ -59,6 +65,8 @@ class Service(base.CinderPersistentObject, base.CinderObject,
         'replication_status': c_fields.ReplicationStatusField(nullable=True),
         'frozen': fields.BooleanField(default=False),
         'active_backend_id': fields.StringField(nullable=True),
+
+        'uuid': fields.StringField(nullable=True),
     }
 
     def obj_make_compatible(self, primitive, target_version):
@@ -71,6 +79,8 @@ class Service(base.CinderPersistentObject, base.CinderObject,
         if target_version < (1, 4):
             for obj_field in ('cluster', 'cluster_name'):
                 primitive.pop(obj_field, None)
+        if target_version < (1, 5) and 'uuid' in primitive:
+            del primitive['uuid']
 
     @staticmethod
     def _from_db_object(context, service, db_service, expected_attrs=None):
@@ -98,6 +108,14 @@ class Service(base.CinderPersistentObject, base.CinderObject,
             service.cluster = None
 
         service.obj_reset_changes()
+
+        # TODO(jdg): Remove in S when we're sure all Services have UUID in db
+        if 'uuid' not in service:
+            service.uuid = uuidutils.generate_uuid()
+            LOG.debug('Generated UUID %(uuid)s for service %(id)i',
+                      dict(uuid=service.uuid, id=service.id))
+            service.save()
+
         return service
 
     def obj_load_attr(self, attrname):
@@ -131,6 +149,11 @@ class Service(base.CinderPersistentObject, base.CinderObject,
         db_service = db.service_get(context, host=host, binary=binary_key)
         return cls._from_db_object(context, cls(context), db_service)
 
+    @classmethod
+    def get_by_uuid(cls, context, service_uuid):
+        db_service = db.service_get_by_uuid(context, service_uuid)
+        return cls._from_db_object(context, cls(), db_service)
+
     def create(self):
         if self.obj_attr_is_set('id'):
             raise exception.ObjectActionError(action='create',
@@ -139,6 +162,10 @@ class Service(base.CinderPersistentObject, base.CinderObject,
         if 'cluster' in updates:
             raise exception.ObjectActionError(
                 action='create', reason=_('cluster assigned'))
+        if 'uuid' not in updates:
+            updates['uuid'] = uuidutils.generate_uuid()
+            self.uuid = updates['uuid']
+
         db_service = db.service_create(self._context, updates)
         self._from_db_object(self._context, self, db_service)
 
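
Commentary on the object changes above (not part of the commit): loading a legacy service row whose uuid column is still NULL through _from_db_object now generates and persists a UUID immediately, and create() guarantees new services are inserted with one. A rough illustration, where ctxt and db_row are assumed fixtures rather than anything from this diff:

    # Illustration only: `ctxt` is an assumed admin context and `db_row` an
    # assumed services row that predates the uuid column being populated.
    service_obj = objects.Service._from_db_object(ctxt, objects.Service(),
                                                  db_row)
    assert 'uuid' in service_obj          # backfilled and saved on load

    new_service = objects.Service(context=ctxt)
    new_service.create()                  # create() injects a generated uuid
    assert new_service.uuid is not None
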
@@ -34,6 +34,7 @@ def fake_db_service(**updates):
         'deleted_at': None,
         'deleted': False,
         'id': 123,
+        'uuid': 'ce59413f-4061-425c-9ad0-3479bd102ab2',
         'host': 'fake-host',
         'binary': 'fake-service',
         'topic': 'fake-service-topic',
@@ -43,7 +43,7 @@ object_data = {
     'QualityOfServiceSpecs': '1.0-0b212e0a86ee99092229874e03207fe8',
     'QualityOfServiceSpecsList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
     'RequestSpec': '1.1-b0bd1a28d191d75648901fa853e8a733',
-    'Service': '1.4-a6727ccda6d4043f5e38e75c7c518c7f',
+    'Service': '1.5-dfa465098dd9017bad825cab55eacc93',
     'ServiceList': '1.1-15ecf022a68ddbb8c2a6739cfc9f8f5e',
     'Snapshot': '1.5-ac1cdbd5b89588f6a8f44afdf6b8b201',
     'SnapshotList': '1.0-15ecf022a68ddbb8c2a6739cfc9f8f5e',
@@ -62,7 +62,8 @@ class TestService(test_objects.BaseObjectsTestCase):
         service = objects.Service(context=self.context)
         service.create()
         self.assertEqual(db_service['id'], service.id)
-        service_create.assert_called_once_with(self.context, {})
+        service_create.assert_called_once_with(self.context,
+                                               {'uuid': mock.ANY})
 
     @mock.patch('cinder.db.service_update')
     def test_save(self, service_update):
@@ -227,7 +227,7 @@ class TestCinderManageCmd(test.TestCase):
         exit = self.assertRaises(SystemExit, db_cmds.online_data_migrations)
         self.assertEqual(0, exit.code)
         cinder_manage.DbCommands.online_migrations[0].assert_has_calls(
-            (mock.call(mock.ANY, 50, False),) * 2)
+            (mock.call(mock.ANY, 50),) * 2)
 
     def _fake_db_command(self, migrations=None):
         if migrations is None:
@@ -254,17 +254,17 @@ class TestCinderManageCmd(test.TestCase):
         expected = """\
 5 rows matched query mock_mig_1, 4 migrated, 1 remaining
 6 rows matched query mock_mig_2, 6 migrated, 0 remaining
-+------------+-------+------+-----------+
-| Migration  | Found | Done | Remaining |
-+------------+-------+------+-----------+
-| mock_mig_1 |   5   |  4   |     1     |
-| mock_mig_2 |   6   |  6   |     0     |
-+------------+-------+------+-----------+
++------------+--------------+-----------+
+| Migration  | Total Needed | Completed |
++------------+--------------+-----------+
+| mock_mig_1 |      5       |     4     |
+| mock_mig_2 |      6       |     6     |
++------------+--------------+-----------+
 """
         command.online_migrations[0].assert_has_calls([mock.call(ctxt,
-                                                                 10, False)])
+                                                                 10)])
         command.online_migrations[1].assert_has_calls([mock.call(ctxt,
-                                                                 6, False)])
+                                                                 6)])
 
         self.assertEqual(expected, sys.stdout.getvalue())
 
@@ -273,10 +273,10 @@ class TestCinderManageCmd(test.TestCase):
     def test_db_commands_online_data_migrations_ignore_state_and_max(self):
         db_cmds = cinder_manage.DbCommands()
         exit = self.assertRaises(SystemExit, db_cmds.online_data_migrations,
-                                 2, True)
+                                 2)
         self.assertEqual(1, exit.code)
         cinder_manage.DbCommands.online_migrations[0].assert_called_once_with(
-            mock.ANY, 2, True)
+            mock.ANY, 2)
 
     @mock.patch('cinder.cmd.manage.DbCommands.online_migrations',
                 (mock.Mock(side_effect=((2, 2), (0, 0)), __name__='foo'),))
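
A closing note on the test scaffolding (commentary, not part of the diff): the mocked migrations patched in above return plain (found, done) pairs, and a (0, 0) result is what ends the batching loop in online_data_migrations. A tiny sketch of that behaviour:

    # side_effect yields one return value per call, mimicking a migration
    # that finishes all of its work on the first pass.
    import mock

    fake_mig = mock.Mock(side_effect=((2, 2), (0, 0)), __name__='foo')
    assert fake_mig(None, 50) == (2, 2)   # first batch: 2 found, 2 migrated
    assert fake_mig(None, 50) == (0, 0)   # nothing left, so the loop exits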