Merge "PowerMax Driver - Unit Test Refactoring"

This commit is contained in:
Zuul 2019-03-09 00:32:52 +00:00 committed by Gerrit Code Review
commit 5d6814de4f
12 changed files with 9523 additions and 9401 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,333 @@
# Copyright (c) 2017-2019 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
import requests
from cinder import exception
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_data as tpd)
class FakeLookupService(object):
def get_device_mapping_from_network(self, initiator_wwns, target_wwns):
return tpd.PowerMaxData.device_map
class FakeResponse(object):
def __init__(self, status_code, return_object):
self.status_code = status_code
self.return_object = return_object
def json(self):
if self.return_object:
return self.return_object
else:
raise ValueError
def get_status_code(self):
return self.status_code()
def raise_for_status(self):
if 200 <= self.status_code <= 204:
return False
else:
return True
class FakeRequestsSession(object):
def __init__(self, *args, **kwargs):
self.data = tpd.PowerMaxData()
def request(self, method, url, params=None, data=None):
return_object = ''
status_code = 200
if method == 'GET':
status_code, return_object = self._get_request(url, params)
elif method == 'POST' or method == 'PUT':
status_code, return_object = self._post_or_put(url, data)
elif method == 'DELETE':
status_code, return_object = self._delete(url)
elif method == 'TIMEOUT':
raise requests.Timeout
elif method == 'EXCEPTION':
raise Exception
elif method == 'CONNECTION':
raise requests.ConnectionError
elif method == 'HTTP':
raise requests.HTTPError
elif method == 'SSL':
raise requests.exceptions.SSLError
elif method == 'EXCEPTION':
raise exception.VolumeBackendAPIException
return FakeResponse(status_code, return_object)
def _get_request(self, url, params):
status_code = 200
return_object = None
if self.data.failed_resource in url:
status_code = 500
return_object = self.data.job_list[2]
elif 'sloprovisioning' in url:
if 'volume' in url:
return_object = self._sloprovisioning_volume(url, params)
elif 'storagegroup' in url:
return_object = self._sloprovisioning_sg(url)
elif 'maskingview' in url:
return_object = self._sloprovisioning_mv(url)
elif 'portgroup' in url:
return_object = self._sloprovisioning_pg(url)
elif 'host' in url:
return_object = self._sloprovisioning_ig(url)
elif 'initiator' in url:
return_object = self._sloprovisioning_initiator(url)
elif 'service_level_demand_report' in url:
return_object = self.data.srp_slo_details
elif 'srp' in url:
return_object = self.data.srp_details
elif 'workloadtype' in url:
return_object = self.data.workloadtype
elif 'compressionCapable' in url:
return_object = self.data.compression_info
elif 'slo' in url:
return_object = self.data.powermax_slo_details
elif 'replication' in url:
return_object = self._replication(url)
elif 'system' in url:
if 'director' in url:
return_object = self._system_port(url)
else:
return_object = self._system(url)
elif 'headroom' in url:
return_object = self.data.headroom
return status_code, return_object
def _sloprovisioning_volume(self, url, params):
return_object = self.data.volume_list[2]
if '/private' in url:
return_object = self.data.private_vol_details
elif params:
if '1' in params.values():
return_object = self.data.volume_list[0]
elif '2' in params.values():
return_object = self.data.volume_list[1]
else:
for vol in self.data.volume_details:
if vol['volumeId'] in url:
return_object = vol
break
return return_object
def _sloprovisioning_sg(self, url):
return_object = self.data.sg_list
for sg in self.data.sg_details:
if sg['storageGroupId'] in url:
return_object = sg
break
return return_object
def _sloprovisioning_mv(self, url):
if self.data.masking_view_name_i in url:
return_object = self.data.maskingview[1]
else:
return_object = self.data.maskingview[0]
return return_object
def _sloprovisioning_pg(self, url):
return_object = None
for pg in self.data.portgroup:
if pg['portGroupId'] in url:
return_object = pg
break
return return_object
def _system_port(self, url):
return_object = None
for port in self.data.port_list:
if port['symmetrixPort']['symmetrixPortKey']['directorId'] in url:
return_object = port
break
return return_object
def _sloprovisioning_ig(self, url):
return_object = None
for ig in self.data.inititiatorgroup:
if ig['hostId'] in url:
return_object = ig
break
return return_object
def _sloprovisioning_initiator(self, url):
return_object = self.data.initiator_list[2]
if self.data.wwpn1 in url:
return_object = self.data.initiator_list[0]
elif self.data.initiator in url:
return_object = self.data.initiator_list[1]
return return_object
def _replication(self, url):
return_object = None
if 'storagegroup' in url:
return_object = self._replication_sg(url)
elif 'rdf_group' in url:
if self.data.device_id in url:
return_object = self.data.rdf_group_vol_details
elif self.data.rdf_group_no in url:
return_object = self.data.rdf_group_details
else:
return_object = self.data.rdf_group_list
elif 'snapshot' in url:
return_object = self.data.volume_snap_vx
elif 'capabilities' in url:
return_object = self.data.capabilities
return return_object
def _replication_sg(self, url):
return_object = None
if 'generation' in url:
return_object = self.data.group_snap_vx
elif 'rdf_group' in url:
for sg in self.data.sg_rdf_details:
if sg['storageGroupName'] in url:
return_object = sg
break
elif 'storagegroup' in url:
return_object = self.data.sg_details_rep[0]
return return_object
def _system(self, url):
return_object = None
if 'job' in url:
for job in self.data.job_list:
if job['jobId'] in url:
return_object = job
break
elif 'version' in url:
return_object = self.data.version_details
else:
for symm in self.data.symmetrix:
if symm['symmetrixId'] in url:
return_object = symm
break
return return_object
def _post_or_put(self, url, payload):
return_object = self.data.job_list[0]
status_code = 201
if self.data.failed_resource in url:
status_code = 500
return_object = self.data.job_list[2]
elif payload:
payload = ast.literal_eval(payload)
if self.data.failed_resource in payload.values():
status_code = 500
return_object = self.data.job_list[2]
if payload.get('executionOption'):
status_code = 202
return status_code, return_object
def _delete(self, url):
if self.data.failed_resource in url:
status_code = 500
return_object = self.data.job_list[2]
else:
status_code = 204
return_object = None
return status_code, return_object
def session(self):
return FakeRequestsSession()
def close(self):
pass
class FakeConfiguration(object):
def __init__(self, emc_file=None, volume_backend_name=None,
interval=0, retries=0, replication_device=None, **kwargs):
self.cinder_dell_emc_config_file = emc_file
self.interval = interval
self.retries = retries
self.volume_backend_name = volume_backend_name
self.config_group = volume_backend_name
self.san_is_local = False
if replication_device:
self.replication_device = [replication_device]
for key, value in kwargs.items():
if key == 'san_login':
self.san_login = value
elif key == 'san_password':
self.san_password = value
elif key == 'san_ip':
self.san_ip = value
elif key == 'san_api_port':
self.san_api_port = value
elif key == 'san_rest_port':
self.san_rest_port = value
elif key == 'vmax_srp':
self.vmax_srp = value
elif key == 'vmax_service_level':
self.vmax_service_level = value
elif key == 'vmax_workload':
self.vmax_workload = value
elif key == 'vmax_port_groups':
self.vmax_port_groups = value
elif key == 'vmax_array':
self.vmax_array = value
elif key == 'use_chap_auth':
self.use_chap_auth = value
elif key == 'chap_username':
self.chap_username = value
elif key == 'chap_password':
self.chap_password = value
elif key == 'driver_ssl_cert_verify':
self.driver_ssl_cert_verify = value
elif key == 'driver_ssl_cert_path':
self.driver_ssl_cert_path = value
elif key == 'u4p_failover_target':
self.u4p_failover_target = value
elif key == 'u4p_failover_backoff_factor':
self.u4p_failover_backoff_factor = value
elif key == 'u4p_failover_retries':
self.u4p_failover_retries = value
elif key == 'u4p_failover_timeout':
self.u4p_failover_timeout = value
elif key == 'u4p_primary':
self.u4p_primary = value
def safe_get(self, key):
try:
return getattr(self, key)
except Exception:
return None
def append_config_values(self, values):
pass

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,286 @@
# Copyright (c) 2017-2019 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from cinder import test
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_data as tpd)
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_fake_objects as tpfo)
from cinder.volume.drivers.dell_emc.powermax import common
from cinder.volume.drivers.dell_emc.powermax import fc
from cinder.volume.drivers.dell_emc.powermax import rest
from cinder.volume import utils as volume_utils
from cinder.zonemanager import utils as fczm_utils
class PowerMaxFCTest(test.TestCase):
def setUp(self):
self.data = tpd.PowerMaxData()
super(PowerMaxFCTest, self).setUp()
volume_utils.get_max_over_subscription_ratio = mock.Mock()
self.configuration = tpfo.FakeConfiguration(
None, 'FCTests', 1, 1, san_ip='1.1.1.1', san_login='smc',
vmax_array=self.data.array, vmax_srp='SRP_1', san_password='smc',
san_api_port=8443, vmax_port_groups=[self.data.port_group_name_i])
rest.PowerMaxRest._establish_rest_session = mock.Mock(
return_value=tpfo.FakeRequestsSession())
driver = fc.PowerMaxFCDriver(configuration=self.configuration)
self.driver = driver
self.common = self.driver.common
self.masking = self.common.masking
self.utils = self.common.utils
self.utils.get_volumetype_extra_specs = (
mock.Mock(return_value=self.data.vol_type_extra_specs))
def test_create_volume(self):
with mock.patch.object(self.common, 'create_volume') as mock_create:
self.driver.create_volume(self.data.test_volume)
mock_create.assert_called_once_with(self.data.test_volume)
def test_create_volume_from_snapshot(self):
volume = self.data.test_clone_volume
snapshot = self.data.test_snapshot
with mock.patch.object(
self.common, 'create_volume_from_snapshot') as mock_create:
self.driver.create_volume_from_snapshot(volume, snapshot)
mock_create.assert_called_once_with(volume, snapshot)
def test_create_cloned_volume(self):
volume = self.data.test_clone_volume
src_volume = self.data.test_volume
with mock.patch.object(
self.common, 'create_cloned_volume') as mock_create:
self.driver.create_cloned_volume(volume, src_volume)
mock_create.assert_called_once_with(volume, src_volume)
def test_delete_volume(self):
with mock.patch.object(self.common, 'delete_volume') as mock_delete:
self.driver.delete_volume(self.data.test_volume)
mock_delete.assert_called_once_with(self.data.test_volume)
def test_create_snapshot(self):
with mock.patch.object(self.common, 'create_snapshot') as mock_create:
self.driver.create_snapshot(self.data.test_snapshot)
mock_create.assert_called_once_with(
self.data.test_snapshot, self.data.test_snapshot.volume)
def test_delete_snapshot(self):
with mock.patch.object(self.common, 'delete_snapshot') as mock_delete:
self.driver.delete_snapshot(self.data.test_snapshot)
mock_delete.assert_called_once_with(
self.data.test_snapshot, self.data.test_snapshot.volume)
def test_initialize_connection(self):
with mock.patch.object(
self.common, 'initialize_connection',
return_value=self.data.fc_device_info) as mock_initialize:
with mock.patch.object(
self.driver, 'populate_data') as mock_populate:
self.driver.initialize_connection(
self.data.test_volume, self.data.connector)
mock_initialize.assert_called_once_with(
self.data.test_volume, self.data.connector)
mock_populate.assert_called_once_with(
self.data.fc_device_info, self.data.test_volume,
self.data.connector)
def test_populate_data(self):
with mock.patch.object(self.driver, '_build_initiator_target_map',
return_value=([], {})) as mock_build:
ref_data = {
'driver_volume_type': 'fibre_channel',
'data': {'target_lun': self.data.fc_device_info['hostlunid'],
'target_discovered': True,
'target_wwn': [],
'initiator_target_map': {}}}
data = self.driver.populate_data(self.data.fc_device_info,
self.data.test_volume,
self.data.connector)
self.assertEqual(ref_data, data)
mock_build.assert_called_once_with(
self.data.test_volume, self.data.connector)
def test_terminate_connection(self):
with mock.patch.object(
self.common, 'terminate_connection') as mock_terminate:
self.driver.terminate_connection(
self.data.test_volume, self.data.connector)
mock_terminate.assert_called_once_with(
self.data.test_volume, self.data.connector)
def test_terminate_connection_no_zoning_mappings(self):
with mock.patch.object(self.driver, '_get_zoning_mappings',
return_value=None):
with mock.patch.object(
self.common, 'terminate_connection') as mock_terminate:
self.driver.terminate_connection(self.data.test_volume,
self.data.connector)
mock_terminate.assert_not_called()
def test_get_zoning_mappings(self):
ref_mappings = self.data.zoning_mappings
zoning_mappings = self.driver._get_zoning_mappings(
self.data.test_volume, self.data.connector)
self.assertEqual(ref_mappings, zoning_mappings)
# Legacy vol
zoning_mappings2 = self.driver._get_zoning_mappings(
self.data.test_legacy_vol, self.data.connector)
self.assertEqual(ref_mappings, zoning_mappings2)
def test_get_zoning_mappings_no_mv(self):
with mock.patch.object(self.common, 'get_masking_views_from_volume',
return_value=(None, False)):
zoning_mappings = self.driver._get_zoning_mappings(
self.data.test_volume, self.data.connector)
self.assertEqual({}, zoning_mappings)
@mock.patch.object(
common.PowerMaxCommon, 'get_masking_views_from_volume',
return_value=([tpd.PowerMaxData.masking_view_name_f], True))
def test_get_zoning_mappings_metro(self, mock_mv):
ref_mappings = self.data.zoning_mappings_metro
zoning_mappings = self.driver._get_zoning_mappings(
self.data.test_volume, self.data.connector)
self.assertEqual(ref_mappings, zoning_mappings)
def test_cleanup_zones_other_vols_mapped(self):
ref_data = {'driver_volume_type': 'fibre_channel',
'data': {}}
data = self.driver._cleanup_zones(self.data.zoning_mappings)
self.assertEqual(ref_data, data)
def test_cleanup_zones_no_vols_mapped(self):
zoning_mappings = self.data.zoning_mappings
ref_data = {'driver_volume_type': 'fibre_channel',
'data': {'target_wwn': zoning_mappings['target_wwns'],
'initiator_target_map':
zoning_mappings['init_targ_map']}}
with mock.patch.object(self.common, 'get_common_masking_views',
return_value=[]):
data = self.driver._cleanup_zones(self.data.zoning_mappings)
self.assertEqual(ref_data, data)
def test_build_initiator_target_map(self):
ref_target_map = {'123456789012345': ['543210987654321'],
'123456789054321': ['123450987654321']}
with mock.patch.object(fczm_utils, 'create_lookup_service',
return_value=tpfo.FakeLookupService()):
driver = fc.PowerMaxFCDriver(configuration=self.configuration)
with mock.patch.object(driver.common,
'get_target_wwns_from_masking_view',
return_value=(self.data.target_wwns, [])):
targets, target_map = driver._build_initiator_target_map(
self.data.test_volume, self.data.connector)
self.assertEqual(ref_target_map, target_map)
def test_extend_volume(self):
with mock.patch.object(self.common, 'extend_volume') as mock_extend:
self.driver.extend_volume(self.data.test_volume, '3')
mock_extend.assert_called_once_with(self.data.test_volume, '3')
def test_get_volume_stats(self):
with mock.patch.object(
self.driver, 'update_volume_stats') as mock_update:
# no refresh
self.driver.get_volume_stats()
mock_update.assert_not_called()
# with refresh
self.driver.get_volume_stats(True)
mock_update.assert_called_once_with()
def test_update_volume_stats(self):
with mock.patch.object(self.common, 'update_volume_stats',
return_value={}) as mock_update:
self.driver.update_volume_stats()
mock_update.assert_called_once_with()
def test_check_for_setup_error(self):
self.driver.check_for_setup_error()
def test_ensure_export(self):
self.driver.ensure_export('context', 'volume')
def test_create_export(self):
self.driver.create_export('context', 'volume', 'connector')
def test_remove_export(self):
self.driver.remove_export('context', 'volume')
def test_check_for_export(self):
self.driver.check_for_export('context', 'volume_id')
def test_manage_existing(self):
with mock.patch.object(self.common, 'manage_existing',
return_value={}) as mock_manage:
external_ref = {u'source-name': u'00002'}
self.driver.manage_existing(self.data.test_volume, external_ref)
mock_manage.assert_called_once_with(
self.data.test_volume, external_ref)
def test_manage_existing_get_size(self):
with mock.patch.object(self.common, 'manage_existing_get_size',
return_value='1') as mock_manage:
external_ref = {u'source-name': u'00002'}
self.driver.manage_existing_get_size(
self.data.test_volume, external_ref)
mock_manage.assert_called_once_with(
self.data.test_volume, external_ref)
def test_unmanage_volume(self):
with mock.patch.object(self.common, 'unmanage',
return_value={}) as mock_unmanage:
self.driver.unmanage(self.data.test_volume)
mock_unmanage.assert_called_once_with(
self.data.test_volume)
def test_retype(self):
host = {'host': self.data.new_host}
new_type = {'extra_specs': {}}
with mock.patch.object(self.common, 'retype',
return_value=True) as mck_retype:
self.driver.retype({}, self.data.test_volume, new_type, '', host)
mck_retype.assert_called_once_with(
self.data.test_volume, new_type, host)
def test_failover_host(self):
with mock.patch.object(
self.common, 'failover_host',
return_value=(self.data.remote_array, [], [])) as mock_fo:
self.driver.failover_host(self.data.ctx, [self.data.test_volume])
mock_fo.assert_called_once_with([self.data.test_volume], None,
None)
def test_enable_replication(self):
with mock.patch.object(
self.common, 'enable_replication') as mock_er:
self.driver.enable_replication(
self.data.ctx, self.data.test_group, [self.data.test_volume])
mock_er.assert_called_once()
def test_disable_replication(self):
with mock.patch.object(
self.common, 'disable_replication') as mock_dr:
self.driver.disable_replication(
self.data.ctx, self.data.test_group, [self.data.test_volume])
mock_dr.assert_called_once()
def test_failover_replication(self):
with mock.patch.object(
self.common, 'failover_replication') as mock_fo:
self.driver.failover_replication(
self.data.ctx, self.data.test_group, [self.data.test_volume])
mock_fo.assert_called_once()

View File

@ -0,0 +1,334 @@
# Copyright (c) 2017-2019 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from copy import deepcopy
import mock
from cinder import exception
from cinder import test
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_data as tpd)
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_fake_objects as tpfo)
from cinder.volume.drivers.dell_emc.powermax import iscsi
from cinder.volume.drivers.dell_emc.powermax import rest
from cinder.volume import utils as volume_utils
class PowerMaxISCSITest(test.TestCase):
def setUp(self):
self.data = tpd.PowerMaxData()
super(PowerMaxISCSITest, self).setUp()
volume_utils.get_max_over_subscription_ratio = mock.Mock()
configuration = tpfo.FakeConfiguration(
None, 'ISCSITests', 1, 1, san_ip='1.1.1.1', san_login='smc',
vmax_array=self.data.array, vmax_srp='SRP_1', san_password='smc',
san_api_port=8443, vmax_port_groups=[self.data.port_group_name_i])
rest.PowerMaxRest._establish_rest_session = mock.Mock(
return_value=tpfo.FakeRequestsSession())
driver = iscsi.PowerMaxISCSIDriver(configuration=configuration)
self.driver = driver
self.common = self.driver.common
self.masking = self.common.masking
self.utils = self.common.utils
self.utils.get_volumetype_extra_specs = (
mock.Mock(return_value=self.data.vol_type_extra_specs))
def test_create_volume(self):
with mock.patch.object(self.common, 'create_volume') as mock_create:
self.driver.create_volume(self.data.test_volume)
mock_create.assert_called_once_with(
self.data.test_volume)
def test_create_volume_from_snapshot(self):
volume = self.data.test_clone_volume
snapshot = self.data.test_snapshot
with mock.patch.object(
self.common, 'create_volume_from_snapshot') as mock_create:
self.driver.create_volume_from_snapshot(volume, snapshot)
mock_create.assert_called_once_with(
volume, snapshot)
def test_create_cloned_volume(self):
volume = self.data.test_clone_volume
src_volume = self.data.test_volume
with mock.patch.object(
self.common, 'create_cloned_volume') as mock_create:
self.driver.create_cloned_volume(volume, src_volume)
mock_create.assert_called_once_with(volume, src_volume)
def test_delete_volume(self):
with mock.patch.object(self.common, 'delete_volume') as mock_delete:
self.driver.delete_volume(self.data.test_volume)
mock_delete.assert_called_once_with(
self.data.test_volume)
def test_create_snapshot(self):
with mock.patch.object(self.common, 'create_snapshot') as mock_create:
self.driver.create_snapshot(self.data.test_snapshot)
mock_create.assert_called_once_with(
self.data.test_snapshot, self.data.test_snapshot.volume)
def test_delete_snapshot(self):
with mock.patch.object(self.common, 'delete_snapshot') as mock_delete:
self.driver.delete_snapshot(self.data.test_snapshot)
mock_delete.assert_called_once_with(
self.data.test_snapshot, self.data.test_snapshot.volume)
def test_initialize_connection(self):
ref_dict = {'maskingview': self.data.masking_view_name_f,
'array': self.data.array, 'hostlunid': 3,
'device_id': self.data.device_id,
'ip_and_iqn': [{'ip': self.data.ip,
'iqn': self.data.initiator}],
'is_multipath': False}
with mock.patch.object(self.driver, 'get_iscsi_dict') as mock_get:
with mock.patch.object(
self.common, 'get_port_group_from_masking_view',
return_value=self.data.port_group_name_i):
self.driver.initialize_connection(self.data.test_volume,
self.data.connector)
mock_get.assert_called_once_with(
ref_dict, self.data.test_volume)
def test_get_iscsi_dict_success(self):
ip_and_iqn = self.common._find_ip_and_iqns(
self.data.array, self.data.port_group_name_i)
host_lun_id = self.data.iscsi_device_info['hostlunid']
volume = self.data.test_volume
device_info = self.data.iscsi_device_info
ref_data = {'driver_volume_type': 'iscsi', 'data': {}}
with mock.patch.object(
self.driver, 'vmax_get_iscsi_properties',
return_value={}) as mock_get:
data = self.driver.get_iscsi_dict(device_info, volume)
self.assertEqual(ref_data, data)
mock_get.assert_called_once_with(
volume, ip_and_iqn, True, host_lun_id, None, None)
def test_get_iscsi_dict_exception(self):
device_info = {'ip_and_iqn': ''}
self.assertRaises(exception.VolumeBackendAPIException,
self.driver.get_iscsi_dict,
device_info, self.data.test_volume)
def test_get_iscsi_dict_metro(self):
ip_and_iqn = self.common._find_ip_and_iqns(
self.data.array, self.data.port_group_name_i)
host_lun_id = self.data.iscsi_device_info_metro['hostlunid']
volume = self.data.test_volume
device_info = self.data.iscsi_device_info_metro
ref_data = {'driver_volume_type': 'iscsi', 'data': {}}
with mock.patch.object(self.driver, 'vmax_get_iscsi_properties',
return_value={}) as mock_get:
data = self.driver.get_iscsi_dict(device_info, volume)
self.assertEqual(ref_data, data)
mock_get.assert_called_once_with(
volume, ip_and_iqn, True, host_lun_id,
self.data.iscsi_device_info_metro['metro_ip_and_iqn'],
self.data.iscsi_device_info_metro['metro_hostlunid'])
def test_vmax_get_iscsi_properties_one_target_no_auth(self):
vol = deepcopy(self.data.test_volume)
ip_and_iqn = self.common._find_ip_and_iqns(
self.data.array, self.data.port_group_name_i)
host_lun_id = self.data.iscsi_device_info['hostlunid']
ref_properties = {
'target_discovered': True,
'target_iqn': ip_and_iqn[0]['iqn'].split(',')[0],
'target_portal': ip_and_iqn[0]['ip'] + ':3260',
'target_lun': host_lun_id,
'volume_id': self.data.test_volume.id}
iscsi_properties = self.driver.vmax_get_iscsi_properties(
vol, ip_and_iqn, True, host_lun_id, [], None)
self.assertEqual(type(ref_properties), type(iscsi_properties))
self.assertEqual(ref_properties, iscsi_properties)
def test_vmax_get_iscsi_properties_multiple_targets(self):
ip_and_iqn = [{'ip': self.data.ip, 'iqn': self.data.initiator},
{'ip': self.data.ip, 'iqn': self.data.iqn}]
host_lun_id = self.data.iscsi_device_info['hostlunid']
ref_properties = {
'target_portals': (
[t['ip'] + ':3260' for t in ip_and_iqn]),
'target_iqns': (
[t['iqn'].split(',')[0] for t in ip_and_iqn]),
'target_luns': [host_lun_id] * len(ip_and_iqn),
'target_discovered': True,
'target_iqn': ip_and_iqn[0]['iqn'].split(',')[0],
'target_portal': ip_and_iqn[0]['ip'] + ':3260',
'target_lun': host_lun_id,
'volume_id': self.data.test_volume.id}
iscsi_properties = self.driver.vmax_get_iscsi_properties(
self.data.test_volume, ip_and_iqn, True, host_lun_id, [], None)
self.assertEqual(ref_properties, iscsi_properties)
def test_vmax_get_iscsi_properties_auth(self):
vol = deepcopy(self.data.test_volume)
backup_conf = self.common.configuration
configuration = tpfo.FakeConfiguration(
None, 'ISCSITests', 1, 1, san_ip='1.1.1.1', san_login='smc',
vmax_array=self.data.array, vmax_srp='SRP_1', san_password='smc',
san_rest_port=8443, use_chap_auth=True,
chap_username='auth_username', chap_password='auth_secret',
vmax_port_groups=[self.data.port_group_name_i])
self.driver.configuration = configuration
ip_and_iqn = [{'ip': self.data.ip, 'iqn': self.data.initiator},
{'ip': self.data.ip, 'iqn': self.data.iqn}]
host_lun_id = self.data.iscsi_device_info['hostlunid']
ref_properties = {
'target_portals': (
[t['ip'] + ':3260' for t in ip_and_iqn]),
'target_iqns': (
[t['iqn'].split(',')[0] for t in ip_and_iqn]),
'target_luns': [host_lun_id] * len(ip_and_iqn),
'target_discovered': True,
'target_iqn': ip_and_iqn[0]['iqn'].split(',')[0],
'target_portal': ip_and_iqn[0]['ip'] + ':3260',
'target_lun': host_lun_id,
'volume_id': self.data.test_volume.id,
'auth_method': 'CHAP',
'auth_username': 'auth_username',
'auth_password': 'auth_secret'}
iscsi_properties = self.driver.vmax_get_iscsi_properties(
vol, ip_and_iqn, True, host_lun_id, None, None)
self.assertEqual(ref_properties, iscsi_properties)
self.driver.configuration = backup_conf
def test_vmax_get_iscsi_properties_metro(self):
ip_and_iqn = [{'ip': self.data.ip, 'iqn': self.data.iqn}]
total_ip_list = [{'ip': self.data.ip, 'iqn': self.data.iqn},
{'ip': self.data.ip2, 'iqn': self.data.iqn2}]
host_lun_id = self.data.iscsi_device_info['hostlunid']
host_lun_id2 = self.data.iscsi_device_info_metro['metro_hostlunid']
ref_properties = {
'target_portals': (
[t['ip'] + ':3260' for t in total_ip_list]),
'target_iqns': (
[t['iqn'].split(',')[0] for t in total_ip_list]),
'target_luns': [host_lun_id, host_lun_id2],
'target_discovered': True,
'target_iqn': ip_and_iqn[0]['iqn'].split(',')[0],
'target_portal': ip_and_iqn[0]['ip'] + ':3260',
'target_lun': host_lun_id,
'volume_id': self.data.test_volume.id}
iscsi_properties = self.driver.vmax_get_iscsi_properties(
self.data.test_volume, ip_and_iqn, True, host_lun_id,
self.data.iscsi_device_info_metro['metro_ip_and_iqn'],
self.data.iscsi_device_info_metro['metro_hostlunid'])
self.assertEqual(ref_properties, iscsi_properties)
def test_terminate_connection(self):
with mock.patch.object(
self.common, 'terminate_connection') as mock_terminate:
self.driver.terminate_connection(self.data.test_volume,
self.data.connector)
mock_terminate.assert_called_once_with(
self.data.test_volume, self.data.connector)
def test_extend_volume(self):
with mock.patch.object(
self.common, 'extend_volume') as mock_extend:
self.driver.extend_volume(self.data.test_volume, '3')
mock_extend.assert_called_once_with(self.data.test_volume, '3')
def test_get_volume_stats(self):
with mock.patch.object(
self.driver, 'update_volume_stats') as mock_update:
# no refresh
self.driver.get_volume_stats()
mock_update.assert_not_called()
# with refresh
self.driver.get_volume_stats(True)
mock_update.assert_called_once_with()
def test_update_volume_stats(self):
with mock.patch.object(self.common, 'update_volume_stats',
return_value={}) as mock_update:
self.driver.update_volume_stats()
mock_update.assert_called_once_with()
def test_check_for_setup_error(self):
self.driver.check_for_setup_error()
def test_ensure_export(self):
self.driver.ensure_export('context', 'volume')
def test_create_export(self):
self.driver.create_export('context', 'volume', 'connector')
def test_remove_export(self):
self.driver.remove_export('context', 'volume')
def test_check_for_export(self):
self.driver.check_for_export('context', 'volume_id')
def test_manage_existing(self):
with mock.patch.object(self.common, 'manage_existing',
return_value={}) as mock_manage:
external_ref = {u'source-name': u'00002'}
self.driver.manage_existing(self.data.test_volume, external_ref)
mock_manage.assert_called_once_with(
self.data.test_volume, external_ref)
def test_manage_existing_get_size(self):
with mock.patch.object(self.common, 'manage_existing_get_size',
return_value='1') as mock_manage:
external_ref = {u'source-name': u'00002'}
self.driver.manage_existing_get_size(
self.data.test_volume, external_ref)
mock_manage.assert_called_once_with(
self.data.test_volume, external_ref)
def test_unmanage_volume(self):
with mock.patch.object(self.common, 'unmanage',
return_value={}) as mock_unmanage:
self.driver.unmanage(self.data.test_volume)
mock_unmanage.assert_called_once_with(self.data.test_volume)
def test_retype(self):
host = {'host': self.data.new_host}
new_type = {'extra_specs': {}}
with mock.patch.object(self.common, 'retype',
return_value=True) as mck_retype:
self.driver.retype({}, self.data.test_volume, new_type, '', host)
mck_retype.assert_called_once_with(
self.data.test_volume, new_type, host)
def test_failover_host(self):
with mock.patch.object(self.common, 'failover_host',
return_value={}) as mock_fo:
self.driver.failover_host({}, [self.data.test_volume])
mock_fo.assert_called_once_with([self.data.test_volume], None,
None)
def test_enable_replication(self):
with mock.patch.object(self.common, 'enable_replication') as mock_er:
self.driver.enable_replication(
self.data.ctx, self.data.test_group, [self.data.test_volume])
mock_er.assert_called_once()
def test_disable_replication(self):
with mock.patch.object(self.common, 'disable_replication') as mock_dr:
self.driver.disable_replication(
self.data.ctx, self.data.test_group, [self.data.test_volume])
mock_dr.assert_called_once()
def test_failover_replication(self):
with mock.patch.object(self.common, 'failover_replication') as mock_fo:
self.driver.failover_replication(
self.data.ctx, self.data.test_group, [self.data.test_volume])
mock_fo.assert_called_once()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,257 @@
# Copyright (c) 2017-2019 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import platform
from cinder.objects import fields
from cinder import test
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_data as tpd)
from cinder import version as openstack_version
from cinder.volume.drivers.dell_emc.powermax import metadata
from cinder.volume.drivers.dell_emc.powermax import rest
from cinder.volume.drivers.dell_emc.powermax import utils
class PowerMaxVolumeMetadataNoDebugTest(test.TestCase):
def setUp(self):
self.data = tpd.PowerMaxData()
super(PowerMaxVolumeMetadataNoDebugTest, self).setUp()
is_debug = False
self.volume_metadata = metadata.PowerMaxVolumeMetadata(
rest.PowerMaxRest, '3.1', is_debug)
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, '_fill_volume_trace_dict',
return_value={})
def test_gather_volume_info(self, mock_fvtd):
self.volume_metadata.gather_volume_info(
self.data.volume_id, 'create', False, volume_size=1)
mock_fvtd.assert_not_called()
class PowerMaxVolumeMetadataDebugTest(test.TestCase):
def setUp(self):
self.data = tpd.PowerMaxData()
super(PowerMaxVolumeMetadataDebugTest, self).setUp()
is_debug = True
self.volume_metadata = metadata.PowerMaxVolumeMetadata(
rest.PowerMaxRest, '3.1', is_debug)
self.utils = self.volume_metadata.utils
self.rest = self.volume_metadata.rest
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, '_fill_volume_trace_dict',
return_value={})
def test_gather_volume_info(self, mock_fvtd):
self.volume_metadata.gather_volume_info(
self.data.volume_id, 'create', False, volume_size=1)
mock_fvtd.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_attach_info(self, mock_uvim):
self.volume_metadata.capture_attach_info(
self.data.test_volume, self.data.extra_specs,
self.data.masking_view_dict, self.data.fake_host,
False, False)
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_create_volume(self, mock_uvim):
self.volume_metadata.capture_create_volume(
self.data.device_id, self.data.test_volume, 'test_group',
'test_group_id', self.data.extra_specs, {}, 'create', None)
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_delete_info(self, mock_uvim):
self.volume_metadata.capture_delete_info(self.data.test_volume)
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_manage_existing(self, mock_uvim):
self.volume_metadata.capture_manage_existing(
self.data.test_volume, {}, self.data.device_id,
self.data.extra_specs)
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_failover_volume(self, mock_uvim):
self.volume_metadata.capture_failover_volume(
self.data.test_volume, self.data.device_id2,
self.data.remote_array, self.data.rdf_group_name,
self.data.device_id, self.data.array,
self.data.extra_specs, True, None,
fields.ReplicationStatus.FAILED_OVER, utils.REP_SYNC)
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_modify_group(self, mock_uvim):
self.volume_metadata.capture_modify_group(
'test_group', 'test_group_id', [self.data.test_volume],
[], self.data.array)
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_extend_info(self, mock_uvim):
self.volume_metadata.capture_extend_info(
self.data.test_volume, 5, self.data.device_id,
self.data.extra_specs, self.data.array)
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_detach_info(self, mock_uvim):
self.volume_metadata.capture_detach_info(
self.data.test_volume, self.data.extra_specs, self.data.device_id,
None, None)
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_snapshot_info(self, mock_uvim):
self.volume_metadata.capture_snapshot_info(
self.data.test_volume, self.data.extra_specs, 'createSnapshot',
'ss-test-vol')
mock_uvim.assert_called_once()
@mock.patch.object(
metadata.PowerMaxVolumeMetadata, 'update_volume_info_metadata',
return_value={})
def test_capture_retype_info(self, mock_uvim):
self.volume_metadata.capture_retype_info(
self.data.test_volume, self.data.device_id, self.data.array,
self.data.srp, self.data.slo, self.data.workload,
self.data.storagegroup_name_target, False, None,
False)
mock_uvim.assert_called_once()
def test_update_volume_info_metadata(self):
volume_metadata = self.volume_metadata.update_volume_info_metadata(
self.data.data_dict, self.data.version_dict)
self.assertEqual('2.7.12', volume_metadata['python_version'])
self.assertEqual('VMAX250F', volume_metadata['storage_model'])
self.assertEqual('DSS', volume_metadata['workload'])
self.assertEqual('OS-fibre-PG', volume_metadata['port_group'])
def test_fill_volume_trace_dict(self):
datadict = {}
volume_trace_dict = {}
volume_key_value = {}
result_dict = {'successful_operation': 'create',
'volume_id': self.data.test_volume.id}
volume_metadata = self.volume_metadata._fill_volume_trace_dict(
self.data.test_volume.id, 'create', False, target_name=None,
datadict=datadict, volume_key_value=volume_key_value,
volume_trace_dict=volume_trace_dict)
self.assertEqual(result_dict, volume_metadata)
def test_fill_volume_trace_dict_multi_attach(self):
mv_list = ['mv1', 'mv2', 'mv3']
sg_list = ['sg1', 'sg2', 'sg3']
datadict = {}
volume_trace_dict = {}
volume_key_value = {}
result_dict = {
'masking_view_1': 'mv1', 'masking_view_2': 'mv2',
'masking_view_3': 'mv3', 'successful_operation': 'attach',
'storage_group_1': 'sg1', 'storage_group_2': 'sg2',
'storage_group_3': 'sg3', 'volume_id': self.data.test_volume.id}
volume_metadata = self.volume_metadata._fill_volume_trace_dict(
self.data.test_volume.id, 'attach', False, target_name=None,
datadict=datadict, volume_trace_dict=volume_trace_dict,
volume_key_value=volume_key_value, mv_list=mv_list,
sg_list=sg_list)
self.assertEqual(result_dict, volume_metadata)
@mock.patch.object(utils.PowerMaxUtils, 'merge_dicts',
return_value={})
def test_consolidate_volume_trace_list(self, mock_m2d):
self.volume_metadata.volume_trace_list = [self.data.data_dict]
volume_trace_dict = {'volume_updated_time': '2018-03-06 16:51:40',
'operation': 'delete',
'volume_id': self.data.volume_id}
volume_key_value = {self.data.volume_id: volume_trace_dict}
self.volume_metadata._consolidate_volume_trace_list(
self.data.volume_id, volume_trace_dict, volume_key_value)
mock_m2d.assert_called_once()
def test_merge_dicts_multiple(self):
d1 = {'a': 1, 'b': 2}
d2 = {'c': 3, 'd': 4}
d3 = {'e': 5, 'f': 6}
res_d = {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6}
result_dict = self.utils.merge_dicts(
d1, d2, d3)
self.assertEqual(res_d, result_dict)
def test_merge_dicts_multiple_2(self):
d1 = {'a': 1, 'b': 2}
d2 = {'b': 3, 'd': 4}
d3 = {'d': 5, 'e': 6}
res_d = {'a': 1, 'b': 2, 'd': 4, 'e': 6}
result_dict = self.utils.merge_dicts(
d1, d2, d3)
self.assertEqual(res_d, result_dict)
def test_merge_dicts(self):
self.volume_metadata.volume_trace_list = [self.data.data_dict]
volume_trace_dict = {'volume_updated_time': '2018-03-06 16:51:40',
'operation': 'delete',
'volume_id': self.data.volume_id}
result_dict = self.utils.merge_dicts(
volume_trace_dict, self.data.volume_info_dict)
self.assertEqual('delete', result_dict['operation'])
self.assertEqual(
'2018-03-06 16:51:40', result_dict['volume_updated_time'])
self.assertEqual('OS-fibre-PG', result_dict['port_group'])
@mock.patch.object(platform, 'platform',
return_value=tpd.PowerMaxData.platform)
@mock.patch.object(platform, 'python_version',
return_value=tpd.PowerMaxData.python_version)
@mock.patch.object(openstack_version.version_info, 'version_string',
return_value=tpd.PowerMaxData.openstack_version)
@mock.patch.object(openstack_version.version_info, 'release_string',
return_value=tpd.PowerMaxData.openstack_release)
@mock.patch.object(
rest.PowerMaxRest, 'get_unisphere_version',
return_value={'version': tpd.PowerMaxData.unisphere_version})
@mock.patch.object(
rest.PowerMaxRest, 'get_array_serial',
return_value={'ucode': tpd.PowerMaxData.vmax_firmware_version,
'model': tpd.PowerMaxData.vmax_model})
def test_gather_version_info(
self, mock_vi, mock_ur, mock_or, mock_ov, mock_pv, mock_p):
self.volume_metadata.gather_version_info(self.data.array)
self.assertEqual(
self.data.version_dict, self.volume_metadata.version_dict)

View File

@ -0,0 +1,557 @@
# Copyright (c) 2017-2019 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from copy import deepcopy
import mock
from cinder import exception
from cinder import test
from cinder.tests.unit import utils as test_utils
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_data as tpd)
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_fake_objects as tpfo)
from cinder.volume.drivers.dell_emc.powermax import iscsi
from cinder.volume.drivers.dell_emc.powermax import provision
from cinder.volume.drivers.dell_emc.powermax import rest
from cinder.volume.drivers.dell_emc.powermax import utils
from cinder.volume import utils as volume_utils
class PowerMaxProvisionTest(test.TestCase):
def setUp(self):
self.data = tpd.PowerMaxData()
super(PowerMaxProvisionTest, self).setUp()
volume_utils.get_max_over_subscription_ratio = mock.Mock()
configuration = tpfo.FakeConfiguration(
None, 'ProvisionTests', 1, 1, san_ip='1.1.1.1', san_login='smc',
vmax_array=self.data.array, vmax_srp='SRP_1', san_password='smc',
san_api_port=8443, vmax_port_groups=[self.data.port_group_name_i])
rest.PowerMaxRest._establish_rest_session = mock.Mock(
return_value=tpfo.FakeRequestsSession())
driver = iscsi.PowerMaxISCSIDriver(configuration=configuration)
self.driver = driver
self.common = self.driver.common
self.provision = self.common.provision
self.utils = self.common.utils
self.rest = self.common.rest
@mock.patch.object(rest.PowerMaxRest, 'create_storage_group',
return_value=tpd.PowerMaxData.storagegroup_name_f)
@mock.patch.object(rest.PowerMaxRest, 'get_storage_group',
side_effect=[
tpd.PowerMaxData.storagegroup_name_f, None])
def test_create_storage_group(self, mock_get_sg, mock_create):
array = self.data.array
storagegroup_name = self.data.storagegroup_name_f
srp = self.data.srp
slo = self.data.slo
workload = self.data.workload
extra_specs = self.data.extra_specs
for x in range(0, 2):
storagegroup = self.provision.create_storage_group(
array, storagegroup_name, srp, slo, workload, extra_specs)
self.assertEqual(storagegroup_name, storagegroup)
mock_create.assert_called_once()
def test_create_volume_from_sg(self):
array = self.data.array
storagegroup_name = self.data.storagegroup_name_f
volume_id = self.data.test_volume.id
volume_name = self.utils.get_volume_element_name(volume_id)
volume_size = self.data.test_volume.size
extra_specs = self.data.extra_specs
ref_dict = self.data.provider_location
volume_dict = self.provision.create_volume_from_sg(
array, volume_name, storagegroup_name, volume_size, extra_specs)
self.assertEqual(ref_dict, volume_dict)
def test_delete_volume_from_srp(self):
array = self.data.array
device_id = self.data.device_id
volume_name = self.data.volume_details[0]['volume_identifier']
with mock.patch.object(self.provision.rest, 'delete_volume'):
self.provision.delete_volume_from_srp(
array, device_id, volume_name)
self.provision.rest.delete_volume.assert_called_once_with(
array, device_id)
def test_create_volume_snap_vx(self):
array = self.data.array
source_device_id = self.data.device_id
snap_name = self.data.snap_location['snap_name']
extra_specs = self.data.extra_specs
ttl = 0
with mock.patch.object(self.provision.rest, 'create_volume_snap'):
self.provision.create_volume_snapvx(
array, source_device_id, snap_name, extra_specs)
self.provision.rest.create_volume_snap.assert_called_once_with(
array, snap_name, source_device_id, extra_specs, ttl)
def test_create_volume_replica_create_snap_true(self):
array = self.data.array
source_device_id = self.data.device_id
target_device_id = self.data.device_id2
snap_name = self.data.snap_location['snap_name']
extra_specs = self.data.extra_specs
# TTL of 1 hours
ttl = 1
with mock.patch.object(
self.provision, 'create_volume_snapvx') as mock_create_snapvx:
with mock.patch.object(
self.provision.rest, 'modify_volume_snap') as mock_modify:
self.provision.create_volume_replica(
array, source_device_id, target_device_id,
snap_name, extra_specs, create_snap=True)
mock_modify.assert_called_once_with(
array, source_device_id, target_device_id, snap_name,
extra_specs, link=True)
mock_create_snapvx.assert_called_once_with(
array, source_device_id, snap_name, extra_specs, ttl=ttl)
def test_create_volume_replica_create_snap_false(self):
array = self.data.array
source_device_id = self.data.device_id
target_device_id = self.data.device_id2
snap_name = self.data.snap_location['snap_name']
extra_specs = self.data.extra_specs
with mock.patch.object(
self.provision, 'create_volume_snapvx') as mock_create_snapvx:
with mock.patch.object(
self.provision.rest, 'modify_volume_snap') as mock_modify:
self.provision.create_volume_replica(
array, source_device_id, target_device_id,
snap_name, extra_specs, create_snap=False)
mock_modify.assert_called_once_with(
array, source_device_id, target_device_id, snap_name,
extra_specs, link=True)
mock_create_snapvx.assert_not_called()
def test_break_replication_relationship(self):
array = self.data.array
source_device_id = self.data.device_id
target_device_id = self.data.device_id2
snap_name = self.data.snap_location['snap_name']
extra_specs = self.data.extra_specs
with mock.patch.object(
self.provision.rest, 'modify_volume_snap') as mock_modify:
self.provision.break_replication_relationship(
array, target_device_id, source_device_id, snap_name,
extra_specs)
mock_modify.assert_called_once_with(
array, source_device_id, target_device_id,
snap_name, extra_specs, list_volume_pairs=None,
unlink=True, generation=0)
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
new=test_utils.ZeroIntervalLoopingCall)
def test_unlink_volume(self):
with mock.patch.object(self.rest, 'modify_volume_snap') as mock_mod:
self.provision._unlink_volume(
self.data.array, self.data.device_id, self.data.device_id2,
self.data.snap_location['snap_name'], self.data.extra_specs)
mock_mod.assert_called_once_with(
self.data.array, self.data.device_id, self.data.device_id2,
self.data.snap_location['snap_name'], self.data.extra_specs,
list_volume_pairs=None, unlink=True, generation=0)
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
new=test_utils.ZeroIntervalLoopingCall)
def test_unlink_volume_exception(self):
with mock.patch.object(
self.rest, 'modify_volume_snap',
side_effect=[exception.VolumeBackendAPIException(data=''), '']
) as mock_mod:
self.provision._unlink_volume(
self.data.array, self.data.device_id, self.data.device_id2,
self.data.snap_location['snap_name'], self.data.extra_specs)
self.assertEqual(2, mock_mod.call_count)
def test_delete_volume_snap(self):
array = self.data.array
source_device_id = self.data.device_id
snap_name = self.data.snap_location['snap_name']
generation = 0
with mock.patch.object(self.provision.rest, 'delete_volume_snap'):
self.provision.delete_volume_snap(
array, snap_name, source_device_id)
self.provision.rest.delete_volume_snap.assert_called_once_with(
array, snap_name, source_device_id, False, generation)
def test_delete_volume_snap_restore(self):
array = self.data.array
source_device_id = self.data.device_id
snap_name = self.data.snap_location['snap_name']
restored = True
generation = 0
with mock.patch.object(self.provision.rest, 'delete_volume_snap'):
self.provision.delete_volume_snap(
array, snap_name, source_device_id, restored)
self.provision.rest.delete_volume_snap.assert_called_once_with(
array, snap_name, source_device_id, True, generation)
@mock.patch('oslo_service.loopingcall.FixedIntervalLoopingCall',
new=test_utils.ZeroIntervalLoopingCall)
def test_restore_complete(self):
array = self.data.array
source_device_id = self.data.device_id
snap_name = self.data.snap_location['snap_name']
extra_specs = self.data.extra_specs
with mock.patch.object(
self.provision, '_is_restore_complete',
return_value=True):
isrestored = self.provision.is_restore_complete(
array, source_device_id, snap_name, extra_specs)
self.assertTrue(isrestored)
with mock.patch.object(
self.provision, '_is_restore_complete',
side_effect=exception.CinderException):
self.assertRaises(exception.VolumeBackendAPIException,
self.provision.is_restore_complete,
array, source_device_id, snap_name, extra_specs)
def test_is_restore_complete(self):
array = self.data.array
source_device_id = self.data.device_id
snap_name = self.data.snap_location['snap_name']
snap_details = {
'linkedDevices':
[{'targetDevice': source_device_id, 'state': 'Restored'}]}
with mock.patch.object(self.provision.rest,
'get_volume_snap', return_value=snap_details):
isrestored = self.provision._is_restore_complete(
array, source_device_id, snap_name)
self.assertTrue(isrestored)
snap_details['linkedDevices'][0]['state'] = 'Restoring'
with mock.patch.object(self.provision.rest,
'get_volume_snap', return_value=snap_details):
isrestored = self.provision._is_restore_complete(
array, source_device_id, snap_name)
self.assertFalse(isrestored)
def test_revert_volume_snapshot(self):
array = self.data.array
source_device_id = self.data.device_id
snap_name = self.data.snap_location['snap_name']
extra_specs = self.data.extra_specs
with mock.patch.object(
self.provision.rest, 'modify_volume_snap', return_value=None):
self.provision.revert_volume_snapshot(
array, source_device_id, snap_name, extra_specs)
self.provision.rest.modify_volume_snap.assert_called_once_with(
array, source_device_id, "", snap_name,
extra_specs, restore=True)
def test_extend_volume(self):
array = self.data.array
device_id = self.data.device_id
new_size = '3'
extra_specs = self.data.extra_specs
with mock.patch.object(self.provision.rest, 'extend_volume'
) as mock_ex:
self.provision.extend_volume(array, device_id, new_size,
extra_specs)
mock_ex.assert_called_once_with(
array, device_id, new_size, extra_specs)
mock_ex.reset_mock()
# Pass in rdf group
self.provision.extend_volume(array, device_id, new_size,
extra_specs, self.data.rdf_group_no)
mock_ex.assert_called_once_with(
array, device_id, new_size, extra_specs)
def test_get_srp_pool_stats(self):
array = self.data.array
array_info = self.common.pool_info['arrays_info'][0]
srp_capacity = self.data.srp_details['srp_capacity']
ref_stats = ((srp_capacity['usable_total_tb'] * 1024),
float((srp_capacity['usable_total_tb'] * 1024)
- (srp_capacity['usable_used_tb'] * 1024)),
(srp_capacity['subscribed_total_tb'] * 1024),
self.data.srp_details['reserved_cap_percent'])
stats = self.provision.get_srp_pool_stats(array, array_info)
self.assertEqual(ref_stats, stats)
def test_get_srp_pool_stats_errors(self):
# cannot retrieve srp
array = self.data.array
array_info = {'srpName': self.data.failed_resource}
ref_stats = (0, 0, 0, 0, False)
stats = self.provision.get_srp_pool_stats(array, array_info)
self.assertEqual(ref_stats, stats)
# cannot report on all stats
with mock.patch.object(
self.provision.rest, 'get_srp_by_name',
return_value={'srp_capacity': {'usable_total_tb': 33}}):
ref_stats = (33 * 1024, 0, 0, 0)
stats = self.provision.get_srp_pool_stats(array, array_info)
self.assertEqual(ref_stats, stats)
def test_verify_slo_workload_true(self):
# with slo and workload
array = self.data.array
slo = self.data.slo
workload = self.data.workload
srp = self.data.srp
valid_slo, valid_workload = self.provision.verify_slo_workload(
array, slo, workload, srp)
self.assertTrue(valid_slo)
self.assertTrue(valid_workload)
# slo and workload = none
slo2 = None
workload2 = None
valid_slo2, valid_workload2 = self.provision.verify_slo_workload(
array, slo2, workload2, srp)
self.assertTrue(valid_slo2)
self.assertTrue(valid_workload2)
slo2 = None
workload2 = 'None'
valid_slo2, valid_workload2 = self.provision.verify_slo_workload(
array, slo2, workload2, srp)
self.assertTrue(valid_slo2)
self.assertTrue(valid_workload2)
def test_verify_slo_workload_false(self):
# Both wrong
array = self.data.array
slo = 'Diamante'
workload = 'DSSS'
srp = self.data.srp
valid_slo, valid_workload = self.provision.verify_slo_workload(
array, slo, workload, srp)
self.assertFalse(valid_slo)
self.assertFalse(valid_workload)
# Workload set, no slo set
valid_slo, valid_workload = self.provision.verify_slo_workload(
array, None, self.data.workload, srp)
self.assertTrue(valid_slo)
self.assertFalse(valid_workload)
def test_get_slo_workload_settings_from_storage_group(self):
ref_settings = 'Diamond+DSS'
sg_slo_settings = (
self.provision.get_slo_workload_settings_from_storage_group(
self.data.array, self.data.defaultstoragegroup_name))
self.assertEqual(ref_settings, sg_slo_settings)
# No workload
with mock.patch.object(self.provision.rest, 'get_storage_group',
return_value={'slo': 'Silver'}):
ref_settings2 = 'Silver+NONE'
sg_slo_settings2 = (
self.provision.get_slo_workload_settings_from_storage_group(
self.data.array, 'no_workload_sg'))
self.assertEqual(ref_settings2, sg_slo_settings2)
# NextGen Array
with mock.patch.object(self.rest, 'is_next_gen_array',
return_value=True):
ref_settings3 = 'Diamond+NONE'
sg_slo_settings3 = (
self.provision.get_slo_workload_settings_from_storage_group(
self.data.array, self.data.defaultstoragegroup_name))
self.assertEqual(ref_settings3, sg_slo_settings3)
@mock.patch.object(rest.PowerMaxRest, 'wait_for_rdf_consistent_state')
@mock.patch.object(rest.PowerMaxRest, 'delete_rdf_pair')
@mock.patch.object(rest.PowerMaxRest, 'modify_rdf_device_pair')
def test_break_rdf_relationship(self, mock_mod, mock_del, mock_wait):
array = self.data.array
device_id = self.data.device_id
target_device = self.data.device_id2
rdf_group_name = self.data.rdf_group_name
rep_extra_specs = self.data.rep_extra_specs
# State is suspended
self.provision.break_rdf_relationship(
array, device_id, target_device,
rdf_group_name, rep_extra_specs, 'Suspended')
mock_mod.assert_not_called()
mock_del.assert_called_once_with(
array, device_id, rdf_group_name)
mock_del.reset_mock()
# State is synchronized
self.provision.break_rdf_relationship(
array, device_id, target_device,
rdf_group_name, rep_extra_specs, 'Synchronized')
mock_mod.assert_called_once_with(
array, device_id, rdf_group_name, rep_extra_specs,
suspend=True)
mock_del.assert_called_once_with(
array, device_id, rdf_group_name)
# sync still in progress
self.provision.break_rdf_relationship(
array, device_id, target_device,
rdf_group_name, rep_extra_specs, 'SyncInProg')
mock_wait.assert_called_once()
@mock.patch.object(provision.PowerMaxProvision,
'disable_group_replication')
@mock.patch.object(provision.PowerMaxProvision, 'delete_rdf_pair')
def test_break_metro_rdf_pair(self, mock_del, mock_disable):
self.provision.break_metro_rdf_pair(
self.data.array, self.data.device_id, self.data.device_id2,
self.data.rdf_group_no, self.data.rep_extra_specs, 'metro_grp')
mock_del.assert_called_once()
def test_delete_rdf_pair_async(self):
with mock.patch.object(
self.provision.rest, 'delete_rdf_pair') as mock_del_rdf:
extra_specs = deepcopy(self.data.extra_specs)
extra_specs[utils.REP_MODE] = utils.REP_ASYNC
self.provision.delete_rdf_pair(
self.data.array, self.data.device_id,
self.data.rdf_group_no, self.data.device_id2, extra_specs)
mock_del_rdf.assert_called_once()
@mock.patch.object(rest.PowerMaxRest, 'get_storage_group',
return_value=None)
def test_create_volume_group_success(self, mock_get_sg):
array = self.data.array
group_name = self.data.storagegroup_name_source
extra_specs = self.data.extra_specs
ref_value = self.data.storagegroup_name_source
storagegroup = self.provision.create_volume_group(
array, group_name, extra_specs)
self.assertEqual(ref_value, storagegroup)
def test_create_group_replica(self):
array = self.data.array
source_group = self.data.storagegroup_name_source
snap_name = self.data.group_snapshot_name
extra_specs = self.data.extra_specs
with mock.patch.object(
self.provision,
'create_group_replica') as mock_create_replica:
self.provision.create_group_replica(
array, source_group, snap_name, extra_specs)
mock_create_replica.assert_called_once_with(
array, source_group, snap_name, extra_specs)
def test_delete_group_replica(self):
array = self.data.array
snap_name = self.data.group_snapshot_name
source_group_name = self.data.storagegroup_name_source
extra_specs = self.data.extra_specs
src_dev_ids = [self.data.device_id]
with mock.patch.object(
self.provision,
'delete_group_replica') as mock_delete_replica:
self.provision.delete_group_replica(
array, snap_name, source_group_name, src_dev_ids, extra_specs)
mock_delete_replica.assert_called_once_with(
array, snap_name, source_group_name, src_dev_ids, extra_specs)
def test_link_and_break_replica(self):
array = self.data.array
source_group_name = self.data.storagegroup_name_source
target_group_name = self.data.target_group_name
snap_name = self.data.group_snapshot_name
extra_specs = self.data.extra_specs
delete_snapshot = False
with mock.patch.object(
self.provision,
'link_and_break_replica') as mock_link_and_break_replica:
self.provision.link_and_break_replica(
array, source_group_name,
target_group_name, snap_name,
extra_specs, delete_snapshot)
mock_link_and_break_replica.assert_called_once_with(
array, source_group_name,
target_group_name, snap_name,
extra_specs, delete_snapshot)
@mock.patch.object(rest.PowerMaxRest, 'get_storage_group',
side_effect=[None,
tpd.PowerMaxData.sg_details[1]])
@mock.patch.object(provision.PowerMaxProvision, 'create_volume_group')
def test_get_or_create_volume_group(self, mock_create, mock_sg):
for x in range(0, 2):
self.provision.get_or_create_volume_group(
self.data.array, self.data.test_group, self.data.extra_specs)
self.assertEqual(2, mock_sg.call_count)
self.assertEqual(1, mock_create.call_count)
@mock.patch.object(rest.PowerMaxRest, 'create_resource',
return_value=(202, tpd.PowerMaxData.job_list[0]))
def test_replicate_group(self, mock_create):
self.rest.replicate_group(
self.data.array, self.data.test_rep_group,
self.data.rdf_group_no, self.data.remote_array,
self.data.extra_specs)
mock_create.assert_called_once()
def test_enable_group_replication(self):
with mock.patch.object(self.rest,
'modify_storagegroup_rdf') as mock_mod:
self.provision.enable_group_replication(
self.data.array, self.data.test_vol_grp_name,
self.data.rdf_group_no, self.data.extra_specs)
mock_mod.assert_called_once()
def test_disable_group_replication(self):
with mock.patch.object(self.rest,
'modify_storagegroup_rdf') as mock_mod:
self.provision.disable_group_replication(
self.data.array, self.data.test_vol_grp_name,
self.data.rdf_group_no, self.data.extra_specs)
mock_mod.assert_called_once()
def test_failover_group(self):
with mock.patch.object(self.rest,
'modify_storagegroup_rdf') as mock_fo:
# Failover
self.provision.failover_group(
self.data.array, self.data.test_vol_grp_name,
self.data.rdf_group_no, self.data.extra_specs)
mock_fo.assert_called_once_with(
self.data.array, self.data.test_vol_grp_name,
self.data.rdf_group_no, 'Failover', self.data.extra_specs)
mock_fo.reset_mock()
# Failback
self.provision.failover_group(
self.data.array, self.data.test_vol_grp_name,
self.data.rdf_group_no, self.data.extra_specs, False)
mock_fo.assert_called_once_with(
self.data.array, self.data.test_vol_grp_name,
self.data.rdf_group_no, 'Failback', self.data.extra_specs)
@mock.patch.object(rest.PowerMaxRest, 'modify_storagegroup_rdf')
@mock.patch.object(rest.PowerMaxRest, 'delete_storagegroup_rdf')
def test_delete_group_replication(self, mock_mod, mock_del):
self.provision.delete_group_replication(
self.data.array, self.data.test_vol_grp_name,
self.data.rdf_group_no, self.data.extra_specs)
mock_mod.assert_called_once()
mock_del.assert_called_once()
@mock.patch.object(
rest.PowerMaxRest, 'get_snap_linked_device_list',
side_effect=[[{'targetDevice': tpd.PowerMaxData.device_id2}],
[{'targetDevice': tpd.PowerMaxData.device_id2},
{'targetDevice': tpd.PowerMaxData.device_id3}]])
@mock.patch.object(provision.PowerMaxProvision, '_unlink_volume')
def test_delete_volume_snap_check_for_links(self, mock_unlink, mock_tgts):
self.provision.delete_volume_snap_check_for_links(
self.data.array, self.data.test_snapshot_snap_name,
self.data.device_id, self.data.extra_specs)
mock_unlink.assert_called_once_with(
self.data.array, "", "", self.data.test_snapshot_snap_name,
self.data.extra_specs, list_volume_pairs=[
(self.data.device_id, tpd.PowerMaxData.device_id2)],
generation=0)
mock_unlink.reset_mock()
self.provision.delete_volume_snap_check_for_links(
self.data.array, self.data.test_snapshot_snap_name,
self.data.device_id, self.data.extra_specs)
self.assertEqual(2, mock_unlink.call_count)

View File

@ -0,0 +1,975 @@
# Copyright (c) 2017-2019 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ast
from copy import deepcopy
import mock
import six
from cinder import exception
from cinder import objects
from cinder.objects import fields
from cinder.objects import group
from cinder import test
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_data as tpd)
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_fake_objects as tpfo)
from cinder.volume.drivers.dell_emc.powermax import common
from cinder.volume.drivers.dell_emc.powermax import fc
from cinder.volume.drivers.dell_emc.powermax import iscsi
from cinder.volume.drivers.dell_emc.powermax import masking
from cinder.volume.drivers.dell_emc.powermax import provision
from cinder.volume.drivers.dell_emc.powermax import rest
from cinder.volume.drivers.dell_emc.powermax import utils
from cinder.volume import utils as volume_utils
class PowerMaxReplicationTest(test.TestCase):
def setUp(self):
self.data = tpd.PowerMaxData()
super(PowerMaxReplicationTest, self).setUp()
self.replication_device = {
'target_device_id': self.data.remote_array,
'remote_port_group': self.data.port_group_name_f,
'remote_pool': self.data.srp2,
'rdf_group_label': self.data.rdf_group_name,
'allow_extend': 'True'}
volume_utils.get_max_over_subscription_ratio = mock.Mock()
configuration = tpfo.FakeConfiguration(
None, 'CommonReplicationTests', 1, 1, san_ip='1.1.1.1',
san_login='smc', vmax_array=self.data.array, vmax_srp='SRP_1',
san_password='smc', san_api_port=8443,
vmax_port_groups=[self.data.port_group_name_f],
replication_device=self.replication_device)
rest.PowerMaxRest._establish_rest_session = mock.Mock(
return_value=tpfo.FakeRequestsSession())
driver = fc.PowerMaxFCDriver(configuration=configuration)
iscsi_config = tpfo.FakeConfiguration(
None, 'CommonReplicationTests', 1, 1, san_ip='1.1.1.1',
san_login='smc', vmax_array=self.data.array, vmax_srp='SRP_1',
san_password='smc', san_api_port=8443,
vmax_port_groups=[self.data.port_group_name_i],
replication_device=self.replication_device)
iscsi_driver = iscsi.PowerMaxISCSIDriver(configuration=iscsi_config)
self.iscsi_common = iscsi_driver.common
self.driver = driver
self.common = self.driver.common
self.masking = self.common.masking
self.provision = self.common.provision
self.rest = self.common.rest
self.utils = self.common.utils
self.utils.get_volumetype_extra_specs = (
mock.Mock(
return_value=self.data.vol_type_extra_specs_rep_enabled))
self.extra_specs = deepcopy(self.data.extra_specs_rep_enabled)
self.extra_specs['retries'] = 1
self.extra_specs['interval'] = 1
self.extra_specs['rep_mode'] = 'Synchronous'
self.async_rep_device = {
'target_device_id': self.data.remote_array,
'remote_port_group': self.data.port_group_name_f,
'remote_pool': self.data.srp2,
'rdf_group_label': self.data.rdf_group_name,
'allow_extend': 'True', 'mode': 'async'}
async_configuration = tpfo.FakeConfiguration(
None, 'CommonReplicationTests', 1, 1, san_ip='1.1.1.1',
san_login='smc', vmax_array=self.data.array, vmax_srp='SRP_1',
san_password='smc', san_api_port=8443,
vmax_port_groups=[self.data.port_group_name_f],
replication_device=self.async_rep_device)
self.async_driver = fc.PowerMaxFCDriver(
configuration=async_configuration)
self.metro_rep_device = {
'target_device_id': self.data.remote_array,
'remote_port_group': self.data.port_group_name_f,
'remote_pool': self.data.srp2,
'rdf_group_label': self.data.rdf_group_name,
'allow_extend': 'True', 'mode': 'metro'}
metro_configuration = tpfo.FakeConfiguration(
None, 'CommonReplicationTests', 1, 1, san_ip='1.1.1.1',
san_login='smc', vmax_array=self.data.array, vmax_srp='SRP_1',
san_password='smc', san_api_port=8443,
vmax_port_groups=[self.data.port_group_name_f],
replication_device=self.metro_rep_device)
self.metro_driver = fc.PowerMaxFCDriver(
configuration=metro_configuration)
def test_get_replication_info(self):
self.common._get_replication_info()
self.assertTrue(self.common.replication_enabled)
@mock.patch.object(volume_utils, 'is_group_a_cg_snapshot_type',
return_value=False)
@mock.patch.object(objects.group.Group, 'get_by_id',
return_value=tpd.PowerMaxData.test_rep_group)
@mock.patch.object(volume_utils, 'is_group_a_type', return_value=True)
@mock.patch.object(utils.PowerMaxUtils, 'check_replication_matched',
return_value=True)
@mock.patch.object(masking.PowerMaxMasking, 'add_volume_to_storage_group')
@mock.patch.object(
common.PowerMaxCommon, '_replicate_volume',
return_value=({
'replication_driver_data':
tpd.PowerMaxData.test_volume.replication_driver_data}, {}))
def test_create_replicated_volume(self, mock_rep, mock_add, mock_match,
mock_check, mock_get, mock_cg):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
vol_identifier = self.utils.get_volume_element_name(
self.data.test_volume.id)
self.common.create_volume(self.data.test_volume)
volume_dict = self.data.provider_location
mock_rep.assert_called_once_with(
self.data.test_volume, vol_identifier, volume_dict,
extra_specs)
# Add volume to replication group
self.common.create_volume(self.data.test_volume_group_member)
mock_add.assert_called_once()
@mock.patch.object(
common.PowerMaxCommon, '_replicate_volume',
return_value=({
'replication_driver_data':
tpd.PowerMaxData.test_volume.replication_driver_data}, {}))
@mock.patch.object(utils.PowerMaxUtils, 'is_replication_enabled',
return_value=True)
@mock.patch.object(rest.PowerMaxRest, 'get_rdf_group_number',
side_effect=['4', None])
def test_create_replicated_vol_side_effect(
self, mock_rdf_no, mock_rep_enabled, mock_rep_vol):
self.common.rep_config = self.utils.get_replication_config(
[self.replication_device])
ref_rep_data = {'array': six.text_type(self.data.remote_array),
'device_id': self.data.device_id2}
ref_model_update = {
'provider_location': six.text_type(
self.data.test_volume.provider_location),
'replication_driver_data': six.text_type(ref_rep_data)}
model_update = self.common.create_volume(self.data.test_volume)
self.assertEqual(ref_model_update, model_update)
self.assertRaises(exception.VolumeBackendAPIException,
self.common.create_volume,
self.data.test_volume)
def test_create_cloned_replicated_volume(self):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
with mock.patch.object(self.common, '_replicate_volume',
return_value=({}, {})) as mock_rep:
self.common.create_cloned_volume(
self.data.test_clone_volume, self.data.test_volume)
volume_dict = self.data.provider_location_clone
mock_rep.assert_called_once_with(
self.data.test_clone_volume,
self.data.test_clone_volume.name, volume_dict, extra_specs)
def test_create_replicated_volume_from_snap(self):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
with mock.patch.object(self.common, '_replicate_volume',
return_value=({}, {})) as mock_rep:
self.common.create_volume_from_snapshot(
self.data.test_clone_volume, self.data.test_snapshot)
volume_dict = self.data.provider_location_snapshot
mock_rep.assert_called_once_with(
self.data.test_clone_volume,
'snapshot-%s' % self.data.snapshot_id, volume_dict,
extra_specs)
def test_replicate_volume(self):
volume_dict = self.data.provider_location
rs_enabled = fields.ReplicationStatus.ENABLED
with mock.patch.object(
self.common, 'setup_volume_replication',
return_value=(rs_enabled, {}, {})) as mock_setup:
self.common._replicate_volume(
self.data.test_volume, '1', volume_dict, self.extra_specs)
mock_setup.assert_called_once_with(
self.data.array, self.data.test_volume,
self.data.device_id, self.extra_specs)
def test_replicate_volume_exception(self):
volume_dict = self.data.provider_location
with mock.patch.object(
self.common, 'setup_volume_replication',
side_effect=exception.VolumeBackendAPIException(data='')):
with mock.patch.object(
self.common, '_cleanup_replication_source') as mock_clean:
self.assertRaises(
exception.VolumeBackendAPIException,
self.common._replicate_volume, self.data.test_volume,
'1', volume_dict, self.extra_specs)
mock_clean.assert_called_once_with(
self.data.array, self.data.test_volume, '1',
volume_dict, self.extra_specs)
@mock.patch.object(common.PowerMaxCommon, '_remove_members')
@mock.patch.object(
common.PowerMaxCommon, '_get_replication_extra_specs',
return_value=tpd.PowerMaxData.rep_extra_specs)
@mock.patch.object(
utils.PowerMaxUtils, 'is_volume_failed_over', return_value=True)
def test_unmap_lun_volume_failed_over(self, mock_fo, mock_es, mock_rm):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
rep_config = self.utils.get_replication_config(
[self.replication_device])
self.common._unmap_lun(self.data.test_volume, self.data.connector)
mock_es.assert_called_once_with(extra_specs, rep_config)
@mock.patch.object(common.PowerMaxCommon, '_remove_members')
@mock.patch.object(
common.PowerMaxCommon, '_get_replication_extra_specs',
return_value=tpd.PowerMaxData.rep_extra_specs)
@mock.patch.object(
utils.PowerMaxUtils, 'is_metro_device', return_value=True)
def test_unmap_lun_metro(self, mock_md, mock_es, mock_rm):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
self.common._unmap_lun(self.data.test_volume, self.data.connector)
self.assertEqual(2, mock_rm.call_count)
@mock.patch.object(
utils.PowerMaxUtils, 'is_volume_failed_over', return_value=True)
def test_initialize_connection_vol_failed_over(self, mock_fo):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
rep_extra_specs = deepcopy(tpd.PowerMaxData.rep_extra_specs)
rep_extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
rep_config = self.utils.get_replication_config(
[self.replication_device])
with mock.patch.object(self.common, '_get_replication_extra_specs',
return_value=rep_extra_specs) as mock_es:
self.common.initialize_connection(
self.data.test_volume, self.data.connector)
mock_es.assert_called_once_with(extra_specs, rep_config)
@mock.patch.object(utils.PowerMaxUtils, 'is_metro_device',
return_value=True)
def test_initialize_connection_vol_metro(self, mock_md):
metro_connector = deepcopy(self.data.connector)
metro_connector['multipath'] = True
info_dict = self.common.initialize_connection(
self.data.test_volume, metro_connector)
ref_dict = {'array': self.data.array,
'device_id': self.data.device_id,
'hostlunid': 3,
'maskingview': self.data.masking_view_name_f,
'metro_hostlunid': 3}
self.assertEqual(ref_dict, info_dict)
@mock.patch.object(rest.PowerMaxRest, 'get_iscsi_ip_address_and_iqn',
return_value=([tpd.PowerMaxData.ip],
tpd.PowerMaxData.initiator))
@mock.patch.object(common.PowerMaxCommon, '_get_replication_extra_specs',
return_value=tpd.PowerMaxData.rep_extra_specs)
@mock.patch.object(utils.PowerMaxUtils, 'is_metro_device',
return_value=True)
def test_initialize_connection_vol_metro_iscsi(self, mock_md, mock_es,
mock_ip):
metro_connector = deepcopy(self.data.connector)
metro_connector['multipath'] = True
info_dict = self.iscsi_common.initialize_connection(
self.data.test_volume, metro_connector)
ref_dict = {'array': self.data.array,
'device_id': self.data.device_id,
'hostlunid': 3,
'maskingview': self.data.masking_view_name_f,
'ip_and_iqn': [{'ip': self.data.ip,
'iqn': self.data.initiator}],
'metro_hostlunid': 3,
'is_multipath': True,
'metro_ip_and_iqn': [{'ip': self.data.ip,
'iqn': self.data.initiator}]}
self.assertEqual(ref_dict, info_dict)
@mock.patch.object(utils.PowerMaxUtils, 'is_metro_device',
return_value=True)
def test_initialize_connection_no_multipath_iscsi(self, mock_md):
info_dict = self.iscsi_common.initialize_connection(
self.data.test_volume, self.data.connector)
self.assertIsNone(info_dict)
@mock.patch.object(
masking.PowerMaxMasking, 'pre_multiattach',
return_value=tpd.PowerMaxData.masking_view_dict_multiattach)
def test_attach_metro_volume(self, mock_pre):
rep_extra_specs = deepcopy(tpd.PowerMaxData.rep_extra_specs)
rep_extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
hostlunid, remote_port_group = self.common._attach_metro_volume(
self.data.test_volume, self.data.connector, False,
self.data.extra_specs, rep_extra_specs)
self.assertEqual(self.data.port_group_name_f, remote_port_group)
# Multiattach case
self.common._attach_metro_volume(
self.data.test_volume, self.data.connector, True,
self.data.extra_specs, rep_extra_specs)
mock_pre.assert_called_once()
@mock.patch.object(rest.PowerMaxRest, 'is_vol_in_rep_session',
return_value=(False, False, None))
@mock.patch.object(common.PowerMaxCommon, 'extend_volume_is_replicated')
@mock.patch.object(common.PowerMaxCommon, '_sync_check')
def test_extend_volume_rep_enabled(self, mock_sync, mock_ex_re,
mock_is_re):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
volume_name = self.data.test_volume.name
self.common.extend_volume(self.data.test_volume, '5')
mock_ex_re.assert_called_once_with(
self.data.array, self.data.test_volume,
self.data.device_id, volume_name, '5', extra_specs)
def test_set_config_file_get_extra_specs_rep_enabled(self):
extra_specs, _ = self.common._set_config_file_and_get_extra_specs(
self.data.test_volume)
self.assertTrue(extra_specs['replication_enabled'])
def test_populate_masking_dict_is_re(self):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
masking_dict = self.common._populate_masking_dict(
self.data.test_volume, self.data.connector, extra_specs)
self.assertTrue(masking_dict['replication_enabled'])
self.assertEqual('OS-HostX-SRP_1-DiamondDSS-OS-fibre-PG-RE',
masking_dict[utils.SG_NAME])
@mock.patch.object(common.PowerMaxCommon,
'_replicate_volume',
return_value=({}, {}))
def test_manage_existing_is_replicated(self, mock_rep):
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
external_ref = {u'source-name': u'00002'}
volume_name = self.utils.get_volume_element_name(
self.data.test_volume.id)
provider_location = {'device_id': u'00002', 'array': self.data.array}
with mock.patch.object(
self.common, '_check_lun_valid_for_cinder_management',
return_value=(volume_name, 'test_sg')):
self.common.manage_existing(
self.data.test_volume, external_ref)
mock_rep.assert_called_once_with(
self.data.test_volume, volume_name, provider_location,
extra_specs, delete_src=False)
@mock.patch.object(masking.PowerMaxMasking, 'remove_and_reset_members')
def test_setup_volume_replication(self, mock_rm):
rep_status, rep_data, __ = self.common.setup_volume_replication(
self.data.array, self.data.test_volume, self.data.device_id,
self.extra_specs)
self.assertEqual(fields.ReplicationStatus.ENABLED, rep_status)
self.assertEqual({'array': self.data.remote_array,
'device_id': self.data.device_id}, rep_data)
@mock.patch.object(masking.PowerMaxMasking, 'remove_and_reset_members')
@mock.patch.object(common.PowerMaxCommon, '_create_volume')
def test_setup_volume_replication_target(self, mock_create, mock_rm):
rep_status, rep_data, __ = self.common.setup_volume_replication(
self.data.array, self.data.test_volume, self.data.device_id,
self.extra_specs, self.data.device_id2)
self.assertEqual(fields.ReplicationStatus.ENABLED, rep_status)
self.assertEqual({'array': self.data.remote_array,
'device_id': self.data.device_id2}, rep_data)
mock_create.assert_not_called()
@mock.patch.object(common.PowerMaxCommon, 'get_rdf_details',
return_value=(tpd.PowerMaxData.rdf_group_no,
tpd.PowerMaxData.remote_array))
@mock.patch.object(rest.PowerMaxRest, 'get_size_of_device_on_array',
return_value=2)
@mock.patch.object(common.PowerMaxCommon, '_get_replication_extra_specs',
return_value=tpd.PowerMaxData.rep_extra_specs)
@mock.patch.object(common.PowerMaxCommon, '_create_volume',
return_value=tpd.PowerMaxData.provider_location)
@mock.patch.object(common.PowerMaxCommon, '_sync_check')
@mock.patch.object(rest.PowerMaxRest, 'create_rdf_device_pair',
return_value=tpd.PowerMaxData.rdf_group_details)
def test_setup_inuse_volume_replication(self, mck_create_rdf_pair,
mck_sync_chk, mck_create_vol,
mck_rep_specs, mck_get_vol_size,
mck_get_rdf_info):
array = self.data.array
device_id = self.data.device_id
volume = self.data.test_attached_volume
extra_specs = self.data.extra_specs_migrate
self.rep_config = self.data.rep_extra_specs4
rep_status, rep_data, __ = (
self.common.setup_inuse_volume_replication(
array, volume, device_id, extra_specs))
self.assertEqual('enabled', rep_status)
self.assertEqual(self.data.rdf_group_details, rep_data)
@mock.patch.object(common.PowerMaxCommon, '_cleanup_remote_target')
def test_cleanup_lun_replication_success(self, mock_clean):
rep_extra_specs = deepcopy(self.data.rep_extra_specs)
rep_extra_specs[utils.PORTGROUPNAME] = self.data.port_group_name_f
self.common.cleanup_lun_replication(
self.data.test_volume, '1', self.data.device_id,
self.extra_specs)
mock_clean.assert_called_once_with(
self.data.array, self.data.test_volume,
self.data.remote_array, self.data.device_id,
self.data.device_id2, self.data.rdf_group_no, '1',
rep_extra_specs)
# Cleanup legacy replication
self.common.cleanup_lun_replication(
self.data.test_legacy_vol, '1', self.data.device_id,
self.extra_specs)
mock_clean.assert_called_once_with(
self.data.array, self.data.test_volume,
self.data.remote_array, self.data.device_id,
self.data.device_id2, self.data.rdf_group_no, '1',
rep_extra_specs)
@mock.patch.object(common.PowerMaxCommon, '_cleanup_remote_target')
def test_cleanup_lun_replication_no_target(self, mock_clean):
with mock.patch.object(self.common, 'get_remote_target_device',
return_value=(None, '', '', '', '')):
self.common.cleanup_lun_replication(
self.data.test_volume, '1', self.data.device_id,
self.extra_specs)
mock_clean.assert_not_called()
@mock.patch.object(
common.PowerMaxCommon, 'get_remote_target_device',
return_value=(tpd.PowerMaxData.device_id2, '', '', '', ''))
@mock.patch.object(common.PowerMaxCommon,
'_add_volume_to_async_rdf_managed_grp')
def test_cleanup_lun_replication_exception(self, mock_add, mock_tgt):
self.assertRaises(exception.VolumeBackendAPIException,
self.common.cleanup_lun_replication,
self.data.test_volume, '1', self.data.device_id,
self.extra_specs)
# is metro or async volume
extra_specs = deepcopy(self.extra_specs)
extra_specs[utils.REP_MODE] = utils.REP_METRO
self.assertRaises(exception.VolumeBackendAPIException,
self.common.cleanup_lun_replication,
self.data.test_volume, '1', self.data.device_id,
extra_specs)
mock_add.assert_called_once()
@mock.patch.object(common.PowerMaxCommon, '_cleanup_metro_target')
@mock.patch.object(masking.PowerMaxMasking,
'remove_vol_from_storage_group')
@mock.patch.object(common.PowerMaxCommon, '_delete_from_srp')
@mock.patch.object(provision.PowerMaxProvision, 'break_rdf_relationship')
def test_cleanup_remote_target(self, mock_break, mock_del,
mock_rm, mock_clean_metro):
with mock.patch.object(self.rest, 'are_vols_rdf_paired',
return_value=(False, '', '')):
self.common._cleanup_remote_target(
self.data.array, self.data.test_volume,
self.data.remote_array, self.data.device_id,
self.data.device_id2, self.data.rdf_group_name,
'vol1', self.data.rep_extra_specs)
mock_break.assert_not_called()
self.common._cleanup_remote_target(
self.data.array, self.data.test_volume,
self.data.remote_array, self.data.device_id,
self.data.device_id2, self.data.rdf_group_name,
'vol1', self.data.rep_extra_specs)
mock_break.assert_called_once_with(
self.data.array, self.data.device_id,
self.data.device_id2, self.data.rdf_group_name,
self.data.rep_extra_specs, 'Synchronized')
# is metro volume
with mock.patch.object(self.utils, 'is_metro_device',
return_value=True):
self.common._cleanup_remote_target(
self.data.array, self.data.test_volume,
self.data.remote_array, self.data.device_id,
self.data.device_id2, self.data.rdf_group_name,
'vol1', self.data.rep_extra_specs)
mock_clean_metro.assert_called_once()
def test_cleanup_remote_target_exception(self):
extra_specs = deepcopy(self.data.rep_extra_specs)
extra_specs['mode'] = utils.REP_METRO
self.assertRaises(exception.VolumeBackendAPIException,
self.metro_driver.common._cleanup_remote_target,
self.data.array, self.data.test_volume,
self.data.remote_array,
self.data.device_id, self.data.device_id2,
self.data.rdf_group_name, 'vol1', extra_specs)
@mock.patch.object(provision.PowerMaxProvision, 'enable_group_replication')
@mock.patch.object(rest.PowerMaxRest, 'get_num_vols_in_sg',
side_effect=[2, 0])
def test_cleanup_metro_target(self, mock_vols, mock_enable):
# allow delete is True
specs = {'allow_del_metro': True}
for x in range(0, 2):
self.common._cleanup_metro_target(
self.data.array, self.data.device_id, self.data.device_id2,
self.data.rdf_group_no, specs)
mock_enable.assert_called_once()
# allow delete is False
specs['allow_del_metro'] = False
self.assertRaises(exception.VolumeBackendAPIException,
self.common._cleanup_metro_target,
self.data.array, self.data.device_id,
self.data.device_id2,
self.data.rdf_group_no, specs)
@mock.patch.object(common.PowerMaxCommon,
'_remove_vol_and_cleanup_replication')
@mock.patch.object(masking.PowerMaxMasking,
'remove_vol_from_storage_group')
@mock.patch.object(common.PowerMaxCommon, '_delete_from_srp')
def test_cleanup_replication_source(self, mock_del, mock_rm, mock_clean):
self.common._cleanup_replication_source(
self.data.array, self.data.test_volume, 'vol1',
{'device_id': self.data.device_id}, self.extra_specs)
mock_del.assert_called_once_with(
self.data.array, self.data.device_id, 'vol1', self.extra_specs)
def test_get_rdf_details(self):
rdf_group_no, remote_array = self.common.get_rdf_details(
self.data.array)
self.assertEqual(self.data.rdf_group_no, rdf_group_no)
self.assertEqual(self.data.remote_array, remote_array)
def test_get_rdf_details_exception(self):
with mock.patch.object(self.rest, 'get_rdf_group_number',
return_value=None):
self.assertRaises(exception.VolumeBackendAPIException,
self.common.get_rdf_details, self.data.array)
def test_failover_host(self):
volumes = [self.data.test_volume, self.data.test_clone_volume]
with mock.patch.object(self.common, '_failover_replication',
return_value=(None, {})) as mock_fo:
self.common.failover_host(volumes)
mock_fo.assert_called_once()
@mock.patch.object(common.PowerMaxCommon, 'failover_replication',
return_value=({}, {}))
def test_failover_host_groups(self, mock_fg):
volumes = [self.data.test_volume_group_member]
group1 = self.data.test_group
self.common.failover_host(volumes, None, [group1])
mock_fg.assert_called_once()
def test_get_remote_target_device(self):
target_device1, _, _, _, _ = (
self.common.get_remote_target_device(
self.data.array, self.data.test_volume, self.data.device_id))
self.assertEqual(self.data.device_id2, target_device1)
target_device2, _, _, _, _ = (
self.common.get_remote_target_device(
self.data.array, self.data.test_clone_volume,
self.data.device_id))
self.assertIsNone(target_device2)
with mock.patch.object(self.rest, 'are_vols_rdf_paired',
return_value=(False, '')):
target_device3, _, _, _, _ = (
self.common.get_remote_target_device(
self.data.array, self.data.test_volume,
self.data.device_id))
self.assertIsNone(target_device3)
with mock.patch.object(self.rest, 'get_volume',
return_value=None):
target_device4, _, _, _, _ = (
self.common.get_remote_target_device(
self.data.array, self.data.test_volume,
self.data.device_id))
self.assertIsNone(target_device4)
@mock.patch.object(common.PowerMaxCommon, 'setup_volume_replication')
@mock.patch.object(provision.PowerMaxProvision, 'extend_volume')
@mock.patch.object(provision.PowerMaxProvision, 'break_rdf_relationship')
@mock.patch.object(masking.PowerMaxMasking, 'remove_and_reset_members')
def test_extend_volume_is_replicated(self, mock_remove,
mock_break, mock_extend, mock_setup):
self.common.extend_volume_is_replicated(
self.data.array, self.data.test_volume, self.data.device_id,
'vol1', '5', self.data.extra_specs_rep_enabled)
self.assertEqual(2, mock_remove.call_count)
self.assertEqual(2, mock_extend.call_count)
mock_remove.reset_mock()
mock_extend.reset_mock()
with mock.patch.object(self.rest, 'is_next_gen_array',
return_value=True):
self.common.extend_volume_is_replicated(
self.data.array, self.data.test_volume, self.data.device_id,
'vol1', '5', self.data.extra_specs_rep_enabled)
mock_remove.assert_not_called()
self.assertEqual(2, mock_extend.call_count)
def test_extend_volume_is_replicated_exception(self):
self.assertRaises(exception.VolumeBackendAPIException,
self.common.extend_volume_is_replicated,
self.data.failed_resource, self.data.test_volume,
self.data.device_id, 'vol1', '1',
self.data.extra_specs_rep_enabled)
with mock.patch.object(self.utils, 'is_metro_device',
return_value=True):
self.assertRaises(exception.VolumeBackendAPIException,
self.common.extend_volume_is_replicated,
self.data.array, self.data.test_volume,
self.data.device_id, 'vol1', '1',
self.data.extra_specs_rep_enabled)
@mock.patch.object(common.PowerMaxCommon,
'add_volume_to_replication_group')
@mock.patch.object(masking.PowerMaxMasking, 'remove_and_reset_members')
def test_enable_rdf(self, mock_remove, mock_add):
rep_config = self.utils.get_replication_config(
[self.replication_device])
self.common.enable_rdf(
self.data.array, self.data.test_volume, self.data.device_id,
self.data.rdf_group_no, rep_config, 'OS-1',
self.data.remote_array, self.data.device_id2, self.extra_specs)
self.assertEqual(2, mock_remove.call_count)
self.assertEqual(2, mock_add.call_count)
@mock.patch.object(masking.PowerMaxMasking,
'remove_vol_from_storage_group')
@mock.patch.object(common.PowerMaxCommon, '_cleanup_remote_target')
def test_enable_rdf_exception(self, mock_cleanup, mock_rm):
rep_config = self.utils.get_replication_config(
[self.replication_device])
self.assertRaises(
exception.VolumeBackendAPIException, self.common.enable_rdf,
self.data.array, self.data.test_volume, self.data.device_id,
self.data.failed_resource, rep_config, 'OS-1',
self.data.remote_array, self.data.device_id2, self.extra_specs)
self.assertEqual(1, mock_cleanup.call_count)
def test_add_volume_to_replication_group(self):
sg_name = self.common.add_volume_to_replication_group(
self.data.array, self.data.device_id, 'vol1',
self.extra_specs)
self.assertEqual(self.data.default_sg_re_enabled, sg_name)
@mock.patch.object(masking.PowerMaxMasking,
'get_or_create_default_storage_group',
side_effect=exception.VolumeBackendAPIException)
def test_add_volume_to_replication_group_exception(self, mock_get):
self.assertRaises(
exception.VolumeBackendAPIException,
self.common.add_volume_to_replication_group,
self.data.array, self.data.device_id, 'vol1',
self.extra_specs)
def test_get_replication_extra_specs(self):
rep_config = self.utils.get_replication_config(
[self.replication_device])
# Path one - disable compression
extra_specs1 = deepcopy(self.extra_specs)
extra_specs1[utils.DISABLECOMPRESSION] = 'true'
ref_specs1 = deepcopy(self.data.rep_extra_specs2)
rep_extra_specs1 = self.common._get_replication_extra_specs(
extra_specs1, rep_config)
self.assertEqual(ref_specs1, rep_extra_specs1)
# Path two - disable compression, not all flash
ref_specs2 = deepcopy(self.data.rep_extra_specs2)
with mock.patch.object(self.rest, 'is_compression_capable',
return_value=False):
rep_extra_specs2 = self.common._get_replication_extra_specs(
extra_specs1, rep_config)
self.assertEqual(ref_specs2, rep_extra_specs2)
def test_get_replication_extra_specs_powermax(self):
rep_config = self.utils.get_replication_config(
[self.replication_device])
rep_specs = deepcopy(self.data.rep_extra_specs2)
extra_specs = deepcopy(self.extra_specs)
# SLO not valid, both SLO and Workload set to NONE
rep_specs['slo'] = None
rep_specs['workload'] = None
with mock.patch.object(self.provision, 'verify_slo_workload',
return_value=(False, False)):
rep_extra_specs = self.common._get_replication_extra_specs(
extra_specs, rep_config)
self.assertEqual(rep_specs, rep_extra_specs)
# SL valid, workload invalid, only workload set to NONE
rep_specs['slo'] = 'Diamond'
rep_specs['workload'] = None
with mock.patch.object(self.provision, 'verify_slo_workload',
return_value=(True, False)):
rep_extra_specs = self.common._get_replication_extra_specs(
extra_specs, rep_config)
self.assertEqual(rep_specs, rep_extra_specs)
def test_get_secondary_stats(self):
rep_config = self.utils.get_replication_config(
[self.replication_device])
array_map = self.common.get_attributes_from_cinder_config()
finalarrayinfolist = self.common._get_slo_workload_combinations(
array_map)
array_info = finalarrayinfolist[0]
ref_info = deepcopy(array_info)
ref_info['SerialNumber'] = six.text_type(rep_config['array'])
ref_info['srpName'] = rep_config['srp']
secondary_info = self.common.get_secondary_stats_info(
rep_config, array_info)
self.assertEqual(ref_info, secondary_info)
def test_replicate_group(self):
volume_model_update = {
'id': self.data.test_volume.id,
'provider_location': self.data.test_volume.provider_location}
vols_model_update = self.common._replicate_group(
self.data.array, [volume_model_update],
self.data.test_vol_grp_name, self.extra_specs)
ref_rep_data = {'array': self.data.remote_array,
'device_id': self.data.device_id2}
ref_vol_update = {
'id': self.data.test_volume.id,
'provider_location': self.data.test_volume.provider_location,
'replication_driver_data': ref_rep_data,
'replication_status': fields.ReplicationStatus.ENABLED}
# Decode string representations of dicts into dicts, because
# the string representations are randomly ordered and therefore
# hard to compare.
vols_model_update[0]['replication_driver_data'] = ast.literal_eval(
vols_model_update[0]['replication_driver_data'])
self.assertEqual(ref_vol_update, vols_model_update[0])
@mock.patch.object(volume_utils, 'is_group_a_cg_snapshot_type',
return_value=False)
@mock.patch.object(volume_utils, 'is_group_a_type', return_value=True)
def test_create_replicaton_group(self, mock_type, mock_cg_type):
ref_model_update = {
'status': fields.GroupStatus.AVAILABLE,
'replication_status': fields.ReplicationStatus.ENABLED}
model_update = self.common.create_group(None, self.data.test_group_1)
self.assertEqual(ref_model_update, model_update)
# Replication mode is async
self.assertRaises(exception.InvalidInput,
self.async_driver.common.create_group,
None, self.data.test_group_1)
def test_enable_replication(self):
# Case 1: Group not replicated
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=False):
self.assertRaises(NotImplementedError,
self.common.enable_replication,
None, self.data.test_group,
[self.data.test_volume])
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=True):
# Case 2: Empty group
model_update, __ = self.common.enable_replication(
None, self.data.test_group, [])
self.assertEqual({}, model_update)
# Case 3: Successfully enabled
model_update, __ = self.common.enable_replication(
None, self.data.test_group, [self.data.test_volume])
self.assertEqual(fields.ReplicationStatus.ENABLED,
model_update['replication_status'])
# Case 4: Exception
model_update, __ = self.common.enable_replication(
None, self.data.test_group_failed, [self.data.test_volume])
self.assertEqual(fields.ReplicationStatus.ERROR,
model_update['replication_status'])
def test_disable_replication(self):
# Case 1: Group not replicated
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=False):
self.assertRaises(NotImplementedError,
self.common.disable_replication,
None, self.data.test_group,
[self.data.test_volume])
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=True):
# Case 2: Empty group
model_update, __ = self.common.disable_replication(
None, self.data.test_group, [])
self.assertEqual({}, model_update)
# Case 3: Successfully disabled
model_update, __ = self.common.disable_replication(
None, self.data.test_group, [self.data.test_volume])
self.assertEqual(fields.ReplicationStatus.DISABLED,
model_update['replication_status'])
# Case 4: Exception
model_update, __ = self.common.disable_replication(
None, self.data.test_group_failed, [self.data.test_volume])
self.assertEqual(fields.ReplicationStatus.ERROR,
model_update['replication_status'])
def test_failover_replication(self):
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=True):
# Case 1: Empty group
model_update, __ = self.common.failover_replication(
None, self.data.test_group, [])
self.assertEqual({}, model_update)
# Case 2: Successfully failed over
model_update, __ = self.common.failover_replication(
None, self.data.test_group, [self.data.test_volume])
self.assertEqual(fields.ReplicationStatus.FAILED_OVER,
model_update['replication_status'])
# Case 3: Successfully failed back
model_update, __ = self.common.failover_replication(
None, self.data.test_group, [self.data.test_volume],
secondary_backend_id='default')
self.assertEqual(fields.ReplicationStatus.ENABLED,
model_update['replication_status'])
# Case 4: Exception
model_update, __ = self.common.failover_replication(
None, self.data.test_group_failed, [self.data.test_volume])
self.assertEqual(fields.ReplicationStatus.ERROR,
model_update['replication_status'])
@mock.patch.object(provision.PowerMaxProvision, 'failover_group')
def test_failover_replication_metro(self, mock_fo):
volumes = [self.data.test_volume]
_, vol_model_updates = self.common._failover_replication(
volumes, group, None, host=True, is_metro=True)
mock_fo.assert_not_called()
@mock.patch.object(utils.PowerMaxUtils, 'get_volume_group_utils',
return_value=(tpd.PowerMaxData.array, {}))
@mock.patch.object(common.PowerMaxCommon, '_cleanup_group_replication')
@mock.patch.object(volume_utils, 'is_group_a_type', return_value=True)
def test_delete_replication_group(self, mock_check,
mock_cleanup, mock_utils):
self.common._delete_group(self.data.test_rep_group, [])
mock_cleanup.assert_called_once()
@mock.patch.object(masking.PowerMaxMasking,
'remove_volumes_from_storage_group')
@mock.patch.object(utils.PowerMaxUtils, 'check_rep_status_enabled')
@mock.patch.object(common.PowerMaxCommon,
'_remove_remote_vols_from_volume_group')
@mock.patch.object(masking.PowerMaxMasking,
'add_remote_vols_to_volume_group')
@mock.patch.object(volume_utils, 'is_group_a_type', return_value=True)
@mock.patch.object(volume_utils, 'is_group_a_cg_snapshot_type',
return_value=True)
def test_update_replicated_group(self, mock_cg_type, mock_type_check,
mock_add, mock_remove, mock_check,
mock_rm):
add_vols = [self.data.test_volume]
remove_vols = [self.data.test_clone_volume]
self.common.update_group(
self.data.test_group_1, add_vols, remove_vols)
mock_add.assert_called_once()
mock_remove.assert_called_once()
@mock.patch.object(masking.PowerMaxMasking,
'remove_volumes_from_storage_group')
def test_remove_remote_vols_from_volume_group(self, mock_rm):
self.common._remove_remote_vols_from_volume_group(
self.data.remote_array, [self.data.test_volume],
self.data.test_rep_group, self.data.rep_extra_specs)
mock_rm.assert_called_once()
@mock.patch.object(masking.PowerMaxMasking, 'remove_and_reset_members')
@mock.patch.object(masking.PowerMaxMasking,
'remove_volumes_from_storage_group')
def test_cleanup_group_replication(self, mock_rm, mock_rm_reset):
self.common._cleanup_group_replication(
self.data.array, self.data.test_vol_grp_name,
[self.data.device_id], self.extra_specs)
mock_rm.assert_called_once()
@mock.patch.object(masking.PowerMaxMasking, 'add_volume_to_storage_group')
def test_add_volume_to_async_group(self, mock_add):
extra_specs = deepcopy(self.extra_specs)
extra_specs['rep_mode'] = utils.REP_ASYNC
self.async_driver.common._add_volume_to_async_rdf_managed_grp(
self.data.array, self.data.device_id, 'name',
self.data.remote_array, self.data.device_id2, extra_specs)
self.assertEqual(2, mock_add.call_count)
def test_add_volume_to_async_group_exception(self):
extra_specs = deepcopy(self.extra_specs)
extra_specs['rep_mode'] = utils.REP_ASYNC
self.assertRaises(
exception.VolumeBackendAPIException,
self.async_driver.common._add_volume_to_async_rdf_managed_grp,
self.data.failed_resource, self.data.device_id, 'name',
self.data.remote_array, self.data.device_id2, extra_specs)
@mock.patch.object(common.PowerMaxCommon,
'_add_volume_to_async_rdf_managed_grp')
@mock.patch.object(masking.PowerMaxMasking, 'remove_and_reset_members')
def test_setup_volume_replication_async(self, mock_rm, mock_add):
extra_specs = deepcopy(self.extra_specs)
extra_specs['rep_mode'] = utils.REP_ASYNC
rep_status, rep_data, __ = (
self.async_driver.common.setup_volume_replication(
self.data.array, self.data.test_volume,
self.data.device_id, extra_specs))
self.assertEqual(fields.ReplicationStatus.ENABLED, rep_status)
self.assertEqual({'array': self.data.remote_array,
'device_id': self.data.device_id}, rep_data)
mock_add.assert_called_once()
@mock.patch.object(common.PowerMaxCommon, '_failover_replication',
return_value=({}, {}))
def test_failover_host_async(self, mock_fg):
volumes = [self.data.test_volume]
extra_specs = deepcopy(self.extra_specs)
extra_specs['rep_mode'] = utils.REP_ASYNC
with mock.patch.object(common.PowerMaxCommon, '_initial_setup',
return_value=extra_specs):
self.async_driver.common.failover_host(volumes, None, [])
mock_fg.assert_called_once()
@mock.patch.object(common.PowerMaxCommon, '_retype_volume',
return_value=True)
@mock.patch.object(masking.PowerMaxMasking,
'remove_vol_from_storage_group')
@mock.patch.object(common.PowerMaxCommon, '_retype_remote_volume',
return_value=True)
@mock.patch.object(
common.PowerMaxCommon, 'setup_volume_replication',
return_value=('', tpd.PowerMaxData.provider_location2, ''))
@mock.patch.object(common.PowerMaxCommon,
'_remove_vol_and_cleanup_replication')
@mock.patch.object(utils.PowerMaxUtils, 'is_replication_enabled',
side_effect=[False, True, True, False, True, True])
def test_migrate_volume_replication(self, mock_re, mock_rm_rep,
mock_setup, mock_retype,
mock_rm, mock_rt):
new_type = {'extra_specs': {}}
for x in range(0, 3):
success, model_update = self.common._migrate_volume(
self.data.array, self.data.test_volume, self.data.device_id,
self.data.srp, 'OLTP', 'Silver', self.data.test_volume.name,
new_type, self.data.extra_specs)
self.assertTrue(success)
mock_rm_rep.assert_called_once()
mock_setup.assert_called_once()
mock_retype.assert_called_once()
@mock.patch.object(
common.PowerMaxCommon, '_get_replication_extra_specs',
return_value=tpd.PowerMaxData.extra_specs_rep_enabled)
@mock.patch.object(rest.PowerMaxRest, 'get_storage_groups_from_volume',
side_effect=[tpd.PowerMaxData.storagegroup_list,
['OS-SRP_1-Diamond-DSS-RE-SG']])
@mock.patch.object(common.PowerMaxCommon, '_retype_volume',
return_value=True)
def test_retype_volume_replication(self, mock_retype, mock_sg, mock_es):
for x in range(0, 2):
self.common._retype_remote_volume(
self.data.array, self.data.test_volume, self.data.device_id,
self.data.test_volume.name, utils.REP_SYNC,
True, self.data.extra_specs)
mock_retype.assert_called_once()

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,502 @@
# Copyright (c) 2017-2019 Dell Inc. or its subsidiaries.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from copy import deepcopy
import datetime
from ddt import data
from ddt import ddt
import mock
import six
from cinder import exception
from cinder.objects import fields
from cinder import test
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_data as tpd)
from cinder.tests.unit.volume.drivers.dell_emc.powermax import (
powermax_fake_objects as tpfo)
from cinder.volume.drivers.dell_emc.powermax import iscsi
from cinder.volume.drivers.dell_emc.powermax import rest
from cinder.volume.drivers.dell_emc.powermax import utils
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
@ddt
class PowerMaxUtilsTest(test.TestCase):
def setUp(self):
self.data = tpd.PowerMaxData()
volume_utils.get_max_over_subscription_ratio = mock.Mock()
super(PowerMaxUtilsTest, self).setUp()
configuration = tpfo.FakeConfiguration(
None, 'UtilsTests', 1, 1, san_ip='1.1.1.1', san_login='smc',
vmax_array=self.data.array, vmax_srp='SRP_1', san_password='smc',
san_api_port=8443, vmax_port_groups=[self.data.port_group_name_i])
rest.PowerMaxRest._establish_rest_session = mock.Mock(
return_value=tpfo.FakeRequestsSession())
driver = iscsi.PowerMaxISCSIDriver(configuration=configuration)
self.driver = driver
self.common = self.driver.common
self.utils = self.common.utils
def test_get_volumetype_extra_specs(self):
with mock.patch.object(volume_types, 'get_volume_type_extra_specs',
return_value={'specs'}) as type_mock:
# path 1: volume_type_id not passed in
self.data.test_volume.volume_type_id = (
self.data.test_volume_type.id)
self.utils.get_volumetype_extra_specs(self.data.test_volume)
type_mock.assert_called_once_with(self.data.test_volume_type.id)
type_mock.reset_mock()
# path 2: volume_type_id passed in
self.utils.get_volumetype_extra_specs(self.data.test_volume, '123')
type_mock.assert_called_once_with('123')
type_mock.reset_mock()
# path 3: no type_id
self.utils.get_volumetype_extra_specs(self.data.test_clone_volume)
type_mock.assert_not_called()
def test_get_volumetype_extra_specs_exception(self):
extra_specs = self.utils.get_volumetype_extra_specs(
{'name': 'no_type_id'})
self.assertEqual({}, extra_specs)
def test_get_host_short_name(self):
host_under_16_chars = 'host_13_chars'
host1 = self.utils.get_host_short_name(
host_under_16_chars)
self.assertEqual(host_under_16_chars, host1)
host_over_16_chars = (
'host_over_16_chars_host_over_16_chars_host_over_16_chars')
# Check that the same md5 value is retrieved from multiple calls
host2 = self.utils.get_host_short_name(
host_over_16_chars)
host3 = self.utils.get_host_short_name(
host_over_16_chars)
self.assertEqual(host2, host3)
host_with_period = 'hostname.with.many.parts'
ref_host_name = self.utils.generate_unique_trunc_host('hostname')
host4 = self.utils.get_host_short_name(host_with_period)
self.assertEqual(ref_host_name, host4)
def test_get_volume_element_name(self):
volume_id = 'ea95aa39-080b-4f11-9856-a03acf9112ad'
volume_element_name = self.utils.get_volume_element_name(volume_id)
expect_vol_element_name = ('OS-' + volume_id)
self.assertEqual(expect_vol_element_name, volume_element_name)
def test_truncate_string(self):
# string is less than max number
str_to_truncate = 'string'
response = self.utils.truncate_string(str_to_truncate, 10)
self.assertEqual(str_to_truncate, response)
def test_get_default_oversubscription_ratio(self):
default_ratio = 20.0
max_over_sub_ratio1 = 30.0
returned_max = self.utils.get_default_oversubscription_ratio(
max_over_sub_ratio1)
self.assertEqual(max_over_sub_ratio1, returned_max)
max_over_sub_ratio2 = 0.5
returned_max = self.utils.get_default_oversubscription_ratio(
max_over_sub_ratio2)
self.assertEqual(default_ratio, returned_max)
def test_get_default_storage_group_name_slo_workload(self):
srp_name = self.data.srp
slo = self.data.slo
workload = self.data.workload
sg_name = self.utils.get_default_storage_group_name(
srp_name, slo, workload)
self.assertEqual(self.data.defaultstoragegroup_name, sg_name)
def test_get_default_storage_group_name_no_slo(self):
srp_name = self.data.srp
slo = None
workload = None
sg_name = self.utils.get_default_storage_group_name(
srp_name, slo, workload)
self.assertEqual(self.data.default_sg_no_slo, sg_name)
def test_get_default_storage_group_name_compr_disabled(self):
srp_name = self.data.srp
slo = self.data.slo
workload = self.data.workload
sg_name = self.utils.get_default_storage_group_name(
srp_name, slo, workload, True)
self.assertEqual(self.data.default_sg_compr_disabled, sg_name)
def test_get_time_delta(self):
start_time = 1487781721.09
end_time = 1487781758.16
delta = end_time - start_time
ref_delta = six.text_type(datetime.timedelta(seconds=int(delta)))
time_delta = self.utils.get_time_delta(start_time, end_time)
self.assertEqual(ref_delta, time_delta)
def test_get_short_protocol_type(self):
# iscsi
short_i_protocol = self.utils.get_short_protocol_type('iscsi')
self.assertEqual('I', short_i_protocol)
# fc
short_f_protocol = self.utils.get_short_protocol_type('FC')
self.assertEqual('F', short_f_protocol)
# else
other_protocol = self.utils.get_short_protocol_type('OTHER')
self.assertEqual('OTHER', other_protocol)
def test_get_temp_snap_name(self):
source_device_id = self.data.device_id
ref_name = self.data.temp_snapvx
snap_name = self.utils.get_temp_snap_name(source_device_id)
self.assertEqual(ref_name, snap_name)
def test_get_array_and_device_id(self):
volume = deepcopy(self.data.test_volume)
external_ref = {u'source-name': u'00002'}
array, device_id = self.utils.get_array_and_device_id(
volume, external_ref)
self.assertEqual(self.data.array, array)
self.assertEqual('00002', device_id)
# Test to check if device id returned is in upper case
external_ref = {u'source-name': u'0028a'}
__, device_id = self.utils.get_array_and_device_id(
volume, external_ref)
ref_device_id = u'0028A'
self.assertEqual(ref_device_id, device_id)
def test_get_array_and_device_id_exception(self):
volume = deepcopy(self.data.test_volume)
external_ref = {u'source-name': None}
self.assertRaises(exception.VolumeBackendAPIException,
self.utils.get_array_and_device_id,
volume, external_ref)
@data({u'source-name': u'000001'}, {u'source-name': u'00028A'})
def test_get_array_and_device_id_invalid_long_id(self, external_ref):
volume = deepcopy(self.data.test_volume)
# Test for device id more than 5 digits
self.assertRaises(exception.VolumeBackendAPIException,
self.utils.get_array_and_device_id,
volume, external_ref)
@data({u'source-name': u'01'}, {u'source-name': u'028A'},
{u'source-name': u'0001'})
def test_get_array_and_device_id_invalid_short_id(self, external_ref):
volume = deepcopy(self.data.test_volume)
# Test for device id less than 5 digits
self.assertRaises(exception.VolumeBackendAPIException,
self.utils.get_array_and_device_id,
volume, external_ref)
def test_get_pg_short_name(self):
pg_under_12_chars = 'pg_11_chars'
pg1 = self.utils.get_pg_short_name(pg_under_12_chars)
self.assertEqual(pg_under_12_chars, pg1)
pg_over_12_chars = 'portgroup_over_12_characters'
# Check that the same md5 value is retrieved from multiple calls
pg2 = self.utils.get_pg_short_name(pg_over_12_chars)
pg3 = self.utils.get_pg_short_name(pg_over_12_chars)
self.assertEqual(pg2, pg3)
def test_is_compression_disabled_true(self):
extra_specs = self.data.extra_specs_disable_compression
do_disable_compression = self.utils.is_compression_disabled(
extra_specs)
self.assertTrue(do_disable_compression)
def test_is_compression_disabled_false(self):
# Path 1: no compression extra spec set
extra_specs = self.data.extra_specs
do_disable_compression = self.utils.is_compression_disabled(
extra_specs)
self.assertFalse(do_disable_compression)
# Path 2: compression extra spec set to false
extra_specs2 = deepcopy(extra_specs)
extra_specs2.update({utils.DISABLECOMPRESSION: 'false'})
do_disable_compression2 = self.utils.is_compression_disabled(
extra_specs)
self.assertFalse(do_disable_compression2)
def test_change_compression_type_true(self):
source_compr_disabled_true = 'true'
new_type_compr_disabled = {
'extra_specs': {utils.DISABLECOMPRESSION: 'no'}}
ans = self.utils.change_compression_type(
source_compr_disabled_true, new_type_compr_disabled)
self.assertTrue(ans)
def test_change_compression_type_false(self):
source_compr_disabled_true = True
new_type_compr_disabled = {
'extra_specs': {utils.DISABLECOMPRESSION: 'true'}}
ans = self.utils.change_compression_type(
source_compr_disabled_true, new_type_compr_disabled)
self.assertFalse(ans)
def test_is_replication_enabled(self):
is_re = self.utils.is_replication_enabled(
self.data.vol_type_extra_specs_rep_enabled)
self.assertTrue(is_re)
is_re2 = self.utils.is_replication_enabled(self.data.extra_specs)
self.assertFalse(is_re2)
def test_get_replication_config(self):
# Success, allow_extend false
rep_device_list1 = [{'target_device_id': self.data.remote_array,
'remote_pool': self.data.srp,
'remote_port_group': self.data.port_group_name_f,
'rdf_group_label': self.data.rdf_group_name}]
rep_config1 = self.utils.get_replication_config(rep_device_list1)
self.assertEqual(self.data.remote_array, rep_config1['array'])
# Success, allow_extend true
rep_device_list2 = rep_device_list1
rep_device_list2[0]['allow_extend'] = 'true'
rep_config2 = self.utils.get_replication_config(rep_device_list2)
self.assertTrue(rep_config2['allow_extend'])
# No rep_device_list
rep_device_list3 = []
rep_config3 = self.utils.get_replication_config(rep_device_list3)
self.assertIsNone(rep_config3)
# Exception
rep_device_list4 = [{'target_device_id': self.data.remote_array,
'remote_pool': self.data.srp}]
self.assertRaises(exception.VolumeBackendAPIException,
self.utils.get_replication_config, rep_device_list4)
# Success, mode is async
rep_device_list5 = rep_device_list2
rep_device_list5[0]['mode'] = 'async'
rep_config5 = self.utils.get_replication_config(rep_device_list5)
self.assertEqual(utils.REP_ASYNC, rep_config5['mode'])
# Success, mode is metro - no other options set
rep_device_list6 = rep_device_list5
rep_device_list6[0]['mode'] = 'metro'
rep_config6 = self.utils.get_replication_config(rep_device_list6)
self.assertFalse(rep_config6['metro_bias'])
self.assertFalse(rep_config6['allow_delete_metro'])
# Success, mode is metro - metro options true
rep_device_list7 = rep_device_list6
rep_device_list6[0].update(
{'allow_delete_metro': 'true', 'metro_use_bias': 'true'})
rep_config7 = self.utils.get_replication_config(rep_device_list7)
self.assertTrue(rep_config7['metro_bias'])
self.assertTrue(rep_config7['allow_delete_metro'])
def test_is_volume_failed_over(self):
vol = deepcopy(self.data.test_volume)
vol.replication_status = fields.ReplicationStatus.FAILED_OVER
is_fo1 = self.utils.is_volume_failed_over(vol)
self.assertTrue(is_fo1)
is_fo2 = self.utils.is_volume_failed_over(self.data.test_volume)
self.assertFalse(is_fo2)
is_fo3 = self.utils.is_volume_failed_over(None)
self.assertFalse(is_fo3)
def test_add_legacy_pools(self):
pools = [{'pool_name': 'Diamond+None+SRP_1+000197800111'},
{'pool_name': 'Diamond+OLTP+SRP_1+000197800111'}]
new_pools = self.utils.add_legacy_pools(pools)
ref_pools = [{'pool_name': 'Diamond+None+SRP_1+000197800111'},
{'pool_name': 'Diamond+OLTP+SRP_1+000197800111'},
{'pool_name': 'Diamond+SRP_1+000197800111'}]
self.assertEqual(ref_pools, new_pools)
def test_update_volume_group_name(self):
group = self.data.test_group_1
ref_group_name = self.data.test_vol_grp_name
vol_grp_name = self.utils.update_volume_group_name(group)
self.assertEqual(ref_group_name, vol_grp_name)
def test_update_volume_group_name_id_only(self):
group = self.data.test_group_without_name
ref_group_name = self.data.test_vol_grp_name_id_only
vol_grp_name = self.utils.update_volume_group_name(group)
self.assertEqual(ref_group_name, vol_grp_name)
def test_get_volume_group_utils(self):
array, intervals_retries = self.utils.get_volume_group_utils(
self.data.test_group_1, interval=1, retries=1)
ref_array = self.data.array
self.assertEqual(ref_array, array)
def test_update_volume_model_updates(self):
volume_model_updates = [{'id': '1', 'status': 'available'}]
volumes = [self.data.test_volume]
ref_val = {'id': self.data.test_volume.id,
'status': 'error_deleting'}
ret_val = self.utils.update_volume_model_updates(
volume_model_updates, volumes, 'abc', status='error_deleting')
self.assertEqual(ref_val, ret_val[1])
def test_update_volume_model_updates_empty_update_list(self):
volume_model_updates = []
volumes = [self.data.test_volume]
ref_val = [{'id': self.data.test_volume.id,
'status': 'available'}]
ret_val = self.utils.update_volume_model_updates(
volume_model_updates, volumes, 'abc')
self.assertEqual(ref_val, ret_val)
def test_update_volume_model_updates_empty_vol_list(self):
volume_model_updates, volumes, ref_val = [], [], []
ret_val = self.utils.update_volume_model_updates(
volume_model_updates, volumes, 'abc')
self.assertEqual(ref_val, ret_val)
def test_check_replication_matched(self):
# Check 1: Volume is not part of a group
self.utils.check_replication_matched(
self.data.test_volume, self.data.extra_specs)
group_volume = deepcopy(self.data.test_volume)
group_volume.group = self.data.test_group
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=False):
# Check 2: Both volume and group have the same rep status
self.utils.check_replication_matched(
group_volume, self.data.extra_specs)
# Check 3: Volume and group have different rep status
with mock.patch.object(self.utils, 'is_replication_enabled',
return_value=True):
self.assertRaises(exception.InvalidInput,
self.utils.check_replication_matched,
group_volume, self.data.extra_specs)
def test_check_rep_status_enabled(self):
# Check 1: not replication enabled
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=False):
self.utils.check_rep_status_enabled(self.data.test_group)
# Check 2: replication enabled, status enabled
with mock.patch.object(volume_utils, 'is_group_a_type',
return_value=True):
self.utils.check_rep_status_enabled(self.data.test_rep_group)
# Check 3: replication enabled, status disabled
self.assertRaises(exception.InvalidInput,
self.utils.check_rep_status_enabled,
self.data.test_group)
def test_get_replication_prefix(self):
async_prefix = self.utils.get_replication_prefix(utils.REP_ASYNC)
self.assertEqual('-RA', async_prefix)
sync_prefix = self.utils.get_replication_prefix(utils.REP_SYNC)
self.assertEqual('-RE', sync_prefix)
metro_prefix = self.utils.get_replication_prefix(utils.REP_METRO)
self.assertEqual('-RM', metro_prefix)
def test_get_async_rdf_managed_grp_name(self):
rep_config = {'rdf_group_label': self.data.rdf_group_name,
'mode': utils.REP_ASYNC}
grp_name = self.utils.get_async_rdf_managed_grp_name(rep_config)
self.assertEqual(self.data.rdf_managed_async_grp, grp_name)
def test_is_metro_device(self):
rep_config = {'mode': utils.REP_METRO}
is_metro = self.utils.is_metro_device(
rep_config, self.data.rep_extra_specs)
self.assertTrue(is_metro)
rep_config2 = {'mode': utils.REP_ASYNC}
is_metro2 = self.utils.is_metro_device(
rep_config2, self.data.rep_extra_specs)
self.assertFalse(is_metro2)
def test_does_vol_need_rdf_management_group(self):
self.assertFalse(self.utils.does_vol_need_rdf_management_group(
self.data.rep_extra_specs))
extra_specs = deepcopy(self.data.rep_extra_specs)
extra_specs[utils.REP_MODE] = utils.REP_ASYNC
self.assertTrue(self.utils.does_vol_need_rdf_management_group(
extra_specs))
def test_modify_snapshot_prefix_manage(self):
snap_name = self.data.snapshot_id
expected_snap_name = self.data.managed_snap_id
updated_name = self.utils.modify_snapshot_prefix(
snap_name, manage=True)
self.assertEqual(expected_snap_name, updated_name)
def test_modify_snapshot_prefix_unmanage(self):
snap_name = self.data.managed_snap_id
expected_snap_name = self.data.snapshot_id
updated_name = self.utils.modify_snapshot_prefix(
snap_name, unmanage=True)
self.assertEqual(expected_snap_name, updated_name)
def test_change_replication(self):
new_type = {'extra_specs': self.data.extra_specs_rep_enabled}
self.assertFalse(self.utils.change_replication(True, new_type))
self.assertTrue(self.utils.change_replication(False, new_type))
def test_get_child_sg_name(self):
host_name = 'HostX'
# Slo and rep enabled
extra_specs1 = self.data.extra_specs_rep_enabled
extra_specs1[utils.PORTGROUPNAME] = self.data.port_group_name_f
child_sg_name, do_disable_compression, rep_enabled, pg_name = (
self.utils.get_child_sg_name(host_name, extra_specs1))
re_name = self.data.storagegroup_name_f + '-RE'
self.assertEqual(re_name, child_sg_name)
# Disable compression
extra_specs2 = self.data.extra_specs_disable_compression
extra_specs2[utils.PORTGROUPNAME] = self.data.port_group_name_f
child_sg_name, do_disable_compression, rep_enabled, pg_name = (
self.utils.get_child_sg_name(host_name, extra_specs2))
cd_name = self.data.storagegroup_name_f + '-CD'
self.assertEqual(cd_name, child_sg_name)
# No slo
extra_specs3 = deepcopy(self.data.extra_specs)
extra_specs3[utils.SLO] = None
extra_specs3[utils.PORTGROUPNAME] = self.data.port_group_name_f
child_sg_name, do_disable_compression, rep_enabled, pg_name = (
self.utils.get_child_sg_name(host_name, extra_specs3))
self.assertEqual(self.data.no_slo_sg_name, child_sg_name)
def test_change_multiattach(self):
extra_specs_ma_true = {'multiattach': '<is> True'}
extra_specs_ma_false = {'multiattach': '<is> False'}
self.assertTrue(self.utils.change_multiattach(
extra_specs_ma_true, extra_specs_ma_false))
self.assertFalse(self.utils.change_multiattach(
extra_specs_ma_true, extra_specs_ma_true))
self.assertFalse(self.utils.change_multiattach(
extra_specs_ma_false, extra_specs_ma_false))
def test_is_volume_manageable(self):
for volume in self.data.priv_vol_func_response_multi:
self.assertTrue(
self.utils.is_volume_manageable(volume))
for volume in self.data.priv_vol_func_response_multi_invalid:
self.assertFalse(
self.utils.is_volume_manageable(volume))
def test_is_snapshot_manageable(self):
for volume in self.data.priv_vol_func_response_multi:
self.assertTrue(
self.utils.is_snapshot_manageable(volume))
for volume in self.data.priv_vol_func_response_multi_invalid:
self.assertFalse(
self.utils.is_snapshot_manageable(volume))
def test_get_volume_attached_hostname(self):
device_info_pass = self.data.volume_details_attached
# Success
hostname = self.utils.get_volume_attached_hostname(device_info_pass)
self.assertEqual('HostX', hostname)
# Fail
device_info_fail = self.data.volume_details_no_sg
hostname = self.utils.get_volume_attached_hostname(device_info_fail)
self.assertIsNone(hostname)