Ensure image utils don't block greenthreads
When doing image operations in Cinder we may start getting errors on RabbitMQ and DB connections, which result in the volume service appearing as down to the scheduler. This is caused by file I/O operations, that in some cases block greenthreads, preventing switching to another greenthread on I/O as they should. This results in many different errors, so this patch makes sure that image operations (fetch, put, verify image, ...) that could prevent greenthread switching are executed in native threads. Closes-Bug: #1801958 Change-Id: I8a8fbf96875319a7e5ca167fcd2bca45f57649c8
This commit is contained in:
parent
54900fd866
commit
e4de4fb33d
@ -34,6 +34,7 @@ import tempfile
|
||||
import cryptography
|
||||
from cursive import exception as cursive_exception
|
||||
from cursive import signature_utils
|
||||
from eventlet import tpool
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
@ -288,6 +289,19 @@ def resize_image(source, size, run_as_root=False):
|
||||
utils.execute(*cmd, run_as_root=run_as_root)
|
||||
|
||||
|
||||
def _verify_image(img_file, verifier):
|
||||
# This methods must be called from a native thread, as the file I/O may
|
||||
# not yield to other greenthread in some cases, and since the update and
|
||||
# verify operations are CPU bound there would not be any yielding either,
|
||||
# which could lead to thread starvation.
|
||||
while True:
|
||||
chunk = img_file.read(1024)
|
||||
if not chunk:
|
||||
break
|
||||
verifier.update(chunk)
|
||||
verifier.verify()
|
||||
|
||||
|
||||
def verify_glance_image_signature(context, image_service, image_id, path):
|
||||
verifier = None
|
||||
image_meta = image_service.show(context, image_id)
|
||||
@ -328,13 +342,7 @@ def verify_glance_image_signature(context, image_service, image_id, path):
|
||||
with fileutils.remove_path_on_error(path):
|
||||
with open(path, "rb") as tem_file:
|
||||
try:
|
||||
while True:
|
||||
chunk = tem_file.read(1024)
|
||||
if chunk:
|
||||
verifier.update(chunk)
|
||||
else:
|
||||
break
|
||||
verifier.verify()
|
||||
tpool.execute(_verify_image, tem_file, verifier)
|
||||
LOG.info('Image signature verification succeeded '
|
||||
'for image: %s', image_id)
|
||||
return True
|
||||
@ -365,7 +373,8 @@ def fetch(context, image_service, image_id, path, _user_id, _project_id):
|
||||
with fileutils.remove_path_on_error(path):
|
||||
with open(path, "wb") as image_file:
|
||||
try:
|
||||
image_service.download(context, image_id, image_file)
|
||||
image_service.download(context, image_id,
|
||||
tpool.Proxy(image_file))
|
||||
except IOError as e:
|
||||
if e.errno == errno.ENOSPC:
|
||||
params = {'path': os.path.dirname(path),
|
||||
@ -581,11 +590,13 @@ def upload_volume(context, image_service, image_meta, volume_path,
|
||||
image_id, volume_format, image_meta['disk_format'])
|
||||
if os.name == 'nt' or os.access(volume_path, os.R_OK):
|
||||
with open(volume_path, 'rb') as image_file:
|
||||
image_service.update(context, image_id, {}, image_file)
|
||||
image_service.update(context, image_id, {},
|
||||
tpool.Proxy(image_file))
|
||||
else:
|
||||
with utils.temporary_chown(volume_path):
|
||||
with open(volume_path, 'rb') as image_file:
|
||||
image_service.update(context, image_id, {}, image_file)
|
||||
image_service.update(context, image_id, {},
|
||||
tpool.Proxy(image_file))
|
||||
return
|
||||
|
||||
with temporary_file() as tmp:
|
||||
@ -617,7 +628,8 @@ def upload_volume(context, image_service, image_meta, volume_path,
|
||||
{'f1': out_format, 'f2': data.file_format})
|
||||
|
||||
with open(tmp, 'rb') as image_file:
|
||||
image_service.update(context, image_id, {}, image_file)
|
||||
image_service.update(context, image_id, {},
|
||||
tpool.Proxy(image_file))
|
||||
|
||||
|
||||
def check_virtual_size(virtual_size, volume_size, image_id):
|
||||
|
@ -302,9 +302,10 @@ class TestResizeImage(test.TestCase):
|
||||
|
||||
|
||||
class TestFetch(test.TestCase):
|
||||
@mock.patch('eventlet.tpool.Proxy')
|
||||
@mock.patch('os.stat')
|
||||
@mock.patch('cinder.image.image_utils.fileutils')
|
||||
def test_defaults(self, mock_fileutils, mock_stat):
|
||||
def test_defaults(self, mock_fileutils, mock_stat, mock_proxy):
|
||||
ctxt = mock.sentinel.context
|
||||
image_service = mock.Mock()
|
||||
image_id = mock.sentinel.image_id
|
||||
@ -319,8 +320,9 @@ class TestFetch(test.TestCase):
|
||||
output = image_utils.fetch(ctxt, image_service, image_id, path,
|
||||
_user_id, _project_id)
|
||||
self.assertIsNone(output)
|
||||
mock_proxy.assert_called_once_with(mock_open.return_value)
|
||||
image_service.download.assert_called_once_with(ctxt, image_id,
|
||||
mock_open.return_value)
|
||||
mock_proxy.return_value)
|
||||
mock_open.assert_called_once_with(path, 'wb')
|
||||
mock_fileutils.remove_path_on_error.assert_called_once_with(path)
|
||||
(mock_fileutils.remove_path_on_error.return_value.__enter__
|
||||
@ -441,10 +443,12 @@ class TestVerifyImageSignature(test.TestCase):
|
||||
FakeImageService(), 'fake_id', 'fake_path')
|
||||
mock_get.assert_not_called()
|
||||
|
||||
@mock.patch('six.moves.builtins.open')
|
||||
@mock.patch('eventlet.tpool.execute')
|
||||
@mock.patch('cursive.signature_utils.get_verifier')
|
||||
@mock.patch('oslo_utils.fileutils.remove_path_on_error')
|
||||
def test_image_signature_verify_success(self, mock_remove, mock_get):
|
||||
self.mock_object(builtins, 'open', mock.mock_open())
|
||||
def test_image_signature_verify_success(self, mock_remove, mock_get,
|
||||
mock_exec, mock_open):
|
||||
ctxt = mock.sentinel.context
|
||||
metadata = {'name': 'test image',
|
||||
'is_public': False,
|
||||
@ -465,6 +469,11 @@ class TestVerifyImageSignature(test.TestCase):
|
||||
result = image_utils.verify_glance_image_signature(
|
||||
ctxt, FakeImageService(), 'fake_id', 'fake_path')
|
||||
self.assertTrue(result)
|
||||
mock_exec.assert_called_once_with(
|
||||
image_utils._verify_image,
|
||||
mock_open.return_value.__enter__.return_value,
|
||||
mock_get.return_value)
|
||||
|
||||
mock_get.assert_called_once_with(
|
||||
context=ctxt,
|
||||
img_signature_certificate_uuid='fake_uuid',
|
||||
@ -635,6 +644,7 @@ class TestTemporaryDir(test.TestCase):
|
||||
class TestUploadVolume(test.TestCase):
|
||||
@ddt.data((mock.sentinel.disk_format, mock.sentinel.disk_format),
|
||||
('ploop', 'parallels'))
|
||||
@mock.patch('eventlet.tpool.Proxy')
|
||||
@mock.patch('cinder.image.image_utils.CONF')
|
||||
@mock.patch('six.moves.builtins.open')
|
||||
@mock.patch('cinder.image.image_utils.qemu_img_info')
|
||||
@ -642,7 +652,7 @@ class TestUploadVolume(test.TestCase):
|
||||
@mock.patch('cinder.image.image_utils.temporary_file')
|
||||
@mock.patch('cinder.image.image_utils.os')
|
||||
def test_diff_format(self, image_format, mock_os, mock_temp, mock_convert,
|
||||
mock_info, mock_open, mock_conf):
|
||||
mock_info, mock_open, mock_conf, mock_proxy):
|
||||
input_format, output_format = image_format
|
||||
ctxt = mock.sentinel.context
|
||||
image_service = mock.Mock()
|
||||
@ -666,10 +676,12 @@ class TestUploadVolume(test.TestCase):
|
||||
mock_info.assert_called_with(temp_file, run_as_root=True)
|
||||
self.assertEqual(2, mock_info.call_count)
|
||||
mock_open.assert_called_once_with(temp_file, 'rb')
|
||||
image_service.update.assert_called_once_with(
|
||||
ctxt, image_meta['id'], {},
|
||||
mock_proxy.assert_called_once_with(
|
||||
mock_open.return_value.__enter__.return_value)
|
||||
image_service.update.assert_called_once_with(
|
||||
ctxt, image_meta['id'], {}, mock_proxy.return_value)
|
||||
|
||||
@mock.patch('eventlet.tpool.Proxy')
|
||||
@mock.patch('cinder.image.image_utils.utils.temporary_chown')
|
||||
@mock.patch('cinder.image.image_utils.CONF')
|
||||
@mock.patch('six.moves.builtins.open')
|
||||
@ -678,7 +690,7 @@ class TestUploadVolume(test.TestCase):
|
||||
@mock.patch('cinder.image.image_utils.temporary_file')
|
||||
@mock.patch('cinder.image.image_utils.os')
|
||||
def test_same_format(self, mock_os, mock_temp, mock_convert, mock_info,
|
||||
mock_open, mock_conf, mock_chown):
|
||||
mock_open, mock_conf, mock_chown, mock_proxy):
|
||||
ctxt = mock.sentinel.context
|
||||
image_service = mock.Mock()
|
||||
image_meta = {'id': 'test_id',
|
||||
@ -695,10 +707,12 @@ class TestUploadVolume(test.TestCase):
|
||||
self.assertFalse(mock_info.called)
|
||||
mock_chown.assert_called_once_with(volume_path)
|
||||
mock_open.assert_called_once_with(volume_path, 'rb')
|
||||
image_service.update.assert_called_once_with(
|
||||
ctxt, image_meta['id'], {},
|
||||
mock_proxy.assert_called_once_with(
|
||||
mock_open.return_value.__enter__.return_value)
|
||||
image_service.update.assert_called_once_with(
|
||||
ctxt, image_meta['id'], {}, mock_proxy.return_value)
|
||||
|
||||
@mock.patch('eventlet.tpool.Proxy')
|
||||
@mock.patch('cinder.image.image_utils.utils.temporary_chown')
|
||||
@mock.patch('cinder.image.image_utils.CONF')
|
||||
@mock.patch('six.moves.builtins.open')
|
||||
@ -707,7 +721,8 @@ class TestUploadVolume(test.TestCase):
|
||||
@mock.patch('cinder.image.image_utils.temporary_file')
|
||||
@mock.patch('cinder.image.image_utils.os')
|
||||
def test_same_format_on_nt(self, mock_os, mock_temp, mock_convert,
|
||||
mock_info, mock_open, mock_conf, mock_chown):
|
||||
mock_info, mock_open, mock_conf, mock_chown,
|
||||
mock_proxy):
|
||||
ctxt = mock.sentinel.context
|
||||
image_service = mock.Mock()
|
||||
image_meta = {'id': 'test_id',
|
||||
@ -723,9 +738,10 @@ class TestUploadVolume(test.TestCase):
|
||||
self.assertFalse(mock_convert.called)
|
||||
self.assertFalse(mock_info.called)
|
||||
mock_open.assert_called_once_with(volume_path, 'rb')
|
||||
image_service.update.assert_called_once_with(
|
||||
ctxt, image_meta['id'], {},
|
||||
mock_proxy.assert_called_once_with(
|
||||
mock_open.return_value.__enter__.return_value)
|
||||
image_service.update.assert_called_once_with(
|
||||
ctxt, image_meta['id'], {}, mock_proxy.return_value)
|
||||
|
||||
@mock.patch('cinder.image.image_utils.CONF')
|
||||
@mock.patch('six.moves.builtins.open')
|
||||
|
Loading…
x
Reference in New Issue
Block a user