Merge "BSN: Optimistic locking strategy for consistency"

This commit is contained in:
Jenkins 2014-11-21 00:13:29 +00:00 committed by Gerrit Code Review
commit c8ac295620
9 changed files with 318 additions and 23 deletions

View File

@ -12,13 +12,42 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
import re
import string
import time
from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo.db.sqlalchemy import session
import sqlalchemy as sa
from neutron.db import api as db
from neutron.db import model_base
from neutron.openstack.common.gettextutils import _LI, _LW
from neutron.openstack.common import log as logging
LOG = logging.getLogger(__name__)
# Maximum time in seconds to wait for a single record lock to be released
# NOTE: The total time waiting may exceed this if there are multiple servers
# waiting for the same lock
MAX_LOCK_WAIT_TIME = 15
def setup_db():
'''Helper to register models for unit tests'''
if HashHandler._FACADE is None:
HashHandler._FACADE = session.EngineFacade.from_config(
cfg.CONF, sqlite_fk=True)
ConsistencyHash.metadata.create_all(
HashHandler._FACADE.get_engine())
def clear_db():
'''Helper to unregister models and clear engine in unit tests'''
if not HashHandler._FACADE:
return
ConsistencyHash.metadata.drop_all(HashHandler._FACADE.get_engine())
HashHandler._FACADE = None
class ConsistencyHash(model_base.BASEV2):
@ -38,31 +67,157 @@ class HashHandler(object):
'''
A wrapper object to keep track of the session between the read
and the update operations.
'''
def __init__(self, context=None, hash_id='1'):
self.hash_id = hash_id
self.session = db.get_session() if not context else context.session
self.hash_db_obj = None
def read_for_update(self):
# REVISIT(kevinbenton): locking here with the DB is prone to deadlocks
# in various multi-REST-call scenarios (router intfs, flips, etc).
# Since it doesn't work in Galera deployments anyway, another sync
# mechanism will have to be introduced to prevent inefficient double
# syncs in HA deployments.
This class needs an SQL engine completely independent of the main
neutron connection so rollbacks from consistency hash operations don't
affect the parent sessions.
'''
_FACADE = None
def __init__(self, hash_id='1'):
if HashHandler._FACADE is None:
HashHandler._FACADE = session.EngineFacade.from_config(
cfg.CONF, sqlite_fk=True)
self.hash_id = hash_id
self.session = HashHandler._FACADE.get_session(autocommit=True,
expire_on_commit=False)
self.random_lock_id = ''.join(random.choice(string.ascii_uppercase
+ string.digits)
for _ in range(10))
self.lock_marker = 'LOCKED_BY[%s]' % self.random_lock_id
def _get_current_record(self):
with self.session.begin(subtransactions=True):
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if not res:
return ''
self.hash_db_obj = res
return res.hash
if res:
self.session.refresh(res) # make sure latest is loaded from db
return res
def _insert_empty_hash_with_lock(self):
# try to insert a new hash, return False on conflict
try:
with self.session.begin(subtransactions=True):
res = ConsistencyHash(hash_id=self.hash_id,
hash=self.lock_marker)
self.session.add(res)
return True
except db_exc.DBDuplicateEntry:
# another server created a new record at the same time
return False
def _optimistic_update_hash_record(self, old_record, new_hash):
# Optimistic update strategy. Returns True if successful, else False.
query = sa.update(ConsistencyHash.__table__).values(hash=new_hash)
query = query.where(ConsistencyHash.hash_id == old_record.hash_id)
query = query.where(ConsistencyHash.hash == old_record.hash)
with self._FACADE.get_engine().begin() as conn:
result = conn.execute(query)
# We need to check update row count in case another server is
# doing this at the same time. Only one will succeed, the other will
# not update any rows.
return result.rowcount != 0
def _get_lock_owner(self, record):
matches = re.findall("^LOCKED_BY\[(\w+)\]", record)
if not matches:
return None
return matches[0]
def read_for_update(self):
# An optimistic locking strategy with a timeout to avoid using a
# consistency hash while another server is using it. This will
# not return until a lock is acquired either normally or by stealing
# it after an individual ID holds it for greater than
# MAX_LOCK_WAIT_TIME.
lock_wait_start = None
last_lock_owner = None
while True:
res = self._get_current_record()
if not res:
# no current entry. try to insert to grab lock
if not self._insert_empty_hash_with_lock():
# A failed insert after missing current record means
# a concurrent insert occured. Start process over to
# find the new record.
LOG.debug("Concurrent record inserted. Retrying.")
time.sleep(0.25)
continue
# The empty hash was successfully inserted with our lock
return ''
current_lock_owner = self._get_lock_owner(res.hash)
if not current_lock_owner:
# no current lock. attempt to lock
new = self.lock_marker + res.hash
if not self._optimistic_update_hash_record(res, new):
# someone else beat us to it. restart process to wait
# for new lock ID to be removed
LOG.debug(
"Failed to acquire lock. Restarting lock wait. "
"Previous hash: %(prev)s. Attempted update: %(new)s" %
{'prev': res.hash, 'new': new})
time.sleep(0.25)
continue
# successfully got the lock
return res.hash
LOG.debug("This request's lock ID is %(this)s. "
"DB lock held by %(that)s" %
{'this': self.random_lock_id,
'that': current_lock_owner})
if current_lock_owner == self.random_lock_id:
# no change needed, we already have the table lock due to
# previous read_for_update call.
# return hash with lock tag stripped off for use in a header
return res.hash.replace(self.lock_marker, '')
if current_lock_owner != last_lock_owner:
# The owner changed since the last iteration, but it
# wasn't to us. Reset the counter. Log if not
# first iteration.
if lock_wait_start:
LOG.debug("Lock owner changed from %(old)s to %(new)s "
"while waiting to acquire it.",
{'old': last_lock_owner,
'new': current_lock_owner})
lock_wait_start = time.time()
last_lock_owner = current_lock_owner
if time.time() - lock_wait_start > MAX_LOCK_WAIT_TIME:
# the lock has been held too long, steal it
LOG.warning(_LW("Gave up waiting for consistency DB "
"lock, trying to take it. "
"Current hash is: %s"), res.hash)
new_db_value = res.hash.replace(current_lock_owner,
self.random_lock_id)
if self._optimistic_update_hash_record(res, new_db_value):
return res.hash.replace(new_db_value, '')
LOG.info(_LI("Failed to take lock. Another process updated "
"the DB first."))
def clear_lock(self):
LOG.debug("Clearing hash record lock of id %s" % self.random_lock_id)
with self.session.begin(subtransactions=True):
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if not res:
LOG.warning(_LW("Hash record already gone, no lock to clear."))
return
if not res.hash.startswith(self.lock_marker):
# if these are frequent the server is too slow
LOG.warning(_LW("Another server already removed the lock. %s"),
res.hash)
return
res.hash = res.hash.replace(self.lock_marker, '')
def put_hash(self, hash):
hash = hash or ''
with self.session.begin(subtransactions=True):
if self.hash_db_obj is not None:
self.hash_db_obj.hash = hash
res = (self.session.query(ConsistencyHash).
filter_by(hash_id=self.hash_id).first())
if res:
res.hash = hash
else:
conhash = ConsistencyHash(hash_id=self.hash_id, hash=hash)
self.session.merge(conhash)

View File

@ -40,7 +40,6 @@ from oslo.config import cfg
from oslo.serialization import jsonutils
from neutron.common import exceptions
from neutron.common import utils
from neutron.openstack.common import excutils
from neutron.openstack.common import log as logging
from neutron.plugins.bigswitch.db import consistency_db as cdb
@ -191,11 +190,17 @@ class ServerProxy(object):
# don't clear hash from DB if a hash header wasn't present
if hash_value is not None:
hash_handler.put_hash(hash_value)
else:
hash_handler.clear_lock()
try:
respdata = jsonutils.loads(respstr)
except ValueError:
# response was not JSON, ignore the exception
pass
else:
# release lock so others don't have to wait for timeout
hash_handler.clear_lock()
ret = (response.status, response.reason, respstr, respdata)
except httplib.HTTPException:
# If we were using a cached connection, try again with a new one.
@ -419,7 +424,6 @@ class ServerPool(object):
"""
return resp[0] in SUCCESS_CODES
@utils.synchronized('bsn-rest-call')
def rest_call(self, action, resource, data, headers, ignore_codes,
timeout=False):
context = self.get_context_ref()
@ -430,7 +434,7 @@ class ServerPool(object):
# backend controller
cdict.pop('auth_token', None)
headers[REQ_CONTEXT_HEADER] = jsonutils.dumps(cdict)
hash_handler = cdb.HashHandler(context=context)
hash_handler = cdb.HashHandler()
good_first = sorted(self.servers, key=lambda x: x.failed)
first_response = None
for active_server in good_first:

View File

@ -29,4 +29,5 @@ class BigSwitchDhcpAgentNotifierTestCase(
self.setup_config_files()
self.setup_patches()
super(BigSwitchDhcpAgentNotifierTestCase, self).setUp()
self.setup_db()
self.startHttpPatch()

View File

@ -21,6 +21,7 @@ from oslo.config import cfg
import neutron.common.test_lib as test_lib
from neutron.plugins.bigswitch import config
from neutron.plugins.bigswitch.db import consistency_db
from neutron.tests.unit.bigswitch import fake_server
@ -44,6 +45,7 @@ class BigSwitchTestBase(object):
test_lib.test_config['config_files'] = [os.path.join(etc_path,
'restproxy.ini.test')]
self.addCleanup(cfg.CONF.reset)
self.addCleanup(consistency_db.clear_db)
config.register_config()
# Only try SSL on SSL tests
cfg.CONF.set_override('server_ssl', False, 'RESTPROXY')
@ -68,3 +70,7 @@ class BigSwitchTestBase(object):
self.httpPatch = mock.patch(HTTPCON,
new=fake_server.HTTPConnectionMock)
self.httpPatch.start()
def setup_db(self):
# setup the db engine and models for the consistency db
consistency_db.setup_db()

View File

@ -48,6 +48,7 @@ class BigSwitchProxyPluginV2TestCase(test_base.BigSwitchTestBase,
service_plugins = {'L3_ROUTER_NAT': self._l3_plugin_name}
super(BigSwitchProxyPluginV2TestCase,
self).setUp(self._plugin_name, service_plugins=service_plugins)
self.setup_db()
self.port_create_status = 'BUILD'
self.startHttpPatch()

View File

@ -61,6 +61,7 @@ class DHCPOptsTestCase(test_base.BigSwitchTestBase,
self.setup_config_files()
super(test_extradhcp.ExtraDhcpOptDBTestCase,
self).setUp(plugin=self._plugin_name)
self.setup_db()
self.startHttpPatch()
@ -78,6 +79,7 @@ class RouterDBTestBase(test_base.BigSwitchTestBase,
super(RouterDBTestBase, self).setUp(plugin=self._plugin_name,
ext_mgr=ext_mgr,
service_plugins=service_plugins)
self.setup_db()
cfg.CONF.set_default('allow_overlapping_ips', False)
self.plugin_obj = manager.NeutronManager.get_service_plugins().get(
'L3_ROUTER_NAT')

View File

@ -30,6 +30,7 @@ class RestProxySecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase,
self.setup_patches()
self._attribute_map_bk_ = {}
super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str)
self.setup_db()
plugin = manager.NeutronManager.get_plugin()
self.notifier = plugin.notifier
self.rpc = plugin.endpoints[0]

View File

@ -18,12 +18,13 @@ import ssl
import mock
from oslo.config import cfg
from oslo.db import exception as db_exc
from oslo.serialization import jsonutils
from neutron import context
from neutron import manager
from neutron.openstack.common import importutils
from neutron.plugins.bigswitch.db import consistency_db as cdb
from neutron.plugins.bigswitch.db import consistency_db
from neutron.plugins.bigswitch import servermanager
from neutron.tests.unit.bigswitch import test_restproxy_plugin as test_rp
@ -414,7 +415,7 @@ class ServerManagerTests(test_rp.BigSwitchProxyPluginV2TestCase):
def test_delete_failure_sets_bad_hash(self):
pl = manager.NeutronManager.get_plugin()
hash_handler = cdb.HashHandler()
hash_handler = consistency_db.HashHandler()
with mock.patch(
SERVERMANAGER + '.ServerProxy.rest_call',
return_value=(httplib.INTERNAL_SERVER_ERROR, 0, 0, 0)
@ -541,3 +542,126 @@ class TestSockets(test_rp.BigSwitchProxyPluginV2TestCase):
con = self.sm.HTTPSConnectionWithValidation('127.0.0.1', 0, timeout=1)
# if httpcon was created, a connect attempt should raise a socket error
self.assertRaises(socket.error, con.connect)
class HashLockingTests(test_rp.BigSwitchProxyPluginV2TestCase):
def _get_hash_from_handler_db(self, handler):
with handler.session.begin(subtransactions=True):
res = (handler.session.query(consistency_db.ConsistencyHash).
filter_by(hash_id=handler.hash_id).first())
return res.hash
def test_hash_handle_lock_no_initial_record(self):
handler = consistency_db.HashHandler()
h1 = handler.read_for_update()
# return to caller should be empty even with lock in DB
self.assertFalse(h1)
# db should have a lock marker
self.assertEqual(handler.lock_marker,
self._get_hash_from_handler_db(handler))
# an entry should clear the lock
handler.put_hash('DIGEST')
self.assertEqual('DIGEST', self._get_hash_from_handler_db(handler))
def test_hash_handle_lock_existing_record(self):
handler = consistency_db.HashHandler()
handler.put_hash('DIGEST') # set initial hash
h1 = handler.read_for_update()
self.assertEqual('DIGEST', h1)
self.assertEqual(handler.lock_marker + 'DIGEST',
self._get_hash_from_handler_db(handler))
# make sure update works
handler.put_hash('DIGEST2')
self.assertEqual('DIGEST2', self._get_hash_from_handler_db(handler))
def test_db_duplicate_on_insert(self):
handler = consistency_db.HashHandler()
with mock.patch.object(
handler.session, 'add', side_effect=[db_exc.DBDuplicateEntry, '']
) as add_mock:
handler.read_for_update()
# duplicate insert failure should result in retry
self.assertEqual(2, add_mock.call_count)
def test_update_hit_no_records(self):
handler = consistency_db.HashHandler()
# set initial hash so update will be required
handler.put_hash('DIGEST')
with mock.patch.object(handler._FACADE, 'get_engine') as ge:
conn = ge.return_value.begin.return_value.__enter__.return_value
firstresult = mock.Mock()
# a rowcount of 0 simulates the effect of another db client
# updating the same record the handler was trying to update
firstresult.rowcount = 0
secondresult = mock.Mock()
secondresult.rowcount = 1
conn.execute.side_effect = [firstresult, secondresult]
handler.read_for_update()
# update should have been called again after the failure
self.assertEqual(2, conn.execute.call_count)
def test_handler_already_holding_lock(self):
handler = consistency_db.HashHandler()
handler.read_for_update() # lock the table
with mock.patch.object(handler._FACADE, 'get_engine') as ge:
handler.read_for_update()
# get engine should not have been called because no update
# should have been made
self.assertFalse(ge.called)
def test_clear_lock(self):
handler = consistency_db.HashHandler()
handler.put_hash('SOMEHASH')
handler.read_for_update() # lock the table
self.assertEqual(handler.lock_marker + 'SOMEHASH',
self._get_hash_from_handler_db(handler))
handler.clear_lock()
self.assertEqual('SOMEHASH',
self._get_hash_from_handler_db(handler))
def test_clear_lock_skip_after_steal(self):
handler1 = consistency_db.HashHandler()
handler1.read_for_update() # lock the table
handler2 = consistency_db.HashHandler()
with mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME', new=0):
handler2.read_for_update()
before = self._get_hash_from_handler_db(handler1)
# handler1 should not clear handler2's lock
handler1.clear_lock()
self.assertEqual(before, self._get_hash_from_handler_db(handler1))
def test_take_lock_from_other(self):
handler1 = consistency_db.HashHandler()
handler1.read_for_update() # lock the table
handler2 = consistency_db.HashHandler()
with mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME') as mlock:
# make handler2 wait for only one iteration
mlock.__lt__.side_effect = [False, True]
handler2.read_for_update()
# once MAX LOCK exceeded, comparisons should stop due to lock steal
self.assertEqual(2, mlock.__lt__.call_count)
dbentry = self._get_hash_from_handler_db(handler1)
# handler2 should have the lock
self.assertIn(handler2.lock_marker, dbentry)
self.assertNotIn(handler1.lock_marker, dbentry)
# lock protection only blocks read_for_update, anyone can change
handler1.put_hash('H1')
def test_failure_to_steal_lock(self):
handler1 = consistency_db.HashHandler()
handler1.read_for_update() # lock the table
handler2 = consistency_db.HashHandler()
with contextlib.nested(
mock.patch.object(consistency_db, 'MAX_LOCK_WAIT_TIME'),
mock.patch.object(handler2, '_optimistic_update_hash_record',
side_effect=[False, True])
) as (mlock, oplock):
# handler2 will go through 2 iterations since the lock will fail on
# the first attempt
mlock.__lt__.side_effect = [False, True, False, True]
handler2.read_for_update()
self.assertEqual(4, mlock.__lt__.call_count)
self.assertEqual(2, oplock.call_count)

View File

@ -82,6 +82,7 @@ class test_ssl_certificate_base(test_plugin.NeutronDbPluginV2TestCase,
def setUp(self):
super(test_ssl_certificate_base, self).setUp(self.plugin_str)
self.setup_db()
class TestSslSticky(test_ssl_certificate_base):