Rework Scality SOFS driver to use RemoteFS class

This change is big but the logic is simple. The previous version of
this driver reimplemented most of what is now in the RemoteFS base
class. SOFS stands for Scale Out FileSystem and is based on FUSE, so
it's only natural to leverage the RemoteFS class.

Change-Id: I26935061e860a477f98a61a935e67281a3a6f48e
This commit is contained in:
Jordan Pittier 2015-07-17 11:48:23 +02:00
parent 30cd678b57
commit 8e53849075
2 changed files with 433 additions and 436 deletions

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Scality
# Copyright (c) 2015 Scality
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -15,313 +16,333 @@
"""
Unit tests for the Scality SOFS Volume Driver.
"""
import errno
import os
import shutil
import tempfile
import mock
from mox3 import mox as mox_lib
from oslo_utils import units
from six.moves import urllib
from cinder import context
from cinder import exception
from cinder.image import image_utils
from cinder.openstack.common import imageutils
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers import scality
import cinder.volume.drivers.scality as driver
_FAKE_VOLUME = {'name': 'volume-a79d463e-1fd5-11e5-a6ff-5b81bfee8544',
'id': 'a79d463e-1fd5-11e5-a6ff-5b81bfee8544',
'provider_location': 'fake_share'}
_FAKE_SNAPSHOT = {'id': 'ae3d6da2-1fd5-11e5-967f-1b8cf3b401ab',
'volume': _FAKE_VOLUME,
'status': 'available',
'provider_location': None,
'volume_size': 1,
'name': 'snapshot-ae3d6da2-1fd5-11e5-967f-1b8cf3b401ab'}
_FAKE_BACKUP = {'id': '914849d2-2585-11e5-be54-d70ca0c343d6',
'volume_id': _FAKE_VOLUME['id']}
_FAKE_MNT_POINT = '/tmp'
_FAKE_SOFS_CONFIG = '/etc/sfused.conf'
_FAKE_VOLUME_DIR = 'cinder/volumes'
_FAKE_VOL_BASEDIR = os.path.join(_FAKE_MNT_POINT, _FAKE_VOLUME_DIR, '00')
_FAKE_VOL_PATH = os.path.join(_FAKE_VOL_BASEDIR, _FAKE_VOLUME['name'])
_FAKE_SNAP_PATH = os.path.join(_FAKE_VOL_BASEDIR, _FAKE_SNAPSHOT['name'])
_FAKE_MOUNTS_TABLE = [['tmpfs /dev/shm\n'],
['fuse ' + _FAKE_MNT_POINT + '\n']]
class ScalityDriverTestCase(test.TestCase):
"""Test case for the Scality driver."""
TEST_MOUNT = '/tmp/fake_mount'
TEST_CONFIG = '/tmp/fake_config'
TEST_VOLDIR = 'volumes'
TEST_VOLNAME = 'volume_name'
TEST_VOLSIZE = '1'
TEST_VOLUME = {
'name': TEST_VOLNAME,
'size': TEST_VOLSIZE
}
TEST_VOLPATH = os.path.join(TEST_MOUNT,
TEST_VOLDIR,
TEST_VOLNAME)
TEST_SNAPNAME = 'snapshot_name'
TEST_SNAPSHOT = {
'name': TEST_SNAPNAME,
'volume_name': TEST_VOLNAME,
'volume_size': TEST_VOLSIZE
}
TEST_SNAPPATH = os.path.join(TEST_MOUNT,
TEST_VOLDIR,
TEST_SNAPNAME)
TEST_CLONENAME = 'clone_name'
TEST_CLONE = {
'name': TEST_CLONENAME,
'size': TEST_VOLSIZE
}
TEST_NEWSIZE = '2'
TEST_IMAGE_SERVICE = 'image_service'
TEST_IMAGE_ID = 'image_id'
TEST_IMAGE_META = 'image_meta'
def _makedirs(self, path):
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def _create_fake_config(self):
open(self.TEST_CONFIG, "w+").close()
def _create_fake_mount(self):
self._makedirs(os.path.join(self.TEST_MOUNT, self.TEST_VOLDIR))
def _remove_fake_config(self):
try:
os.unlink(self.TEST_CONFIG)
except OSError as e:
if e.errno != errno.ENOENT:
raise
def _configure_driver(self):
self.configuration.scality_sofs_config = self.TEST_CONFIG
self.configuration.scality_sofs_mount_point = self.TEST_MOUNT
self.configuration.scality_sofs_volume_dir = self.TEST_VOLDIR
self.configuration.volume_dd_blocksize = '1M'
def _set_access_wrapper(self, is_visible):
def _access_wrapper(path, flags):
if path == '/sbin/mount.sofs':
return is_visible
else:
return os.access(path, flags)
self.stubs.Set(os, 'access', _access_wrapper)
def setUp(self):
self.tempdir = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, self.tempdir)
self.TEST_MOUNT = self.tempdir
self.TEST_VOLPATH = os.path.join(self.TEST_MOUNT,
self.TEST_VOLDIR,
self.TEST_VOLNAME)
self.TEST_SNAPPATH = os.path.join(self.TEST_MOUNT,
self.TEST_VOLDIR,
self.TEST_SNAPNAME)
self.TEST_CLONEPATH = os.path.join(self.TEST_MOUNT,
self.TEST_VOLDIR,
self.TEST_CLONENAME)
self.configuration = mox_lib.MockObject(conf.Configuration)
self._configure_driver()
super(ScalityDriverTestCase, self).setUp()
self._driver = scality.ScalityDriver(configuration=self.configuration)
self._driver.set_execute(lambda *args, **kwargs: None)
self._create_fake_mount()
self._create_fake_config()
self.addCleanup(self._remove_fake_config)
self.cfg = mock.Mock(spec=conf.Configuration)
self.cfg.scality_sofs_mount_point = _FAKE_MNT_POINT
self.cfg.scality_sofs_config = _FAKE_SOFS_CONFIG
self.cfg.scality_sofs_volume_dir = _FAKE_VOLUME_DIR
self.drv = driver.ScalityDriver(configuration=self.cfg)
self.drv.db = mock.Mock()
@mock.patch.object(driver.urllib.request, 'urlopen')
@mock.patch('os.access')
def test_check_for_setup_error(self, mock_os_access, mock_urlopen):
self.drv.check_for_setup_error()
mock_urlopen.assert_called_once_with('file://%s' % _FAKE_SOFS_CONFIG,
timeout=5)
mock_os_access.assert_called_once_with('/sbin/mount.sofs', os.X_OK)
def test_check_for_setup_error_with_no_sofs_config(self):
self.cfg.scality_sofs_config = ''
self.drv = driver.ScalityDriver(configuration=self.cfg)
def test_setup_no_config(self):
"""Missing SOFS configuration shall raise an error."""
self.configuration.scality_sofs_config = None
self.assertRaises(exception.VolumeBackendAPIException,
self._driver.do_setup, None)
self.drv.check_for_setup_error)
def test_setup_missing_config(self):
"""Non-existent SOFS configuration file shall raise an error."""
self.configuration.scality_sofs_config = 'nonexistent.conf'
@mock.patch.object(driver.urllib.request, 'urlopen')
def test_check_for_setup_error_with_urlerror(self, mock_urlopen):
# Add a Unicode char to be sure that the exception is properly
# handled even if it contains Unicode chars
mock_urlopen.side_effect = urllib.error.URLError(u'\u9535')
self.assertRaises(exception.VolumeBackendAPIException,
self._driver.do_setup, None)
self.drv.check_for_setup_error)
def test_setup_no_mount_helper(self):
"""SOFS must be installed to use the driver."""
self._set_access_wrapper(False)
@mock.patch.object(driver.urllib.request, 'urlopen')
def test_check_for_setup_error_with_httperror(self, mock_urlopen):
mock_urlopen.side_effect = urllib.error.HTTPError(*[None] * 5)
self.assertRaises(exception.VolumeBackendAPIException,
self._driver.do_setup, None)
self.drv.check_for_setup_error)
def test_setup_make_voldir(self):
"""The directory for volumes shall be created automatically."""
self._set_access_wrapper(True)
voldir_path = os.path.join(self.TEST_MOUNT, self.TEST_VOLDIR)
os.rmdir(voldir_path)
fake_mounts = [['tmpfs /dev/shm\n'],
['fuse ' + self.TEST_MOUNT + '\n']]
with mock.patch.object(scality.volume_utils, 'read_proc_mounts',
side_effect=fake_mounts) as mock_get_mounts:
self._driver.do_setup(None)
self.assertEqual(2, mock_get_mounts.call_count)
self.assertTrue(os.path.isdir(voldir_path))
@mock.patch.object(driver.urllib.request, 'urlopen', mock.Mock())
@mock.patch('os.access')
def test_check_for_setup_error_with_no_mountsofs(self, mock_os_access):
mock_os_access.return_value = False
self.assertRaises(exception.VolumeBackendAPIException,
self.drv.check_for_setup_error)
mock_os_access.assert_called_once_with('/sbin/mount.sofs', os.X_OK)
def test_local_path(self):
"""Expected behaviour for local_path."""
self.assertEqual(self.TEST_VOLPATH,
self._driver.local_path(self.TEST_VOLUME))
def test_load_shares_config(self):
self.assertEqual({}, self.drv.shares)
self.drv._load_shares_config()
self.assertEqual({_FAKE_VOLUME_DIR: None}, self.drv.shares)
def test_create_volume(self):
"""Expected behaviour for create_volume."""
ret = self._driver.create_volume(self.TEST_VOLUME)
self.assertEqual(os.path.join(self.TEST_VOLDIR,
self.TEST_VOLNAME),
ret['provider_location'])
self.assertTrue(os.path.isfile(self.TEST_VOLPATH))
self.assertEqual(1 * units.Gi,
os.stat(self.TEST_VOLPATH).st_size)
def test_get_mount_point_for_share(self):
self.assertEqual(_FAKE_VOL_BASEDIR,
self.drv._get_mount_point_for_share())
def test_delete_volume(self):
"""Expected behaviour for delete_volume."""
self._driver.create_volume(self.TEST_VOLUME)
self._driver.delete_volume(self.TEST_VOLUME)
self.assertFalse(os.path.isfile(self.TEST_VOLPATH))
@mock.patch("cinder.volume.utils.read_proc_mounts")
@mock.patch("oslo_concurrency.processutils.execute")
def test_ensure_share_mounted_when_mount_failed(self, mock_execute,
mock_read_proc_mounts):
mock_read_proc_mounts.return_value = ['tmpfs /dev/shm\n']
self.assertRaises(exception.VolumeBackendAPIException,
self.drv._ensure_share_mounted)
self.assertEqual(2, mock_read_proc_mounts.call_count)
self.assertEqual(1, mock_execute.call_count)
def test_create_snapshot(self):
"""Expected behaviour for create_snapshot."""
mox = self.mox
@mock.patch("cinder.volume.utils.read_proc_mounts")
@mock.patch("oslo_concurrency.processutils.execute")
@mock.patch("oslo_utils.fileutils.ensure_tree")
@mock.patch("os.symlink")
def test_ensure_shares_mounted(self, mock_symlink, mock_ensure_tree,
mock_execute, mock_read_proc_mounts):
self.assertEqual([], self.drv._mounted_shares)
vol_size = self._driver._size_bytes(self.TEST_VOLSIZE)
mock_read_proc_mounts.side_effect = _FAKE_MOUNTS_TABLE
mox.StubOutWithMock(self._driver, '_create_file')
self._driver._create_file(self.TEST_SNAPPATH, vol_size)
mox.StubOutWithMock(self._driver, '_copy_file')
self._driver._copy_file(self.TEST_VOLPATH, self.TEST_SNAPPATH)
self.drv._ensure_shares_mounted()
mox.ReplayAll()
self.assertEqual([_FAKE_VOLUME_DIR], self.drv._mounted_shares)
self.assertEqual(2, mock_read_proc_mounts.call_count)
mock_symlink.assert_called_once_with('.', _FAKE_VOL_BASEDIR)
self.assertEqual(2, mock_ensure_tree.call_count)
self.assertEqual(1, mock_execute.call_count)
expected_args = ('mount', '-t', 'sofs', _FAKE_SOFS_CONFIG,
_FAKE_MNT_POINT)
self.assertEqual(expected_args, mock_execute.call_args[0])
self._driver.create_snapshot(self.TEST_SNAPSHOT)
def test_find_share_when_no_shares_mounted(self):
self.assertRaises(exception.RemoteFSNoSharesMounted,
self.drv._find_share, 'ignored')
def test_delete_snapshot(self):
"""Expected behaviour for delete_snapshot."""
mox = self.mox
@mock.patch("cinder.volume.utils.read_proc_mounts")
@mock.patch("oslo_concurrency.processutils.execute")
@mock.patch("oslo_utils.fileutils.ensure_tree")
@mock.patch("os.symlink")
def test_find_share(self, mock_symlink, mock_ensure_tree, mock_execute,
mock_read_proc_mounts):
mock_read_proc_mounts.side_effect = _FAKE_MOUNTS_TABLE
mox.StubOutWithMock(os, 'remove')
os.remove(self.TEST_SNAPPATH)
self.drv._ensure_shares_mounted()
mox.ReplayAll()
self.assertEqual(_FAKE_VOLUME_DIR, self.drv._find_share('ignored'))
self.assertEqual(2, mock_read_proc_mounts.call_count)
self.assertEqual(1, mock_execute.call_count)
self._driver.delete_snapshot(self.TEST_SNAPSHOT)
expected_args = ('mount', '-t', 'sofs', _FAKE_SOFS_CONFIG,
_FAKE_MNT_POINT)
self.assertEqual(expected_args, mock_execute.call_args[0])
def test_initialize_connection(self):
"""Expected behaviour for initialize_connection."""
ret = self._driver.initialize_connection(self.TEST_VOLUME, None)
self.assertEqual('scality', ret['driver_volume_type'])
self.assertEqual(os.path.join(self.TEST_VOLDIR, self.TEST_VOLNAME),
ret['data']['sofs_path'])
self.assertEqual(self.TEST_VOLDIR, ret['data']['export'])
self.assertEqual(self.TEST_VOLNAME, ret['data']['name'])
mock_symlink.assert_called_once_with('.', _FAKE_VOL_BASEDIR)
def test_copy_image_to_volume(self):
"""Expected behaviour for copy_image_to_volume."""
self.mox.StubOutWithMock(image_utils, 'fetch_to_raw')
self.assertEqual(mock_ensure_tree.call_args_list,
[mock.call(_FAKE_MNT_POINT),
mock.call(os.path.join(_FAKE_MNT_POINT,
_FAKE_VOLUME_DIR))])
image_utils.fetch_to_raw(context,
self.TEST_IMAGE_SERVICE,
self.TEST_IMAGE_ID,
self.TEST_VOLPATH,
mox_lib.IgnoreArg(),
size=self.TEST_VOLSIZE)
def test_get_volume_stats(self):
with mock.patch.object(self.cfg, 'safe_get') as mock_safe_get:
mock_safe_get.return_value = 'fake_backend_name'
stats = self.drv.get_volume_stats()
self.assertEqual(self.drv.VERSION, stats['driver_version'])
self.assertEqual(mock_safe_get.return_value,
stats['volume_backend_name'])
mock_safe_get.assert_called_once_with('volume_backend_name')
self.mox.ReplayAll()
self._driver.copy_image_to_volume(context,
self.TEST_VOLUME,
self.TEST_IMAGE_SERVICE,
self.TEST_IMAGE_ID)
def test_copy_volume_to_image(self):
"""Expected behaviour for copy_volume_to_image."""
self.mox.StubOutWithMock(image_utils, 'upload_volume')
image_utils.upload_volume(context,
self.TEST_IMAGE_SERVICE,
self.TEST_IMAGE_META,
self.TEST_VOLPATH)
self.mox.ReplayAll()
self._driver.copy_volume_to_image(context,
self.TEST_VOLUME,
self.TEST_IMAGE_SERVICE,
self.TEST_IMAGE_META)
def test_create_cloned_volume(self):
"""Expected behaviour for create_cloned_volume."""
self.mox.StubOutWithMock(self._driver, '_create_file')
self.mox.StubOutWithMock(self._driver, '_copy_file')
vol_size = self._driver._size_bytes(self.TEST_VOLSIZE)
self._driver._create_file(self.TEST_CLONEPATH, vol_size)
self._driver._copy_file(self.TEST_VOLPATH, self.TEST_CLONEPATH)
self.mox.ReplayAll()
self._driver.create_cloned_volume(self.TEST_CLONE, self.TEST_VOLUME)
def test_extend_volume(self):
"""Expected behaviour for extend_volume."""
self.mox.StubOutWithMock(self._driver, '_create_file')
new_size = self._driver._size_bytes(self.TEST_NEWSIZE)
self._driver._create_file(self.TEST_VOLPATH, new_size)
self.mox.ReplayAll()
self._driver.extend_volume(self.TEST_VOLUME, self.TEST_NEWSIZE)
@mock.patch('six.moves.builtins.open')
@mock.patch('cinder.utils.temporary_chown')
@mock.patch('cinder.image.image_utils.qemu_img_info')
def test_backup_volume(self, qemu_img_info, temporary_chown, mock_open):
volume = {'id': '2', 'name': self.TEST_VOLNAME}
backup = {'volume_id': volume['id']}
info = mock.Mock()
@mock.patch("cinder.image.image_utils.qemu_img_info")
def test_initialize_connection(self, mock_qemu_img_info):
info = imageutils.QemuImgInfo()
info.file_format = 'raw'
info.backing_file = None
qemu_img_info.return_value = info
backup_service = mock.Mock()
info.image = _FAKE_VOLUME['name']
mock_qemu_img_info.return_value = info
with mock.patch.object(self._driver, 'db') as db:
db.volume_get.return_value = volume
with mock.patch.object(self.drv, 'get_active_image_from_info') as \
mock_get_active_image_from_info:
self._driver.backup_volume(context, backup, backup_service)
mock_get_active_image_from_info.return_value = _FAKE_VOLUME['name']
conn_info = self.drv.initialize_connection(_FAKE_VOLUME, None)
db.volume_get.assert_called_once_with(context, volume['id'])
qemu_img_info.assert_called_once_with(self.TEST_VOLPATH)
temporary_chown.asser_called_once_with(self.TEST_VOLPATH)
mock_open.assert_called_once_with(self.TEST_VOLPATH)
backup_service.backup.asser_called_once_with(backup, mock.ANY)
expected_conn_info = {
'driver_volume_type': driver.ScalityDriver.driver_volume_type,
'mount_point_base': _FAKE_MNT_POINT,
'data': {
'export': _FAKE_VOLUME['provider_location'],
'name': _FAKE_VOLUME['name'],
'sofs_path': 'cinder/volumes/00/' + _FAKE_VOLUME['name'],
'format': 'raw'
}
}
self.assertEqual(expected_conn_info, conn_info)
mock_get_active_image_from_info.assert_called_once_with(_FAKE_VOLUME)
mock_qemu_img_info.assert_called_once_with(_FAKE_VOL_PATH)
info.backing_file = 'not None'
self.assertRaises(exception.InvalidVolume,
self._driver.backup_volume,
context, backup, backup_service)
@mock.patch("cinder.image.image_utils.resize_image")
@mock.patch("cinder.image.image_utils.qemu_img_info")
def test_extend_volume(self, mock_qemu_img_info, mock_resize_image):
info = imageutils.QemuImgInfo()
info.file_format = 'raw'
mock_qemu_img_info.return_value = info
info.file_format = 'not raw'
self.assertRaises(exception.InvalidVolume,
self._driver.backup_volume,
context, backup, backup_service)
self.drv.extend_volume(_FAKE_VOLUME, 2)
@mock.patch('six.moves.builtins.open')
@mock.patch('cinder.utils.temporary_chown')
def test_restore_backup(self, temporary_chown, mock_open):
volume = {'id': '2', 'name': self.TEST_VOLNAME}
backup = {'id': 123, 'volume_id': volume['id']}
backup_service = mock.Mock()
mock_qemu_img_info.assert_called_once_with(_FAKE_VOL_PATH)
self._driver.restore_backup(context, backup, volume,
backup_service)
mock_resize_image.assert_called_once_with(_FAKE_VOL_PATH, 2)
temporary_chown.asser_called_once_with(self.TEST_VOLPATH)
mock_open.assert_called_once_with(self.TEST_VOLPATH, 'wb')
backup_service.restore.asser_called_once_with(backup, volume['id'],
mock.ANY)
@mock.patch("cinder.image.image_utils.qemu_img_info")
def test_extend_volume_with_invalid_format(self, mock_qemu_img_info):
info = imageutils.QemuImgInfo()
info.file_format = 'vmdk'
mock_qemu_img_info.return_value = info
self.assertRaises(exception.InvalidVolume,
self.drv.extend_volume, _FAKE_VOLUME, 2)
@mock.patch("cinder.image.image_utils.resize_image")
@mock.patch("cinder.image.image_utils.convert_image")
def test_copy_volume_from_snapshot_with_ioerror(self, mock_convert_image,
mock_resize_image):
with mock.patch.object(self.drv, '_read_info_file') as \
mock_read_info_file, \
mock.patch.object(self.drv, '_set_rw_permissions_for_all') as \
mock_set_rw_permissions:
mock_read_info_file.side_effect = IOError(errno.ENOENT, '')
self.drv._copy_volume_from_snapshot(_FAKE_SNAPSHOT,
_FAKE_VOLUME, 1)
mock_read_info_file.assert_called_once_with("%s.info" % _FAKE_VOL_PATH)
mock_convert_image.assert_called_once_with(_FAKE_SNAP_PATH,
_FAKE_VOL_PATH, 'raw',
run_as_root=True)
mock_set_rw_permissions.assert_called_once_with(_FAKE_VOL_PATH)
mock_resize_image.assert_called_once_with(_FAKE_VOL_PATH, 1)
@mock.patch("cinder.image.image_utils.resize_image")
@mock.patch("cinder.image.image_utils.convert_image")
@mock.patch("cinder.image.image_utils.qemu_img_info")
def test_copy_volume_from_snapshot(self, mock_qemu_img_info,
mock_convert_image, mock_resize_image):
new_volume = {'name': 'volume-3fa63b02-1fe5-11e5-b492-abf97a8fb23b',
'id': '3fa63b02-1fe5-11e5-b492-abf97a8fb23b',
'provider_location': 'fake_share'}
new_vol_path = os.path.join(_FAKE_VOL_BASEDIR, new_volume['name'])
info = imageutils.QemuImgInfo()
info.file_format = 'raw'
info.backing_file = _FAKE_VOL_PATH
mock_qemu_img_info.return_value = info
with mock.patch.object(self.drv, '_read_info_file') as \
mock_read_info_file, \
mock.patch.object(self.drv, '_set_rw_permissions_for_all') as \
mock_set_rw_permissions:
self.drv._copy_volume_from_snapshot(_FAKE_SNAPSHOT,
new_volume, 1)
mock_read_info_file.assert_called_once_with("%s.info" % _FAKE_VOL_PATH)
mock_convert_image.assert_called_once_with(_FAKE_VOL_PATH,
new_vol_path, 'raw',
run_as_root=True)
mock_set_rw_permissions.assert_called_once_with(new_vol_path)
mock_resize_image.assert_called_once_with(new_vol_path, 1)
@mock.patch("cinder.image.image_utils.qemu_img_info")
@mock.patch("cinder.utils.temporary_chown")
@mock.patch("six.moves.builtins.open")
def test_backup_volume(self, mock_open, mock_temporary_chown,
mock_qemu_img_info):
"""Backup a volume with no snapshots."""
info = imageutils.QemuImgInfo()
info.file_format = 'raw'
mock_qemu_img_info.return_value = info
backup = {'volume_id': _FAKE_VOLUME['id']}
mock_backup_service = mock.MagicMock()
self.drv.db.volume_get.return_value = _FAKE_VOLUME
self.drv.backup_volume(context, backup, mock_backup_service)
mock_qemu_img_info.assert_called_once_with(_FAKE_VOL_PATH)
mock_temporary_chown.assert_called_once_with(_FAKE_VOL_PATH)
mock_open.assert_called_once_with(_FAKE_VOL_PATH)
mock_backup_service.backup.assert_called_once_with(
backup, mock_open().__enter__())
@mock.patch("cinder.image.image_utils.qemu_img_info")
def test_backup_volume_with_non_raw_volume(self, mock_qemu_img_info):
info = imageutils.QemuImgInfo()
info.file_format = 'qcow2'
mock_qemu_img_info.return_value = info
self.drv.db.volume_get.return_value = _FAKE_VOLUME
self.assertRaises(exception.InvalidVolume, self.drv.backup_volume,
context, _FAKE_BACKUP, mock.MagicMock())
mock_qemu_img_info.assert_called_once_with(_FAKE_VOL_PATH)
@mock.patch("cinder.image.image_utils.qemu_img_info")
def test_backup_volume_with_backing_file(self, mock_qemu_img_info):
info = imageutils.QemuImgInfo()
info.file_format = 'raw'
info.backing_file = 'fake.img'
mock_qemu_img_info.return_value = info
backup = {'volume_id': _FAKE_VOLUME['id']}
self.drv.db.volume_get.return_value = _FAKE_VOLUME
self.assertRaises(exception.InvalidVolume, self.drv.backup_volume,
context, backup, mock.MagicMock())
mock_qemu_img_info.assert_called_once_with(_FAKE_VOL_PATH)
@mock.patch("cinder.utils.temporary_chown")
@mock.patch("six.moves.builtins.open")
def test_restore_bakup(self, mock_open, mock_temporary_chown):
mock_backup_service = mock.MagicMock()
self.drv.restore_backup(context, _FAKE_BACKUP, _FAKE_VOLUME,
mock_backup_service)
mock_temporary_chown.assert_called_once_with(_FAKE_VOL_PATH)
mock_open.assert_called_once_with(_FAKE_VOL_PATH, 'wb')
mock_backup_service.restore.assert_called_once_with(
_FAKE_BACKUP, _FAKE_VOLUME['id'], mock_open().__enter__())

View File

@ -1,4 +1,5 @@
# Copyright (c) 2013 Scality
# Copyright (c) 2015 Scality
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -17,20 +18,21 @@ Scality SOFS Volume Driver.
"""
import errno
import os
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import fileutils
from oslo_utils import units
import six
from six.moves import urllib
from cinder import exception
from cinder.i18n import _, _LI
from cinder.image import image_utils
from cinder import utils
from cinder.volume import driver
from cinder.volume.drivers import remotefs as remotefs_drv
from cinder.volume import utils as volume_utils
@ -52,62 +54,73 @@ CONF = cfg.CONF
CONF.register_opts(volume_opts)
class ScalityDriver(driver.VolumeDriver):
class ScalityDriver(remotefs_drv.RemoteFSSnapDriver):
"""Scality SOFS cinder driver.
Creates sparse files on SOFS for hypervisors to use as block
devices.
"""
VERSION = '1.0.0'
driver_volume_type = 'scality'
driver_prefix = 'scality_sofs'
volume_backend_name = 'Scality_SOFS'
VERSION = '2.0.0'
def __init__(self, *args, **kwargs):
super(ScalityDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(volume_opts)
def _check_prerequisites(self):
self.sofs_mount_point = self.configuration.scality_sofs_mount_point
self.sofs_config = self.configuration.scality_sofs_config
self.sofs_rel_volume_dir = self.configuration.scality_sofs_volume_dir
self.sofs_abs_volume_dir = os.path.join(self.sofs_mount_point,
self.sofs_rel_volume_dir)
# The following config flag is used by RemoteFSDriver._do_create_volume
# We want to use sparse file (ftruncated) without exposing this
# as a config switch to customers.
self.configuration.scality_sofs_sparsed_volumes = True
def check_for_setup_error(self):
"""Sanity checks before attempting to mount SOFS."""
# config is mandatory
config = self.configuration.scality_sofs_config
if not config:
if not self.sofs_config:
msg = _("Value required for 'scality_sofs_config'")
LOG.warning(msg)
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
# config can be a file path or a URL, check it
if urllib.parse.urlparse(config).scheme == '':
config = self.sofs_config
if urllib.parse.urlparse(self.sofs_config).scheme == '':
# turn local path into URL
config = 'file://%s' % config
config = 'file://%s' % self.sofs_config
try:
urllib.request.urlopen(config, timeout=5).close()
except urllib.error.URLError as e:
msg = _("Cannot access 'scality_sofs_config': %s") % e
LOG.warning(msg)
except (urllib.error.URLError, urllib.error.HTTPError) as e:
msg = _("Can't access 'scality_sofs_config'"
": %s") % six.text_type(e)
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
# mount.sofs must be installed
if not os.access('/sbin/mount.sofs', os.X_OK):
msg = _("Cannot execute /sbin/mount.sofs")
LOG.warning(msg)
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
@lockutils.synchronized('mount-sofs', 'cinder-sofs', external=True)
def _mount_sofs(self):
config = self.configuration.scality_sofs_config
mount_path = self.configuration.scality_sofs_mount_point
def _load_shares_config(self, share_file=None):
self.shares[self.sofs_rel_volume_dir] = None
fileutils.ensure_tree(mount_path)
if not self._sofs_is_mounted():
self._execute('mount', '-t', 'sofs', config, mount_path,
run_as_root=True)
if not self._sofs_is_mounted():
msg = _("Cannot mount Scality SOFS, check syslog for errors")
LOG.warning(msg)
raise exception.VolumeBackendAPIException(data=msg)
def _get_mount_point_for_share(self, share=None):
# The _qemu_img_info_base() method from the RemoteFSSnapDriver class
# expects files (volume) to be inside a subdir of the mount point.
# So we have to append a dummy subdir.
return self.sofs_abs_volume_dir + "/00"
def _sofs_is_mounted(self):
mount_path = self.configuration.scality_sofs_mount_point.rstrip('/')
"""Check if SOFS is already mounted at the expected location."""
mount_path = self.sofs_mount_point.rstrip('/')
for mount in volume_utils.read_proc_mounts():
parts = mount.split()
if (parts[0].endswith('fuse') and
@ -115,121 +128,44 @@ class ScalityDriver(driver.VolumeDriver):
return True
return False
def _size_bytes(self, size_in_g):
return int(size_in_g) * units.Gi
@lockutils.synchronized('mount-sofs', 'cinder-sofs', external=True)
def _ensure_share_mounted(self, share=None):
"""Mount SOFS if need be."""
fileutils.ensure_tree(self.sofs_mount_point)
def _create_file(self, path, size):
with open(path, "ab") as f:
f.truncate(size)
os.chmod(path, 0o666)
def _copy_file(self, src_path, dest_path):
self._execute('dd', 'if=%s' % src_path, 'of=%s' % dest_path,
'bs=1M', 'conv=fsync,nocreat,notrunc',
run_as_root=True)
def do_setup(self, context):
"""Any initialization the volume driver does while starting."""
self._check_prerequisites()
self._mount_sofs()
voldir = os.path.join(self.configuration.scality_sofs_mount_point,
self.configuration.scality_sofs_volume_dir)
fileutils.ensure_tree(voldir)
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met."""
self._check_prerequisites()
voldir = os.path.join(self.configuration.scality_sofs_mount_point,
self.configuration.scality_sofs_volume_dir)
if not os.path.isdir(voldir):
msg = _("Cannot find volume dir for Scality SOFS at '%s'") % voldir
LOG.warning(msg)
if not self._sofs_is_mounted():
self._execute('mount', '-t', 'sofs', self.sofs_config,
self.sofs_mount_point, run_as_root=True)
if not self._sofs_is_mounted():
msg = _("Cannot mount Scality SOFS, check syslog for errors")
LOG.error(msg)
raise exception.VolumeBackendAPIException(data=msg)
def create_volume(self, volume):
"""Creates a logical volume.
fileutils.ensure_tree(self.sofs_abs_volume_dir)
Can optionally return a Dictionary of changes to the volume
object to be persisted.
"""
self._create_file(self.local_path(volume),
self._size_bytes(volume['size']))
volume['provider_location'] = self._sofs_path(volume)
return {'provider_location': volume['provider_location']}
# We symlink the '00' subdir to its parent dir to maintain
# compatibility with previous version of this driver.
try:
os.symlink(".", self._get_mount_point_for_share())
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.islink(self._get_mount_point_for_share()):
raise
else:
raise
def create_volume_from_snapshot(self, volume, snapshot):
"""Creates a volume from a snapshot."""
changes = self.create_volume(volume)
self._copy_file(self.local_path(snapshot),
self.local_path(volume))
return changes
def _ensure_shares_mounted(self):
self._ensure_share_mounted()
self._mounted_shares = [self.sofs_rel_volume_dir]
def delete_volume(self, volume):
"""Deletes a logical volume."""
os.remove(self.local_path(volume))
def create_snapshot(self, snapshot):
"""Creates a snapshot."""
volume_path = os.path.join(self.configuration.scality_sofs_mount_point,
self.configuration.scality_sofs_volume_dir,
snapshot['volume_name'])
snapshot_path = self.local_path(snapshot)
self._create_file(snapshot_path,
self._size_bytes(snapshot['volume_size']))
self._copy_file(volume_path, snapshot_path)
def delete_snapshot(self, snapshot):
"""Deletes a snapshot."""
os.remove(self.local_path(snapshot))
def _sofs_path(self, volume):
return os.path.join(self.configuration.scality_sofs_volume_dir,
volume['name'])
def local_path(self, volume):
return os.path.join(self.configuration.scality_sofs_mount_point,
self._sofs_path(volume))
def ensure_export(self, context, volume):
"""Synchronously recreates an export for a logical volume."""
pass
def create_export(self, context, volume, connector):
"""Exports the volume.
Can optionally return a Dictionary of changes to the volume
object to be persisted.
"""
pass
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
pass
def initialize_connection(self, volume, connector):
"""Allow connection to connector and return connection info."""
return {
'driver_volume_type': 'scality',
'data': {
'sofs_path': self._sofs_path(volume),
'export': self.configuration.scality_sofs_volume_dir,
'name': volume['name'],
}
}
def terminate_connection(self, volume, connector, **kwargs):
"""Disallow connection from connector."""
pass
def detach_volume(self, context, volume, attachment=None):
"""Callback for volume detached."""
pass
def _find_share(self, volume_size_for):
try:
return self._mounted_shares[0]
except IndexError:
raise exception.RemoteFSNoSharesMounted()
def get_volume_stats(self, refresh=False):
"""Return the current state of the volume service.
If 'refresh' is True, run the update first.
"""
"""Return the current state of the volume service."""
stats = {
'vendor_name': 'Scality',
'driver_version': self.VERSION,
@ -239,56 +175,96 @@ class ScalityDriver(driver.VolumeDriver):
'reserved_percentage': 0,
}
backend_name = self.configuration.safe_get('volume_backend_name')
stats['volume_backend_name'] = backend_name or 'Scality_SOFS'
stats['volume_backend_name'] = backend_name or self.volume_backend_name
return stats
def copy_image_to_volume(self, context, volume, image_service, image_id):
"""Fetch the image from image_service and write it to the volume."""
image_utils.fetch_to_raw(context,
image_service,
image_id,
self.local_path(volume),
self.configuration.volume_dd_blocksize,
size=volume['size'])
self.create_volume(volume)
@remotefs_drv.locked_volume_id_operation
def initialize_connection(self, volume, connector):
"""Allow connection to connector and return connection info."""
def copy_volume_to_image(self, context, volume, image_service, image_meta):
"""Copy the volume to the specified image."""
image_utils.upload_volume(context,
image_service,
image_meta,
self.local_path(volume))
# Find active qcow2 file
active_file = self.get_active_image_from_info(volume)
path = '%s/%s' % (self._get_mount_point_for_share(), active_file)
sofs_rel_path = os.path.join(self.sofs_rel_volume_dir, "00",
volume['name'])
def clone_image(self, context, volume,
image_location, image_meta,
image_service):
"""Create a volume efficiently from an existing image.
data = {'export': volume['provider_location'],
'name': active_file,
'sofs_path': sofs_rel_path}
image_location is a string whose format depends on the
image service backend in use. The driver should use it
to determine whether cloning is possible.
# Test file for raw vs. qcow2 format
info = self._qemu_img_info(path, volume['name'])
data['format'] = info.file_format
if data['format'] not in ['raw', 'qcow2']:
msg = _('%s must be a valid raw or qcow2 image.') % path
raise exception.InvalidVolume(msg)
image_meta is the metadata associated with the image and
includes properties like the image id, size, virtual-size
etc.
return {
'driver_volume_type': self.driver_volume_type,
'data': data,
'mount_point_base': self.sofs_mount_point
}
image_service is the reference of the image_service to use.
Note that this is needed to be passed here for drivers that
will want to fetch images from the image service directly.
def _qemu_img_info(self, path, volume_name):
return super(ScalityDriver, self)._qemu_img_info_base(
path, volume_name, self.sofs_abs_volume_dir)
Returns a dict of volume properties eg. provider_location,
boolean indicating whether cloning occurred
@remotefs_drv.locked_volume_id_operation
def extend_volume(self, volume, size_gb):
volume_path = self.local_path(volume)
info = self._qemu_img_info(volume_path, volume['name'])
backing_fmt = info.file_format
if backing_fmt not in ['raw', 'qcow2']:
msg = _('Unrecognized backing format: %s')
raise exception.InvalidVolume(msg % backing_fmt)
# qemu-img can resize both raw and qcow2 files
image_utils.resize_image(volume_path, size_gb)
def _copy_volume_from_snapshot(self, snapshot, volume, volume_size):
"""Copy data from snapshot to destination volume.
This is done with a qemu-img convert to raw/qcow2 from the snapshot
qcow2.
"""
return None, False
def create_cloned_volume(self, volume, src_vref):
"""Creates a clone of the specified volume."""
self.create_volume_from_snapshot(volume, src_vref)
info_path = self._local_path_volume_info(snapshot['volume'])
def extend_volume(self, volume, new_size):
"""Extend an existing volume."""
self._create_file(self.local_path(volume),
self._size_bytes(new_size))
# For BC compat' with version < 2 of this driver
try:
snap_info = self._read_info_file(info_path)
except IOError as exc:
if exc.errno != errno.ENOENT:
raise
else:
path_to_snap_img = self.local_path(snapshot)
else:
vol_path = self._local_volume_dir(snapshot['volume'])
forward_file = snap_info[snapshot['id']]
forward_path = os.path.join(vol_path, forward_file)
# Find the file which backs this file, which represents the point
# when this snapshot was created.
img_info = self._qemu_img_info(forward_path,
snapshot['volume']['name'])
path_to_snap_img = os.path.join(vol_path, img_info.backing_file)
LOG.debug("will copy from snapshot at %s", path_to_snap_img)
path_to_new_vol = self.local_path(volume)
out_format = 'raw'
image_utils.convert_image(path_to_snap_img,
path_to_new_vol,
out_format,
run_as_root=self._execute_as_root)
self._set_rw_permissions_for_all(path_to_new_vol)
image_utils.resize_image(path_to_new_vol, volume_size)
def backup_volume(self, context, backup, backup_service):
"""Create a new backup from an existing volume."""