cinder/cinder/tests/unit/volume/test_driver.py
Gorka Eguileor ef741228d8 Report tri-state shared_targets for NVMe volumes
NVMe-oF drivers that share the subsystem have the same race condition
issue that iSCSI volumes that share targets do.

The race condition is caused by AER messages that trigger automatic
rescans on the connector host side in both cases.

For iSCSI we added a feature on the Open-iSCSI project that allowed
disabling these scans, and added support for it in os-brick.

Since manual scans are a newer feature that may be missing from a host's
iSCSI client, cinder keeps a flag on each volume indicating whether it
uses shared targets.  Using that flag, os-brick consumers can use the
"guard_connection" context manager to ensure race conditions don't
happen.
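
For illustration only, here is a hedged sketch of the consumer side; it
assumes the "guard_connection" helper is exposed from os_brick.utils (as
in recent os-brick releases) and that the connector, connection
properties and device info come from the earlier attach:

    # Sketch, not code from this patch: serialize disconnect + unexport
    # for volumes whose target/subsystem may be shared with other volumes.
    from os_brick import utils as brick_utils

    def safe_disconnect(connector, volume, connection_properties,
                        device_info):
        with brick_utils.guard_connection(volume):
            connector.disconnect_volume(connection_properties, device_info)
            # hypothetical call to tell the backend to remove the export
            terminate_connection_on_backend(volume)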

The race condition is prevented by os-brick using manual scans if they
are available in the iSCSI client, or a file lock if not.

The problem we face now is that we also want to use the lock for NVMe-oF
volumes that share a subsystem for multiple namespaces (there is no way
to disable automatic scans), but cinder doesn't (and shouldn't) expose
the actual storage protocol on the volume resource, so we need to
leverage the "shared_targets" parameter.

So with a single boolean value we need to encode 3 possible options:

- Don't use locks because targets/subsystems are not shared
- Use locks if the iSCSI client doesn't support manual scans
- Always use locks (for example for NVMe-oF)

The only option we have is to also use the "None" value.  That way we
can encode the 3 different cases.

But we have an additional restriction: "True" is already taken for the
iSCSI case, because there will already be volumes in the database with
that value stored.

And making guard_connection always lock when shared_targets is set to
True would reintroduce the bottleneck from bug #1800515.

That leaves us with the "None" value to force the use of locks.

So we end up with the following tri-state for "shared_targets" (see the
sketch below):

- True means os-brick should lock only if the iSCSI initiator doesn't
  support manual scans.
- False means that os-brick should never lock.
- None means that os-brick should always lock.
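
As an illustration of the encoding (not the actual os-brick code), the
consumer-side decision can be read as:

    # Hedged sketch of the tri-state interpretation listed above.
    def needs_lock(shared_targets, initiator_supports_manual_scans):
        if shared_targets is None:
            return True   # always lock, e.g. NVMe-oF shared subsystem
        if shared_targets is False:
            return False  # nothing is shared, never lock
        # shared_targets is True: lock only when automatic scans can't
        # be avoided through manual scans on the iSCSI initiator
        return not initiator_supports_manual_scans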

The alternative to this encoding would be to have an online data
migration for volumes to change "True" to "None", and accept that there
could be race conditions during the rolling upgrade (because os-brick on
computes will interpret "None" as "False").

Since "in theory" Cinder was only returning True or False for the
"shared_target", we add a new microversion with number 3.69 that returns
null when the value is internally set to None.

The patch also adds a database migration.  It looks unnecessary, since
the DB already allows null values, but it seems more correct to make
sure that is always the case.
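
As a reference only, a hedged sketch of what such a migration could look
like (the revision identifiers are placeholders, and this is not
necessarily the exact migration the patch adds):

    # Alembic sketch: make volumes.shared_targets explicitly nullable.
    from alembic import op
    import sqlalchemy as sa

    revision = '<new revision id>'            # placeholder
    down_revision = '<previous revision id>'  # placeholder

    def upgrade():
        with op.batch_alter_table('volumes') as batch_op:
            batch_op.alter_column('shared_targets',
                                  existing_type=sa.Boolean(),
                                  nullable=True)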

This patch doesn't close bug #1961102 because the os-brick patch is
needed for that.

Related-Bug: #1961102
Change-Id: I8cda6d9830f39e27ac700b1d8796fe0489fd7c0a
2022-05-24 15:13:23 +02:00


# Copyright (c) 2016 Red Hat Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Tests for Volume Code."""

import shutil
import tempfile
from unittest import mock

import ddt
import os_brick
from oslo_config import cfg
from oslo_utils import importutils

from cinder.brick.local_dev import lvm as brick_lvm
from cinder import context
from cinder import db
from cinder import exception
from cinder.image import image_utils
from cinder import objects
from cinder.objects import fields
import cinder.policy
from cinder.tests.unit import fake_constants as fake
from cinder.tests.unit import fake_snapshot
from cinder.tests.unit import fake_volume
from cinder.tests.unit.image import fake as fake_image
from cinder.tests.unit import test
from cinder.tests.unit import utils as tests_utils
import cinder.volume
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume import manager
from cinder.volume import rpcapi as volume_rpcapi
import cinder.volume.targets.tgt
from cinder.volume import volume_utils

CONF = cfg.CONF


def my_safe_get(self, value):
    if value == 'replication_device':
        return ['replication']
    return None


@ddt.ddt
class DriverTestCase(test.TestCase):

    @staticmethod
    def _get_driver(relicated, version):
        class NonReplicatedDriver(driver.VolumeDriver):
            pass

        class V21Driver(driver.VolumeDriver):
            def failover_host(*args, **kwargs):
                pass

        class AADriver(V21Driver):
            def failover_completed(*args, **kwargs):
                pass

        if not relicated:
            return NonReplicatedDriver

        if version == 'v2.1':
            return V21Driver

        return AADriver

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_none(self, rep_version):
        my_driver = self._get_driver(False, None)
        self.assertFalse(my_driver.supports_replication_feature(rep_version))

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_only_21(self, rep_version):
        version = 'v2.1'
        my_driver = self._get_driver(True, version)
        self.assertEqual(rep_version == version,
                         my_driver.supports_replication_feature(rep_version))

    @ddt.data('v2.1', 'a/a', 'newfeature')
    def test_supports_replication_feature_aa(self, rep_version):
        my_driver = self._get_driver(True, 'a/a')
        self.assertEqual(rep_version in ('v2.1', 'a/a'),
                         my_driver.supports_replication_feature(rep_version))

    def test_init_non_replicated(self):
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(False, None)(configuration=config)

    @ddt.data('v2.1', 'a/a')
    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_non_clustered(self, version):
        def append_config_values(self, volume_opts):
            pass

        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(True, version)(configuration=config)

    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_clustered_not_supported(self):
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # Raises exception because we are trying to run a replicated service
        # in clustered mode but the driver doesn't support it.
        self.assertRaises(exception.Invalid, self._get_driver(True, 'v2.1'),
                          configuration=config, cluster_name='mycluster')

    @mock.patch('cinder.volume.configuration.Configuration.safe_get',
                my_safe_get)
    def test_init_replicated_clustered_supported(self):
        config = manager.config.Configuration(manager.volume_manager_opts,
                                              config_group='volume')
        # No exception raised
        self._get_driver(True, 'a/a')(configuration=config,
                                      cluster_name='mycluster')

    def test_failover(self):
        """Test default failover behavior of calling failover_host."""
        my_driver = self._get_driver(True, 'a/a')()
        with mock.patch.object(my_driver, 'failover_host') as failover_mock:
            res = my_driver.failover(mock.sentinel.context,
                                     mock.sentinel.volumes,
                                     secondary_id=mock.sentinel.secondary_id,
                                     groups=[])
        self.assertEqual(failover_mock.return_value, res)
        failover_mock.assert_called_once_with(mock.sentinel.context,
                                              mock.sentinel.volumes,
                                              mock.sentinel.secondary_id,
                                              [])


class BaseDriverTestCase(test.TestCase):
    """Base Test class for Drivers."""
    driver_name = "cinder.volume.driver.FakeBaseDriver"

    def setUp(self):
        super(BaseDriverTestCase, self).setUp()
        vol_tmpdir = tempfile.mkdtemp()
        self.override_config('volume_driver', self.driver_name,
                             conf.SHARED_CONF_GROUP)
        self.override_config('volumes_dir', vol_tmpdir,
                             conf.SHARED_CONF_GROUP)
        self.volume = importutils.import_object(CONF.volume_manager)
        self.mock_object(self.volume, '_driver_shares_targets',
                         return_value=False)
        self.context = context.get_admin_context()
        self.output = ""
        self.configuration = conf.Configuration(None)
        self.mock_object(brick_lvm.LVM, '_vg_exists', lambda x: True)

        def _fake_execute(_command, *_args, **_kwargs):
            """Fake _execute."""
            return self.output, None

        exec_patcher = mock.patch.object(self.volume.driver, '_execute',
                                         _fake_execute)
        exec_patcher.start()
        self.addCleanup(exec_patcher.stop)
        self.volume.driver.set_initialized()
        self.addCleanup(self._cleanup)

    def _cleanup(self):
        try:
            shutil.rmtree(CONF.volumes_dir)
        except OSError:
            pass

    def _attach_volume(self):
        """Attach volumes to an instance."""
        return []


@ddt.ddt
class GenericVolumeDriverTestCase(BaseDriverTestCase):
    """Test case for VolumeDriver."""
    driver_name = "cinder.tests.fake_driver.FakeLoggingVolumeDriver"

    def test_create_temp_cloned_volume(self):
        with mock.patch.object(
                self.volume.driver,
                'create_cloned_volume') as mock_create_cloned_volume:
            model_update = {'provider_location': 'dummy'}
            mock_create_cloned_volume.return_value = model_update
            vol = tests_utils.create_volume(self.context,
                                            status='backing-up')
            cloned_vol = self.volume.driver._create_temp_cloned_volume(
                self.context, vol)
            self.assertEqual('dummy', cloned_vol.provider_location)
            self.assertEqual('available', cloned_vol.status)

            mock_create_cloned_volume.return_value = None
            vol = tests_utils.create_volume(self.context,
                                            status='backing-up')
            cloned_vol = self.volume.driver._create_temp_cloned_volume(
                self.context, vol, status=fields.VolumeStatus.BACKING_UP)
            self.assertEqual(fields.VolumeStatus.BACKING_UP, cloned_vol.status)

    def test_get_backup_device_available(self):
        vol = tests_utils.create_volume(self.context)
        self.context.user_id = fake.USER_ID
        self.context.project_id = fake.PROJECT_ID
        backup_obj = tests_utils.create_backup(self.context,
                                               vol['id'])
        (backup_device, is_snapshot) = self.volume.driver.get_backup_device(
            self.context, backup_obj)
        volume = objects.Volume.get_by_id(self.context, vol.id)
        self.assertNotIn('temporary', backup_device.admin_metadata.keys())
        self.assertEqual(volume, backup_device)
        self.assertFalse(is_snapshot)
        backup_obj.refresh()
        self.assertIsNone(backup_obj.temp_volume_id)

    def test_get_backup_device_in_use(self):
        vol = tests_utils.create_volume(self.context,
                                        status='backing-up',
                                        previous_status='in-use')
        admin_meta = {'temporary': 'True'}
        temp_vol = tests_utils.create_volume(
            self.context, status=fields.VolumeStatus.BACKING_UP,
            admin_metadata=admin_meta)
        self.context.user_id = fake.USER_ID
        self.context.project_id = fake.PROJECT_ID
        backup_obj = tests_utils.create_backup(self.context,
                                               vol['id'])
        with mock.patch.object(
                self.volume.driver,
                '_create_temp_cloned_volume') as mock_create_temp:
            mock_create_temp.return_value = temp_vol
            (backup_device, is_snapshot) = (
                self.volume.driver.get_backup_device(self.context,
                                                     backup_obj))
            self.assertEqual(temp_vol, backup_device)
            self.assertFalse(is_snapshot)
            backup_obj.refresh()
            self.assertEqual(temp_vol.id, backup_obj.temp_volume_id)
            mock_create_temp.assert_called_once_with(
                self.context, vol, status=fields.VolumeStatus.BACKING_UP)

    def test_create_temp_volume_from_snapshot(self):
        volume_dict = {'id': fake.SNAPSHOT_ID,
                       'host': 'fakehost',
                       'cluster_name': 'fakecluster',
                       'availability_zone': 'fakezone',
                       'size': 1,
                       'volume_type_id': fake.VOLUME_TYPE_ID}
        vol = fake_volume.fake_volume_obj(self.context, **volume_dict)
        snapshot = fake_snapshot.fake_snapshot_obj(self.context)
        with mock.patch.object(
                self.volume.driver,
                'create_volume_from_snapshot'):
            temp_vol = self.volume.driver._create_temp_volume_from_snapshot(
                self.context,
                vol, snapshot)
            self.assertEqual(fields.VolumeAttachStatus.DETACHED,
                             temp_vol.attach_status)
            self.assertEqual('fakezone', temp_vol.availability_zone)
            self.assertEqual('fakecluster', temp_vol.cluster_name)
            self.assertFalse(temp_vol.use_quota)

    def test__create_temp_snapshot(self):
        volume_dict = {'id': fake.SNAPSHOT_ID,
                       'host': 'fakehost',
                       'cluster_name': 'fakecluster',
                       'availability_zone': 'fakezone',
                       'size': 1,
                       'volume_type_id': fake.VOLUME_TYPE_ID}
        volume = fake_volume.fake_volume_obj(self.context, **volume_dict)

        # We want to confirm that the driver properly updates fields with the
        # value returned by the create_snapshot method
        driver_updates = {'provider_location': 'driver_provider_location'}

        with mock.patch.object(self.volume.driver, 'create_snapshot',
                               return_value=driver_updates) as create_mock:
            res = self.volume.driver._create_temp_snapshot(self.context,
                                                           volume)
            create_mock.assert_called_once_with(res)

        expected = {'volume_id': volume.id,
                    'progress': '100%',
                    'status': fields.SnapshotStatus.AVAILABLE,
                    'use_quota': False,  # Temporary snapshots don't use quota
                    'project_id': self.context.project_id,
                    'user_id': self.context.user_id,
                    'volume_size': volume.size,
                    'volume_type_id': volume.volume_type_id,
                    'provider_location': 'driver_provider_location'}
        for key, value in expected.items():
            self.assertEqual(value, res[key])

    @mock.patch.object(volume_utils, 'brick_get_connector_properties')
    @mock.patch.object(cinder.volume.manager.VolumeManager, '_attach_volume')
    @mock.patch.object(cinder.volume.manager.VolumeManager, '_detach_volume')
    @mock.patch.object(volume_utils, 'copy_volume')
    @mock.patch.object(volume_rpcapi.VolumeAPI, 'get_capabilities')
    @mock.patch.object(cinder.volume.volume_types,
                       'volume_types_encryption_changed')
    @ddt.data(False, True)
    def test_copy_volume_data_mgr(self,
                                  encryption_changed,
                                  mock_encryption_changed,
                                  mock_get_capabilities,
                                  mock_copy,
                                  mock_detach,
                                  mock_attach,
                                  mock_get_connector):
        """Test function of _copy_volume_data."""
        src_vol = tests_utils.create_volume(self.context, size=1,
                                            host=CONF.host)
        dest_vol = tests_utils.create_volume(self.context, size=1,
                                             host=CONF.host)
        mock_get_connector.return_value = {}
        mock_encryption_changed.return_value = encryption_changed
        self.volume.driver._throttle = mock.MagicMock()

        attach_expected = [
            mock.call(self.context, dest_vol, {},
                      remote=False,
                      attach_encryptor=encryption_changed),
            mock.call(self.context, src_vol, {},
                      remote=False,
                      attach_encryptor=encryption_changed)]

        detach_expected = [
            mock.call(self.context, {'device': {'path': 'bar'}},
                      dest_vol, {}, force=True, remote=False,
                      attach_encryptor=encryption_changed),
            mock.call(self.context, {'device': {'path': 'foo'}},
                      src_vol, {}, force=True, remote=False,
                      attach_encryptor=encryption_changed)]

        attach_volume_returns = [
            {'device': {'path': 'bar'}},
            {'device': {'path': 'foo'}}
        ]

        # Test case for sparse_copy_volume = False
        mock_attach.side_effect = attach_volume_returns
        mock_get_capabilities.return_value = {}
        self.volume._copy_volume_data(self.context,
                                      src_vol,
                                      dest_vol)

        self.assertEqual(attach_expected, mock_attach.mock_calls)
        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=False)
        self.assertEqual(detach_expected, mock_detach.mock_calls)

        # Test case for sparse_copy_volume = True
        mock_attach.reset_mock()
        mock_detach.reset_mock()
        mock_attach.side_effect = attach_volume_returns
        mock_get_capabilities.return_value = {'sparse_copy_volume': True}

        self.volume._copy_volume_data(self.context,
                                      src_vol,
                                      dest_vol)

        self.assertEqual(attach_expected, mock_attach.mock_calls)
        mock_copy.assert_called_with('foo', 'bar', 1024, '1M', sparse=True)
        self.assertEqual(detach_expected, mock_detach.mock_calls)

        # cleanup resource
        db.volume_destroy(self.context, src_vol['id'])
        db.volume_destroy(self.context, dest_vol['id'])

    @mock.patch(driver_name + '.initialize_connection')
    @mock.patch(driver_name + '.create_export', return_value=None)
    @mock.patch(driver_name + '._connect_device')
    def test_attach_volume_encrypted(self, connect_mock, export_mock,
                                     initialize_mock):
        properties = {'host': 'myhost', 'ip': '192.168.1.43',
                      'initiator': u'iqn.1994-05.com.redhat:d9be887375',
                      'multipath': False, 'os_type': 'linux2',
                      'platform': 'x86_64'}

        data = {'target_discovered': True,
                'target_iqn': 'iqn.2010-10.org.openstack:volume-00000001',
                'target_portal': '127.0.0.0.1:3260',
                'volume_id': 1,
                'discard': False}

        passed_conn = {'driver_volume_type': 'iscsi', 'data': data.copy()}
        initialize_mock.return_value = passed_conn

        # _attach_volume adds the encrypted value based on the volume
        expected_conn = {'driver_volume_type': 'iscsi', 'data': data.copy()}
        expected_conn['data']['encrypted'] = True

        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)

        attach_info, vol = self.volume.driver._attach_volume(self.context,
                                                             volume,
                                                             properties)

        export_mock.assert_called_once_with(self.context, volume, properties)
        initialize_mock.assert_called_once_with(volume, properties)
        connect_mock.assert_called_once_with(expected_conn)

        self.assertEqual(connect_mock.return_value, attach_info)
        self.assertEqual(volume, vol)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.volume.volume_utils,
                       'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.volume.volume_utils,
                       'brick_detach_volume_encryptor')
    def test_copy_image_to_encrypted_volume(self,
                                            mock_detach_encryptor,
                                            mock_attach_encryptor,
                                            mock_detach_volume,
                                            mock_attach_volume,
                                            mock_fetch_to_raw,
                                            mock_get_connector_properties):
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        local_path = 'dev/sda'
        attach_info = {'device': {'path': local_path},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]

        self.volume.driver.copy_image_to_encrypted_volume(
            self.context, volume, image_service, fake.IMAGE_ID)

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        mock_fetch_to_raw.assert_called_once_with(
            self.context, image_service, fake.IMAGE_ID,
            local_path, '1M', size=2)
        mock_detach_encryptor.assert_called_once_with(
            attach_info, encryption)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.volume.volume_utils,
                       'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.volume.volume_utils,
                       'brick_detach_volume_encryptor')
    def test_copy_image_to_encrypted_volume_failed_attach_encryptor(
            self,
            mock_detach_encryptor,
            mock_attach_encryptor,
            mock_detach_volume,
            mock_attach_volume,
            mock_fetch_to_raw,
            mock_get_connector_properties):
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        attach_info = {'device': {'path': 'dev/sda'},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]
        raised_exception = os_brick.exception.VolumeEncryptionNotSupported(
            volume_id="123",
            volume_type="abc")
        mock_attach_encryptor.side_effect = raised_exception

        self.assertRaises(os_brick.exception.VolumeEncryptionNotSupported,
                          self.volume.driver.copy_image_to_encrypted_volume,
                          self.context, volume, image_service, fake.IMAGE_ID)

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        self.assertFalse(mock_fetch_to_raw.called)
        self.assertFalse(mock_detach_encryptor.called)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch.object(os_brick.initiator.connector,
                       'get_connector_properties')
    @mock.patch.object(image_utils, 'fetch_to_raw')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_attach_volume')
    @mock.patch.object(cinder.volume.driver.VolumeDriver, '_detach_volume')
    @mock.patch.object(cinder.volume.volume_utils,
                       'brick_attach_volume_encryptor')
    @mock.patch.object(cinder.volume.volume_utils,
                       'brick_detach_volume_encryptor')
    @ddt.data(exception.ImageUnacceptable(
                  reason='fake', image_id=fake.IMAGE_ID),
              exception.ImageTooBig(
                  reason='fake image size exceeded', image_id=fake.IMAGE_ID))
    def test_copy_image_to_encrypted_volume_failed_fetch(
            self, excep,
            mock_detach_encryptor, mock_attach_encryptor,
            mock_detach_volume, mock_attach_volume, mock_fetch_to_raw,
            mock_get_connector_properties):
        properties = {}
        volume = tests_utils.create_volume(
            self.context, status='available',
            size=2,
            encryption_key_id=fake.ENCRYPTION_KEY_ID)
        volume_id = volume['id']
        volume = db.volume_get(context.get_admin_context(), volume_id)
        image_service = fake_image.FakeImageService()
        local_path = 'dev/sda'
        attach_info = {'device': {'path': local_path},
                       'conn': {'driver_volume_type': 'iscsi',
                                'data': {}, }}

        mock_get_connector_properties.return_value = properties
        mock_attach_volume.return_value = [attach_info, volume]
        mock_fetch_to_raw.side_effect = excep

        encryption = {'encryption_key_id': fake.ENCRYPTION_KEY_ID}
        self.assertRaises(type(excep),
                          self.volume.driver.copy_image_to_encrypted_volume,
                          self.context, volume, image_service, fake.IMAGE_ID)

        mock_attach_volume.assert_called_once_with(
            self.context, volume, properties)
        mock_attach_encryptor.assert_called_once_with(
            self.context, attach_info, encryption)
        mock_fetch_to_raw.assert_called_once_with(
            self.context, image_service, fake.IMAGE_ID,
            local_path, '1M', size=2)
        mock_detach_encryptor.assert_called_once_with(
            attach_info, encryption)
        mock_detach_volume.assert_called_once_with(
            self.context, attach_info, volume, properties, force=True)

    @mock.patch('cinder.volume.driver.brick_exception')
    @mock.patch('cinder.tests.fake_driver.FakeLoggingVolumeDriver.'
                'terminate_connection', side_effect=Exception)
    @mock.patch('cinder.tests.fake_driver.FakeLoggingVolumeDriver.'
                'remove_export', side_effect=Exception)
    def test_detach_volume_force(self, remove_mock, terminate_mock, exc_mock):
        """Test force parameter on _detach_volume.

        On the driver if we receive the force parameter we will do everything
        even with Exceptions on disconnect, terminate, and remove export.
        """
        connector = mock.Mock()
        connector.disconnect_volume.side_effect = Exception

        # TODO(geguileo): Remove this ExceptionChainer simulation once we
        # release OS-Brick version with it and bump min version.
        exc = exc_mock.ExceptionChainer.return_value
        exc.context.return_value.__enter__.return_value = exc
        exc.context.return_value.__exit__.return_value = True

        volume = {'id': fake.VOLUME_ID}
        attach_info = {'device': {},
                       'connector': connector,
                       'conn': {'data': {}, }}

        # TODO(geguileo): Change TypeError to ExceptionChainer once we release
        # OS-Brick version with it and bump min version.
        self.assertRaises(TypeError,
                          self.volume.driver._detach_volume, self.context,
                          attach_info, volume, {}, force=True)

        self.assertTrue(connector.disconnect_volume.called)
        self.assertTrue(remove_mock.called)
        self.assertTrue(terminate_mock.called)
        self.assertEqual(3, exc.context.call_count)

    @ddt.data({'cfg_value': '10', 'valid': True},
              {'cfg_value': 'auto', 'valid': True},
              {'cfg_value': '1', 'valid': True},
              {'cfg_value': '1.2', 'valid': True},
              {'cfg_value': '100', 'valid': True},
              {'cfg_value': '20.15', 'valid': True},
              {'cfg_value': 'True', 'valid': False},
              {'cfg_value': 'False', 'valid': False},
              {'cfg_value': '10.0.0', 'valid': False},
              {'cfg_value': '0.00', 'valid': True},
              {'cfg_value': 'anything', 'valid': False},)
    @ddt.unpack
    def test_auto_max_subscription_ratio_options(self, cfg_value, valid):
        # This tests the max_over_subscription_ratio option as it is now
        # checked by a regex
        def _set_conf(config, value):
            config.set_override('max_over_subscription_ratio', value)

        config = conf.Configuration(None)
        config.append_config_values(driver.volume_opts)

        if valid:
            _set_conf(config, cfg_value)
            self.assertEqual(cfg_value, config.safe_get(
                'max_over_subscription_ratio'))
        else:
            self.assertRaises(ValueError, _set_conf, config, cfg_value)


class FibreChannelTestCase(BaseDriverTestCase):
    """Test Case for FibreChannelDriver."""
    driver_name = "cinder.volume.driver.FibreChannelDriver"

    def test_initialize_connection(self):
        self.assertRaises(NotImplementedError,
                          self.volume.driver.initialize_connection, {}, {})

    def test_validate_connector(self):
        """validate_connector() successful use case.

        validate_connector() does not throw an exception when
        wwpns and wwnns are both set and both are not empty.
        """
        connector = {'wwpns': ["not empty"],
                     'wwnns': ["not empty"]}
        self.volume.driver.validate_connector(connector)

    def test_validate_connector_no_wwpns(self):
        """validate_connector() throws exception when it has no wwpns."""
        connector = {'wwnns': ["not empty"]}
        self.assertRaises(exception.InvalidConnectorException,
                          self.volume.driver.validate_connector, connector)

    def test_validate_connector_empty_wwpns(self):
        """validate_connector() throws exception when it has empty wwpns."""
        connector = {'wwpns': [],
                     'wwnns': ["not empty"]}
        self.assertRaises(exception.InvalidConnectorException,
                          self.volume.driver.validate_connector, connector)

    def test_validate_connector_no_wwnns(self):
        """validate_connector() throws exception when it has no wwnns."""
        connector = {'wwpns': ["not empty"]}
        self.assertRaises(exception.InvalidConnectorException,
                          self.volume.driver.validate_connector, connector)

    def test_validate_connector_empty_wwnns(self):
        """validate_connector() throws exception when it has empty wwnns."""
        connector = {'wwnns': [],
                     'wwpns': ["not empty"]}
        self.assertRaises(exception.InvalidConnectorException,
                          self.volume.driver.validate_connector, connector)