Merge "use tooz hashring"

This commit is contained in:
Jenkins 2017-03-04 14:01:16 +00:00 committed by Gerrit Code Review
commit d3caa24565
5 changed files with 16 additions and 75 deletions

View File

@ -20,9 +20,9 @@ from oslo_config import cfg
from oslo_log import log from oslo_log import log
import tenacity import tenacity
import tooz.coordination import tooz.coordination
from tooz import hashring
from ceilometer.i18n import _LE, _LI, _LW from ceilometer.i18n import _LE, _LI, _LW
from ceilometer import utils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
@ -216,10 +216,10 @@ class PartitionCoordinator(object):
raise MemberNotInGroupError(group_id, members, self._my_id) raise MemberNotInGroupError(group_id, members, self._my_id)
LOG.debug('Members of group %s are: %s, Me: %s', LOG.debug('Members of group %s are: %s, Me: %s',
group_id, members, self._my_id) group_id, members, self._my_id)
hr = utils.HashRing(members) hr = hashring.HashRing(members, partitions=100)
iterable = list(iterable) iterable = list(iterable)
filtered = [v for v in iterable filtered = [v for v in iterable
if hr.get_node(six.text_type(v)) == self._my_id] if self._my_id in hr.get_nodes(self.encode_task(v))]
LOG.debug('The universal set: %s, my subset: %s', LOG.debug('The universal set: %s, my subset: %s',
[six.text_type(f) for f in iterable], [six.text_type(f) for f in iterable],
[six.text_type(f) for f in filtered]) [six.text_type(f) for f in filtered])
@ -228,3 +228,8 @@ class PartitionCoordinator(object):
LOG.exception(_LE('Error getting group membership info from ' LOG.exception(_LE('Error getting group membership info from '
'coordination backend.')) 'coordination backend.'))
return [] return []
@staticmethod
def encode_task(value):
"""encode to bytes"""
return six.text_type(value).encode('utf-8')

View File

@ -18,10 +18,10 @@ import logging
import mock import mock
from oslo_config import fixture as fixture_config from oslo_config import fixture as fixture_config
import tooz.coordination import tooz.coordination
from tooz import hashring
from ceilometer import coordination from ceilometer import coordination
from ceilometer.tests import base from ceilometer.tests import base
from ceilometer import utils
class MockToozCoordinator(object): class MockToozCoordinator(object):
@ -204,9 +204,10 @@ class TestPartitioning(base.BaseTestCase):
agents = ['agent_%s' % i for i in range(10)] agents = ['agent_%s' % i for i in range(10)]
expected_resources = [list() for _ in range(len(agents))] expected_resources = [list() for _ in range(len(agents))]
hr = utils.HashRing(agents) hr = hashring.HashRing(agents, partitions=100)
for r in all_resources: for r in all_resources:
key = agents.index(hr.get_node(r)) encode = coordination.PartitionCoordinator.encode_task
key = agents.index(list(hr.get_nodes(encode(r)))[0])
expected_resources[key].append(r) expected_resources[key].append(r)
agents_kwargs = [] agents_kwargs = []
@ -289,9 +290,10 @@ class TestPartitioning(base.BaseTestCase):
agents = ['agent_%s' % i for i in range(2)] agents = ['agent_%s' % i for i in range(2)]
expected_resources = [list() for _ in range(len(agents))] expected_resources = [list() for _ in range(len(agents))]
hr = utils.HashRing(agents) hr = hashring.HashRing(agents, partitions=100)
for r in all_resources: for r in all_resources:
key = agents.index(hr.get_node(r)) encode = coordination.PartitionCoordinator.encode_task
key = agents.index(list(hr.get_nodes(encode(r)))[0])
expected_resources[key].append(r) expected_resources[key].append(r)
agents_kwargs = [] agents_kwargs = []

View File

@ -146,35 +146,3 @@ class TestUtils(base.BaseTestCase):
self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y)) self.assertEqual(utils.hash_of_set(x), utils.hash_of_set(y))
self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z)) self.assertNotEqual(utils.hash_of_set(x), utils.hash_of_set(z))
self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z)) self.assertNotEqual(utils.hash_of_set(y), utils.hash_of_set(z))
def test_hash_ring(self):
num_nodes = 10
num_keys = 1000
nodes = [str(x) for x in range(num_nodes)]
hr = utils.HashRing(nodes)
buckets = [0] * num_nodes
assignments = [-1] * num_keys
for k in range(num_keys):
n = int(hr.get_node(str(k)))
self.assertTrue(0 <= n <= num_nodes)
buckets[n] += 1
assignments[k] = n
# at least something in each bucket
self.assertTrue(all((c > 0 for c in buckets)))
# approximately even distribution
diff = max(buckets) - min(buckets)
self.assertTrue(diff < 0.3 * (num_keys / num_nodes))
# consistency
num_nodes += 1
nodes.append(str(num_nodes + 1))
hr = utils.HashRing(nodes)
for k in range(num_keys):
n = int(hr.get_node(str(k)))
assignments[k] -= n
reassigned = len([c for c in assignments if c != 0])
self.assertTrue(reassigned < num_keys / num_nodes)

View File

@ -18,13 +18,10 @@
"""Utilities and helper functions.""" """Utilities and helper functions."""
import bisect
import calendar import calendar
import copy import copy
import datetime import datetime
import decimal import decimal
import hashlib
import struct
import threading import threading
import time import time
@ -219,37 +216,6 @@ def hash_of_set(s):
return str(hash(frozenset(s))) return str(hash(frozenset(s)))
class HashRing(object):
def __init__(self, nodes, replicas=100):
self._ring = dict()
self._sorted_keys = []
for node in nodes:
for r in six.moves.range(replicas):
hashed_key = self._hash('%s-%s' % (node, r))
self._ring[hashed_key] = node
self._sorted_keys.append(hashed_key)
self._sorted_keys.sort()
@staticmethod
def _hash(key):
return struct.unpack_from('>I',
hashlib.md5(decode_unicode(six
.text_type(key))).digest())[0]
def _get_position_on_ring(self, key):
hashed_key = self._hash(key)
position = bisect.bisect(self._sorted_keys, hashed_key)
return position if position < len(self._sorted_keys) else 0
def get_node(self, key):
if not self._ring:
return None
pos = self._get_position_on_ring(key)
return self._ring[self._sorted_keys[pos]]
def kill_listeners(listeners): def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(), # NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
# which stops new messages, and wait(), which processes remaining # which stops new messages, and wait(), which processes remaining

View File

@ -43,7 +43,7 @@ six>=1.9.0 # MIT
SQLAlchemy<1.1.0,>=1.0.10 # MIT SQLAlchemy<1.1.0,>=1.0.10 # MIT
sqlalchemy-migrate>=0.9.6 # Apache-2.0 sqlalchemy-migrate>=0.9.6 # Apache-2.0
stevedore>=1.9.0 # Apache-2.0 stevedore>=1.9.0 # Apache-2.0
tooz>=1.28.0 # Apache-2.0 tooz>=1.47.0 # Apache-2.0
WebOb>=1.5.0 # MIT WebOb>=1.5.0 # MIT
WSME>=0.8 # MIT WSME>=0.8 # MIT
# NOTE(jd) We do not import it directly, but WSME datetime string parsing # NOTE(jd) We do not import it directly, but WSME datetime string parsing