From dd556fa755adca195e7df82477ae6400f693af14 Mon Sep 17 00:00:00 2001
From: Chhavi Agarwal
Date: Tue, 7 Nov 2017 07:05:49 -0500
Subject: [PATCH] Run backup-restore operations on native thread

During huge backups, file read/write operations hold the CPU, which
leads to thread starvation and causes the cinder backup service to be
reported as down, since DB operations are impacted.

The proposed change is to run CPU- and file-intensive operations such
as read, write, compress, and decompress on a native thread.

Change-Id: I1f1d9c0d6e3f04f1ecd5ef7c5d813005ee116409
Closes-Bug: #1692775
Co-Authored-By: Gorka Eguileor
---
 cinder/backup/chunkeddriver.py                   | 37 +++++++++++++------
 cinder/backup/driver.py                          | 16 +++++++-
 cinder/backup/manager.py                         | 32 ++++++++++++----
 .../unit/backup/drivers/test_backup_google.py    | 11 ++++--
 .../unit/backup/drivers/test_backup_nfs.py       | 36 ++++++++++++------
 .../unit/backup/drivers/test_backup_swift.py     | 11 ++++--
 6 files changed, 104 insertions(+), 39 deletions(-)
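For context, the mechanism this patch leans on throughout is
eventlet.tpool.Proxy, which forwards method calls on a wrapped object to
eventlet's pool of native OS threads so they cannot block the event loop.
A minimal standalone sketch of that behavior (not part of the patch; the
Reader class is illustrative only):

    import threading

    from eventlet import tpool


    class Reader(object):
        def read(self):
            # Report which thread actually executes the call.
            return threading.current_thread().name


    reader = Reader()
    print(reader.read())    # runs on the caller's thread (MainThread)

    # Proxy-wrapped calls are handed off to tpool's native worker
    # threads, so a long blocking read() no longer starves greenthreads.
    print(tpool.Proxy(reader).read())
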
diff --git a/cinder/backup/chunkeddriver.py b/cinder/backup/chunkeddriver.py
index 527a527d6f2..1752f12e663 100644
--- a/cinder/backup/chunkeddriver.py
+++ b/cinder/backup/chunkeddriver.py
@@ -76,12 +76,18 @@ class ChunkedBackupDriver(driver.BackupDriver):
         try:
             if algorithm.lower() in ('none', 'off', 'no'):
                 return None
-            elif algorithm.lower() in ('zlib', 'gzip'):
+            if algorithm.lower() in ('zlib', 'gzip'):
                 import zlib as compressor
-                return compressor
+                result = compressor
             elif algorithm.lower() in ('bz2', 'bzip2'):
                 import bz2 as compressor
-                return compressor
+                result = compressor
+            else:
+                result = None
+            if result:
+                # NOTE(geguileo): Compression/Decompression starves
+                # greenthreads so we use a native thread instead.
+                return eventlet.tpool.Proxy(result)
         except ImportError:
             pass

@@ -105,6 +111,16 @@ class ChunkedBackupDriver(driver.BackupDriver):
             self._get_compressor(CONF.backup_compression_algorithm)
         self.support_force_delete = True

+    def _get_object_writer(self, container, object_name, extra_metadata=None):
+        """Return writer proxy-wrapped to execute methods in native thread."""
+        writer = self.get_object_writer(container, object_name, extra_metadata)
+        return eventlet.tpool.Proxy(writer)
+
+    def _get_object_reader(self, container, object_name, extra_metadata=None):
+        """Return reader proxy-wrapped to execute methods in native thread."""
+        reader = self.get_object_reader(container, object_name, extra_metadata)
+        return eventlet.tpool.Proxy(reader)
+
     # To create your own "chunked" backup driver, implement the following
     # abstract methods.

@@ -222,7 +238,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
         metadata_json = json.dumps(metadata, sort_keys=True, indent=2)
         if six.PY3:
             metadata_json = metadata_json.encode('utf-8')
-        with self.get_object_writer(container, filename) as writer:
+        with self._get_object_writer(container, filename) as writer:
             writer.write(metadata_json)
         LOG.debug('_write_metadata finished. Metadata: %s.',
                   metadata_json)
@@ -243,7 +259,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
         sha256file_json = json.dumps(sha256file, sort_keys=True, indent=2)
         if six.PY3:
             sha256file_json = sha256file_json.encode('utf-8')
-        with self.get_object_writer(container, filename) as writer:
+        with self._get_object_writer(container, filename) as writer:
             writer.write(sha256file_json)
         LOG.debug('_write_sha256file finished.')

@@ -253,7 +269,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
         LOG.debug('_read_metadata started, container name: %(container)s, '
                   'metadata filename: %(filename)s.',
                   {'container': container, 'filename': filename})
-        with self.get_object_reader(container, filename) as reader:
+        with self._get_object_reader(container, filename) as reader:
             metadata_json = reader.read()
         if six.PY3:
             metadata_json = metadata_json.decode('utf-8')
@@ -267,7 +283,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
         LOG.debug('_read_sha256file started, container name: %(container)s, '
                   'sha256 filename: %(filename)s.',
                   {'container': container, 'filename': filename})
-        with self.get_object_reader(container, filename) as reader:
+        with self._get_object_reader(container, filename) as reader:
             sha256file_json = reader.read()
         if six.PY3:
             sha256file_json = sha256file_json.decode('utf-8')
@@ -327,7 +343,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
             algorithm, output_data = self._prepare_output_data(data)
             obj[object_name]['compression'] = algorithm
             LOG.debug('About to put_object')
-            with self.get_object_writer(
+            with self._get_object_writer(
                     container, object_name, extra_metadata=extra_metadata
             ) as writer:
                 writer.write(output_data)
@@ -349,8 +365,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
         data_size_bytes = len(data)
         # Execute compression in native thread so it doesn't prevent
         # cooperative greenthread switching.
-        compressed_data = eventlet.tpool.execute(self.compressor.compress,
-                                                 data)
+        compressed_data = self.compressor.compress(data)
         comp_size_bytes = len(compressed_data)
         algorithm = CONF.backup_compression_algorithm.lower()
         if comp_size_bytes >= data_size_bytes:
@@ -618,7 +633,7 @@ class ChunkedBackupDriver(driver.BackupDriver):
                           'volume_id': volume_id,
                       })

-            with self.get_object_reader(
+            with self._get_object_reader(
                     container, object_name,
                     extra_metadata=extra_metadata) as reader:
                 body = reader.read()
diff --git a/cinder/backup/driver.py b/cinder/backup/driver.py
index 10598300181..b085cdbe7d6 100644
--- a/cinder/backup/driver.py
+++ b/cinder/backup/driver.py
@@ -374,12 +374,24 @@ class BackupDriver(base.Base):

     @abc.abstractmethod
     def backup(self, backup, volume_file, backup_metadata=False):
-        """Start a backup of a specified volume."""
+        """Start a backup of a specified volume.
+
+        Some I/O operations may block greenthreads, so to prevent
+        starvation the volume_file parameter will be a proxy that
+        executes all methods in native threads, and the method
+        implementation doesn't need to worry about that.
+        """
         return

     @abc.abstractmethod
     def restore(self, backup, volume_id, volume_file):
-        """Restore a saved backup."""
+        """Restore a saved backup.
+
+        Some I/O operations may block greenthreads, so to prevent
+        starvation the volume_file parameter will be a proxy that
+        executes all methods in native threads, and the method
+        implementation doesn't need to worry about that.
+        """
+ """ return @abc.abstractmethod diff --git a/cinder/backup/manager.py b/cinder/backup/manager.py index 2f9ea19b69b..08d12bdc800 100644 --- a/cinder/backup/manager.py +++ b/cinder/backup/manager.py @@ -32,6 +32,8 @@ Volume backups can be created, restored, deleted and listed. """ import os + +from eventlet import tpool from oslo_config import cfg from oslo_log import log as logging from oslo_log import versionutils @@ -82,6 +84,12 @@ CONF.import_opt('num_volume_device_scan_tries', 'cinder.volume.driver') QUOTAS = quota.QUOTAS +# TODO(geguileo): Once Eventlet issue #432 gets fixed we can just tpool.execute +# the whole call to the driver's backup and restore methods instead of proxy +# wrapping the device_file and having the drivers also proxy wrap their +# writes/reads and the compression/decompression calls. +# (https://github.com/eventlet/eventlet/issues/432) + class BackupManager(manager.ThreadPoolManager): """Manages backup of block storage devices.""" @@ -408,6 +416,10 @@ class BackupManager(manager.ThreadPoolManager): backup_service = self.get_backup_driver(context) properties = utils.brick_get_connector_properties() + + # NOTE(geguileo): Not all I/O disk operations properly do greenthread + # context switching and may end up blocking the greenthread, so we go + # with native threads proxy-wrapping the device file object. try: backup_device = self.volume_rpcapi.get_backup_device(context, backup, @@ -423,16 +435,16 @@ class BackupManager(manager.ThreadPoolManager): if backup_device.secure_enabled: with open(device_path) as device_file: updates = backup_service.backup( - backup, device_file) + backup, tpool.Proxy(device_file)) else: with utils.temporary_chown(device_path): with open(device_path) as device_file: updates = backup_service.backup( - backup, device_file) + backup, tpool.Proxy(device_file)) # device_path is already file-like so no need to open it else: - updates = backup_service.backup( - backup, device_path) + updates = backup_service.backup(backup, + tpool.Proxy(device_path)) finally: self._detach_device(context, attach_info, @@ -534,21 +546,27 @@ class BackupManager(manager.ThreadPoolManager): self.volume_rpcapi.secure_file_operations_enabled(context, volume)) attach_info = self._attach_device(context, volume, properties) + + # NOTE(geguileo): Not all I/O disk operations properly do greenthread + # context switching and may end up blocking the greenthread, so we go + # with native threads proxy-wrapping the device file object. 
diff --git a/cinder/tests/unit/backup/drivers/test_backup_google.py b/cinder/tests/unit/backup/drivers/test_backup_google.py
index b09e74dc40b..9e2eeddb1a8 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_google.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_google.py
@@ -28,6 +28,7 @@ import tempfile
 import threading
 import zlib

+from eventlet import tpool
 import mock
 from oslo_utils import units
@@ -551,8 +552,10 @@ class GoogleBackupDriverTestCase(test.TestCase):
         self.assertIsNone(compressor)
         compressor = service._get_compressor('zlib')
         self.assertEqual(zlib, compressor)
+        self.assertIsInstance(compressor, tpool.Proxy)
         compressor = service._get_compressor('bz2')
         self.assertEqual(bz2, compressor)
+        self.assertIsInstance(compressor, tpool.Proxy)
         self.assertRaises(ValueError, service._get_compressor, 'fake')

     @gcs_client
@@ -562,17 +565,17 @@ class GoogleBackupDriverTestCase(test.TestCase):
         thread_dict = {}
         original_compress = zlib.compress

-        def my_compress(data, *args, **kwargs):
+        def my_compress(data):
             thread_dict['compress'] = threading.current_thread()
             return original_compress(data)

+        self.mock_object(zlib, 'compress', side_effect=my_compress)
+
         service = google_dr.GoogleBackupDriver(self.ctxt)
         # Set up buffer of 128 zeroed bytes
         fake_data = b'\0' * 128

-        with mock.patch.object(service.compressor, 'compress',
-                               side_effect=my_compress):
-            result = service._prepare_output_data(fake_data)
+        result = service._prepare_output_data(fake_data)

         self.assertEqual('zlib', result[0])
         self.assertGreater(len(fake_data), len(result[1]))
diff --git a/cinder/tests/unit/backup/drivers/test_backup_nfs.py b/cinder/tests/unit/backup/drivers/test_backup_nfs.py
index 1b1ab6a69d2..4b29a47edd2 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_nfs.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_nfs.py
@@ -25,6 +25,7 @@ import tempfile
 import threading
 import zlib

+from eventlet import tpool
 import mock
 from os_brick.remotefs import remotefs as remotefs_brick
 from oslo_config import cfg
@@ -149,6 +150,10 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
             self.volume_file.write(bytes([65] * data_size))
         self.volume_file.seek(0)

+    def _store_thread(self, *args, **kwargs):
+        self.thread_dict['thread'] = threading.current_thread()
+        return self.thread_original_method(*args, **kwargs)
+
     def setUp(self):
         super(BackupNFSSwiftBasedTestCase, self).setUp()
@@ -173,6 +178,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
             self.volume_file.write(os.urandom(1024))
             self.size_volume_file += 1024

+        # Use dictionary to share data between threads
+        self.thread_dict = {}
+
     def test_backup_uncompressed(self):
         volume_id = fake.VOLUME_ID
         self._create_backup_db_entry(volume_id=volume_id)
@@ -573,7 +581,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
                                     restored_file.name))

     def test_restore_bz2(self):
+        self.thread_original_method = bz2.decompress
         volume_id = fake.VOLUME_ID
+        self.mock_object(bz2, 'decompress', side_effect=self._store_thread)
         self._create_backup_db_entry(volume_id=volume_id)
         self.flags(backup_compression_algorithm='bz2')
@@ -591,7 +601,12 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
         self.assertTrue(filecmp.cmp(self.volume_file.name,
                                     restored_file.name))

+        self.assertNotEqual(threading.current_thread(),
+                            self.thread_dict['thread'])
+
     def test_restore_zlib(self):
+        self.thread_original_method = zlib.decompress
+        self.mock_object(zlib, 'decompress', side_effect=self._store_thread)
         volume_id = fake.VOLUME_ID
         self._create_backup_db_entry(volume_id=volume_id)
@@ -610,6 +625,9 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
         self.assertTrue(filecmp.cmp(self.volume_file.name,
                                     restored_file.name))

+        self.assertNotEqual(threading.current_thread(),
+                            self.thread_dict['thread'])
+
     def test_restore_delta(self):
         volume_id = fake.VOLUME_ID
@@ -672,8 +690,10 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):
         self.assertIsNone(compressor)
         compressor = service._get_compressor('zlib')
         self.assertEqual(compressor, zlib)
+        self.assertIsInstance(compressor, tpool.Proxy)
         compressor = service._get_compressor('bz2')
         self.assertEqual(compressor, bz2)
+        self.assertIsInstance(compressor, tpool.Proxy)
         self.assertRaises(ValueError, service._get_compressor, 'fake')

     def create_buffer(self, size):
@@ -688,24 +708,18 @@ class BackupNFSSwiftBasedTestCase(test.TestCase):

     def test_prepare_output_data_effective_compression(self):
         """Test compression works on a native thread."""
-        # Use dictionary to share data between threads
-        thread_dict = {}
-        original_compress = zlib.compress
-
-        def my_compress(data, *args, **kwargs):
-            thread_dict['compress'] = threading.current_thread()
-            return original_compress(data)
+        self.thread_original_method = zlib.compress
+        self.mock_object(zlib, 'compress', side_effect=self._store_thread)

         service = nfs.NFSBackupDriver(self.ctxt)
         fake_data = self.create_buffer(128)
-        with mock.patch.object(service.compressor, 'compress',
-                               side_effect=my_compress):
-            result = service._prepare_output_data(fake_data)
+
+        result = service._prepare_output_data(fake_data)

         self.assertEqual('zlib', result[0])
         self.assertGreater(len(fake_data), len(result[1]))
         self.assertNotEqual(threading.current_thread(),
-                            thread_dict['compress'])
+                            self.thread_dict['thread'])

     def test_prepare_output_data_no_compresssion(self):
         self.flags(backup_compression_algorithm='none')
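The reworked NFS tests above centralize one pattern: monkey-patch the
compression call to record threading.current_thread(), then assert that
the recorded thread is not the caller's. The same check in isolation might
look like this (standalone sketch; thread_dict and traced_compress are
illustrative names, not part of the patch):

    import threading
    import zlib

    from eventlet import tpool

    thread_dict = {}
    original_compress = zlib.compress


    def traced_compress(data):
        # Record which thread performs the actual compression.
        thread_dict['compress'] = threading.current_thread()
        return original_compress(data)


    zlib.compress = traced_compress
    try:
        compressor = tpool.Proxy(zlib)
        compressor.compress(b'\0' * 128)
    finally:
        zlib.compress = original_compress

    # The proxied call must have left the calling thread.
    assert thread_dict['compress'] is not threading.current_thread()
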
diff --git a/cinder/tests/unit/backup/drivers/test_backup_swift.py b/cinder/tests/unit/backup/drivers/test_backup_swift.py
index 5ebee45d0b6..d7faf0a4b04 100644
--- a/cinder/tests/unit/backup/drivers/test_backup_swift.py
+++ b/cinder/tests/unit/backup/drivers/test_backup_swift.py
@@ -27,6 +27,7 @@ import tempfile
 import threading
 import zlib

+from eventlet import tpool
 import mock
 from oslo_config import cfg
 from swiftclient import client as swift
@@ -821,8 +822,10 @@ class BackupSwiftTestCase(test.TestCase):
         self.assertIsNone(compressor)
         compressor = service._get_compressor('zlib')
         self.assertEqual(zlib, compressor)
+        self.assertIsInstance(compressor, tpool.Proxy)
         compressor = service._get_compressor('bz2')
         self.assertEqual(bz2, compressor)
+        self.assertIsInstance(compressor, tpool.Proxy)
         self.assertRaises(ValueError, service._get_compressor, 'fake')

     def test_prepare_output_data_effective_compression(self):
@@ -831,17 +834,17 @@ class BackupSwiftTestCase(test.TestCase):
         thread_dict = {}
         original_compress = zlib.compress

-        def my_compress(data, *args, **kwargs):
+        def my_compress(data):
             thread_dict['compress'] = threading.current_thread()
             return original_compress(data)

+        self.mock_object(zlib, 'compress', side_effect=my_compress)
+
         service = swift_dr.SwiftBackupDriver(self.ctxt)
         # Set up buffer of 128 zeroed bytes
         fake_data = b'\0' * 128

-        with mock.patch.object(service.compressor, 'compress',
-                               side_effect=my_compress):
-            result = service._prepare_output_data(fake_data)
+        result = service._prepare_output_data(fake_data)

         self.assertEqual('zlib', result[0])
         self.assertGreater(len(fake_data), len(result[1]))
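A closing note on why the existing assertEqual checks against the
compressor keep passing: tpool.Proxy forwards comparisons to the wrapped
object, so the proxied module still compares equal to the real one while
remaining detectable as a proxy and dispatching its calls to native
threads. A quick sketch (assumes eventlet is installed; not part of the
patch):

    import zlib

    from eventlet import tpool

    compressor = tpool.Proxy(zlib)

    # Equality is forwarded to the wrapped module ...
    assert compressor == zlib
    # ... while the wrapper itself is still a recognizable Proxy.
    assert isinstance(compressor, tpool.Proxy)

    # compress()/decompress() now execute on tpool's native threads.
    data = compressor.compress(b'\0' * 128)
    assert compressor.decompress(data) == b'\0' * 128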