Change of get_events and get_traits method in MongoDB and Hbase
This redesigns get_events method to generator style, with some additional changes in unit tests and api.v2 get_traits method is also improved in this patch, additional sorting was removed from get_traits method. Closes-bug: #1339626 Change-Id: Ie65cc6f6c85eab53847176273b27de693bd2052f
This commit is contained in:
parent
9164d6f60b
commit
c1f4eaf312
@ -2347,7 +2347,8 @@ class EventsController(rest.RestController):
|
|||||||
:param message_id: Message ID of the Event to be returned
|
:param message_id: Message ID of the Event to be returned
|
||||||
"""
|
"""
|
||||||
event_filter = storage.EventFilter(message_id=message_id)
|
event_filter = storage.EventFilter(message_id=message_id)
|
||||||
events = pecan.request.storage_conn.get_events(event_filter)
|
events = [event for event
|
||||||
|
in pecan.request.storage_conn.get_events(event_filter)]
|
||||||
if not events:
|
if not events:
|
||||||
raise EntityNotFound(_("Event"), message_id)
|
raise EntityNotFound(_("Event"), message_id)
|
||||||
|
|
||||||
|
@ -20,7 +20,7 @@ import bson.json_util
|
|||||||
|
|
||||||
from ceilometer import utils
|
from ceilometer import utils
|
||||||
|
|
||||||
DTYPE_NAMES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
|
EVENT_TRAIT_TYPES = {'none': 0, 'string': 1, 'integer': 2, 'float': 3,
|
||||||
'datetime': 4}
|
'datetime': 4}
|
||||||
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
|
OP_SIGN = {'eq': '=', 'lt': '<', 'le': '<=', 'ne': '!=', 'gt': '>',
|
||||||
'ge': '>='}
|
'ge': '>='}
|
||||||
@ -152,7 +152,7 @@ def make_query(metaquery=None, trait_query=None, **kwargs):
|
|||||||
if v is not None:
|
if v is not None:
|
||||||
res_q = ("SingleColumnValueFilter "
|
res_q = ("SingleColumnValueFilter "
|
||||||
"('f', '%s+%d', %s, 'binary:%s', true, true)" %
|
"('f', '%s+%d', %s, 'binary:%s', true, true)" %
|
||||||
(trait_name, DTYPE_NAMES[k], OP_SIGN[op],
|
(trait_name, EVENT_TRAIT_TYPES[k], OP_SIGN[op],
|
||||||
dump(v)))
|
dump(v)))
|
||||||
return res_q
|
return res_q
|
||||||
|
|
||||||
|
@ -633,7 +633,7 @@ class Connection(base.Connection, alarm_base.Connection):
|
|||||||
return problem_events
|
return problem_events
|
||||||
|
|
||||||
def get_events(self, event_filter):
|
def get_events(self, event_filter):
|
||||||
"""Return an iterable of models.Event objects.
|
"""Return an iter of models.Event objects.
|
||||||
|
|
||||||
:param event_filter: storage.EventFilter object, consists of filters
|
:param event_filter: storage.EventFilter object, consists of filters
|
||||||
for events that are stored in database.
|
for events that are stored in database.
|
||||||
@ -645,7 +645,6 @@ class Connection(base.Connection, alarm_base.Connection):
|
|||||||
|
|
||||||
gen = events_table.scan(filter=q, row_start=start, row_stop=stop)
|
gen = events_table.scan(filter=q, row_start=start, row_stop=stop)
|
||||||
|
|
||||||
events = []
|
|
||||||
for event_id, data in gen:
|
for event_id, data in gen:
|
||||||
traits = []
|
traits = []
|
||||||
events_dict = hbase_utils.deserialize_entry(data)[0]
|
events_dict = hbase_utils.deserialize_entry(data)[0]
|
||||||
@ -658,14 +657,13 @@ class Connection(base.Connection, alarm_base.Connection):
|
|||||||
value=value))
|
value=value))
|
||||||
ts, mess = event_id.split('_', 1)
|
ts, mess = event_id.split('_', 1)
|
||||||
|
|
||||||
events.append(models.Event(
|
yield models.Event(
|
||||||
message_id=mess,
|
message_id=mess,
|
||||||
event_type=events_dict['event_type'],
|
event_type=events_dict['event_type'],
|
||||||
generated=events_dict['timestamp'],
|
generated=events_dict['timestamp'],
|
||||||
traits=sorted(traits, key=(lambda item:
|
traits=sorted(traits,
|
||||||
getattr(item, 'dtype')))
|
key=operator.attrgetter('dtype'))
|
||||||
))
|
)
|
||||||
return events
|
|
||||||
|
|
||||||
def get_event_types(self):
|
def get_event_types(self):
|
||||||
"""Return all event types as an iterable of strings."""
|
"""Return all event types as an iterable of strings."""
|
||||||
@ -691,7 +689,7 @@ class Connection(base.Connection, alarm_base.Connection):
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
q = hbase_utils.make_query(event_type=event_type)
|
q = hbase_utils.make_query(event_type=event_type)
|
||||||
trait_types = set()
|
trait_names = set()
|
||||||
with self.conn_pool.connection() as conn:
|
with self.conn_pool.connection() as conn:
|
||||||
events_table = conn.table(self.EVENT_TABLE)
|
events_table = conn.table(self.EVENT_TABLE)
|
||||||
gen = events_table.scan(filter=q)
|
gen = events_table.scan(filter=q)
|
||||||
@ -700,17 +698,17 @@ class Connection(base.Connection, alarm_base.Connection):
|
|||||||
for key, value in events_dict.items():
|
for key, value in events_dict.items():
|
||||||
if (not key.startswith('event_type') and
|
if (not key.startswith('event_type') and
|
||||||
not key.startswith('timestamp')):
|
not key.startswith('timestamp')):
|
||||||
name, tt_number = key.rsplit('+', 1)
|
trait_name, trait_type = key.rsplit('+', 1)
|
||||||
if name not in trait_types:
|
if trait_name not in trait_names:
|
||||||
# Here we check that our method return only unique
|
# Here we check that our method return only unique
|
||||||
# trait types, for ex. if it is found the same trait
|
# trait types, for ex. if it is found the same trait
|
||||||
# types in different events with equal event_type,
|
# types in different events with equal event_type,
|
||||||
# method will return only one trait type. It is
|
# method will return only one trait type. It is
|
||||||
# proposed that certain trait name could have only one
|
# proposed that certain trait name could have only one
|
||||||
# trait type.
|
# trait type.
|
||||||
trait_types.add(name)
|
trait_names.add(trait_name)
|
||||||
data_type = models.Trait.type_names[int(tt_number)]
|
data_type = models.Trait.type_names[int(trait_type)]
|
||||||
yield {'name': name, 'data_type': data_type}
|
yield {'name': trait_name, 'data_type': data_type}
|
||||||
|
|
||||||
def get_traits(self, event_type, trait_type=None):
|
def get_traits(self, event_type, trait_type=None):
|
||||||
"""Return all trait instances associated with an event_type.
|
"""Return all trait instances associated with an event_type.
|
||||||
@ -721,7 +719,6 @@ class Connection(base.Connection, alarm_base.Connection):
|
|||||||
"""
|
"""
|
||||||
q = hbase_utils.make_query(event_type=event_type,
|
q = hbase_utils.make_query(event_type=event_type,
|
||||||
trait_type=trait_type)
|
trait_type=trait_type)
|
||||||
traits = []
|
|
||||||
with self.conn_pool.connection() as conn:
|
with self.conn_pool.connection() as conn:
|
||||||
events_table = conn.table(self.EVENT_TABLE)
|
events_table = conn.table(self.EVENT_TABLE)
|
||||||
gen = events_table.scan(filter=q)
|
gen = events_table.scan(filter=q)
|
||||||
@ -730,8 +727,6 @@ class Connection(base.Connection, alarm_base.Connection):
|
|||||||
for key, value in events_dict.items():
|
for key, value in events_dict.items():
|
||||||
if (not key.startswith('event_type') and
|
if (not key.startswith('event_type') and
|
||||||
not key.startswith('timestamp')):
|
not key.startswith('timestamp')):
|
||||||
name, tt_number = key.rsplit('+', 1)
|
trait_name, trait_type = key.rsplit('+', 1)
|
||||||
traits.append(models.Trait(name=name,
|
yield models.Trait(name=trait_name,
|
||||||
dtype=int(tt_number), value=value))
|
dtype=int(trait_type), value=value)
|
||||||
for trait in sorted(traits, key=operator.attrgetter('dtype')):
|
|
||||||
yield trait
|
|
||||||
|
@ -944,7 +944,8 @@ class Connection(base.Connection):
|
|||||||
try:
|
try:
|
||||||
with session.begin():
|
with session.begin():
|
||||||
event = self._record_event(session, event_model)
|
event = self._record_event(session, event_model)
|
||||||
except dbexc.DBDuplicateEntry:
|
except dbexc.DBDuplicateEntry as e:
|
||||||
|
LOG.exception(_("Failed to record duplicated event: %s") % e)
|
||||||
problem_events.append((api_models.Event.DUPLICATE,
|
problem_events.append((api_models.Event.DUPLICATE,
|
||||||
event_model))
|
event_model))
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
@ -17,10 +17,6 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
"""Common functions for MongoDB and DB2 backends
|
"""Common functions for MongoDB and DB2 backends
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
import operator
|
|
||||||
|
|
||||||
import pymongo
|
import pymongo
|
||||||
|
|
||||||
from ceilometer.openstack.common.gettextutils import _
|
from ceilometer.openstack.common.gettextutils import _
|
||||||
@ -139,7 +135,8 @@ class Connection(base.Connection):
|
|||||||
'event_type': event_model.event_type,
|
'event_type': event_model.event_type,
|
||||||
'timestamp': event_model.generated,
|
'timestamp': event_model.generated,
|
||||||
'traits': traits})
|
'traits': traits})
|
||||||
except pymongo.errors.DuplicateKeyError:
|
except pymongo.errors.DuplicateKeyError as ex:
|
||||||
|
LOG.exception(_("Failed to record duplicated event: %s") % ex)
|
||||||
problem_events.append((models.Event.DUPLICATE,
|
problem_events.append((models.Event.DUPLICATE,
|
||||||
event_model))
|
event_model))
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
@ -149,24 +146,22 @@ class Connection(base.Connection):
|
|||||||
return problem_events
|
return problem_events
|
||||||
|
|
||||||
def get_events(self, event_filter):
|
def get_events(self, event_filter):
|
||||||
"""Return a list of models.Event objects.
|
"""Return an iter of models.Event objects.
|
||||||
|
|
||||||
:param event_filter: storage.EventFilter object, consists of filters
|
:param event_filter: storage.EventFilter object, consists of filters
|
||||||
for events that are stored in database.
|
for events that are stored in database.
|
||||||
"""
|
"""
|
||||||
q = pymongo_utils.make_events_query_from_filter(event_filter)
|
q = pymongo_utils.make_events_query_from_filter(event_filter)
|
||||||
res_events = []
|
|
||||||
for event in self.db.event.find(q):
|
for event in self.db.event.find(q):
|
||||||
traits = []
|
traits = []
|
||||||
for trait in event['traits']:
|
for trait in event['traits']:
|
||||||
traits.append(models.Trait(name=trait['trait_name'],
|
traits.append(models.Trait(name=trait['trait_name'],
|
||||||
dtype=int(trait['trait_type']),
|
dtype=int(trait['trait_type']),
|
||||||
value=trait['trait_value']))
|
value=trait['trait_value']))
|
||||||
res_events.append(models.Event(message_id=event['_id'],
|
yield models.Event(message_id=event['_id'],
|
||||||
event_type=event['event_type'],
|
event_type=event['event_type'],
|
||||||
generated=event['timestamp'],
|
generated=event['timestamp'],
|
||||||
traits=traits))
|
traits=traits)
|
||||||
return res_events
|
|
||||||
|
|
||||||
def get_event_types(self):
|
def get_event_types(self):
|
||||||
"""Return all event types as an iter of strings."""
|
"""Return all event types as an iter of strings."""
|
||||||
@ -219,14 +214,11 @@ class Connection(base.Connection):
|
|||||||
{'traits': {'$elemMatch':
|
{'traits': {'$elemMatch':
|
||||||
{'trait_name': trait_name}}
|
{'trait_name': trait_name}}
|
||||||
})
|
})
|
||||||
traits = []
|
|
||||||
for event in events:
|
for event in events:
|
||||||
for trait in event['traits']:
|
for trait in event['traits']:
|
||||||
traits.append(models.Trait(name=trait['trait_name'],
|
yield models.Trait(name=trait['trait_name'],
|
||||||
dtype=trait['trait_type'],
|
dtype=trait['trait_type'],
|
||||||
value=trait['trait_value']))
|
value=trait['trait_value'])
|
||||||
for trait in sorted(traits, key=operator.attrgetter('dtype')):
|
|
||||||
yield trait
|
|
||||||
|
|
||||||
def query_samples(self, filter_expr=None, orderby=None, limit=None):
|
def query_samples(self, filter_expr=None, orderby=None, limit=None):
|
||||||
if limit == 0:
|
if limit == 0:
|
||||||
|
@ -20,6 +20,7 @@
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import datetime
|
import datetime
|
||||||
|
import operator
|
||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
@ -2754,7 +2755,7 @@ class GetEventTest(EventTestBase):
|
|||||||
|
|
||||||
def test_generated_is_datetime(self):
|
def test_generated_is_datetime(self):
|
||||||
event_filter = storage.EventFilter(self.start, self.end)
|
event_filter = storage.EventFilter(self.start, self.end)
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(6, len(events))
|
self.assertEqual(6, len(events))
|
||||||
for i, event in enumerate(events):
|
for i, event in enumerate(events):
|
||||||
self.assertIsInstance(event.generated, datetime.datetime)
|
self.assertIsInstance(event.generated, datetime.datetime)
|
||||||
@ -2768,7 +2769,7 @@ class GetEventTest(EventTestBase):
|
|||||||
|
|
||||||
def test_simple_get(self):
|
def test_simple_get(self):
|
||||||
event_filter = storage.EventFilter(self.start, self.end)
|
event_filter = storage.EventFilter(self.start, self.end)
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(6, len(events))
|
self.assertEqual(6, len(events))
|
||||||
start_time = None
|
start_time = None
|
||||||
for i, type in enumerate(['Foo', 'Bar', 'Zoo']):
|
for i, type in enumerate(['Foo', 'Bar', 'Zoo']):
|
||||||
@ -2797,7 +2798,7 @@ class GetEventTest(EventTestBase):
|
|||||||
}
|
}
|
||||||
|
|
||||||
event_filter = storage.EventFilter(self.start, self.end, "Bar")
|
event_filter = storage.EventFilter(self.start, self.end, "Bar")
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(2, len(events))
|
self.assertEqual(2, len(events))
|
||||||
self.assertEqual(events[0].event_type, "Bar")
|
self.assertEqual(events[0].event_type, "Bar")
|
||||||
self.assertEqual(events[1].event_type, "Bar")
|
self.assertEqual(events[1].event_type, "Bar")
|
||||||
@ -2819,7 +2820,7 @@ class GetEventTest(EventTestBase):
|
|||||||
trait_filters = [{'key': 'trait_B', 'integer': 101}]
|
trait_filters = [{'key': 'trait_B', 'integer': 101}]
|
||||||
event_filter = storage.EventFilter(self.start, self.end,
|
event_filter = storage.EventFilter(self.start, self.end,
|
||||||
traits_filter=trait_filters)
|
traits_filter=trait_filters)
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(1, len(events))
|
self.assertEqual(1, len(events))
|
||||||
self.assertEqual(events[0].event_type, "Bar")
|
self.assertEqual(events[0].event_type, "Bar")
|
||||||
self.assertEqual(4, len(events[0].traits))
|
self.assertEqual(4, len(events[0].traits))
|
||||||
@ -2829,7 +2830,7 @@ class GetEventTest(EventTestBase):
|
|||||||
{'key': 'trait_A', 'string': 'my_Foo_text'}]
|
{'key': 'trait_A', 'string': 'my_Foo_text'}]
|
||||||
event_filter = storage.EventFilter(self.start, self.end,
|
event_filter = storage.EventFilter(self.start, self.end,
|
||||||
traits_filter=trait_filters)
|
traits_filter=trait_filters)
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(1, len(events))
|
self.assertEqual(1, len(events))
|
||||||
self.assertEqual(events[0].event_type, "Foo")
|
self.assertEqual(events[0].event_type, "Foo")
|
||||||
self.assertEqual(4, len(events[0].traits))
|
self.assertEqual(4, len(events[0].traits))
|
||||||
@ -2839,7 +2840,7 @@ class GetEventTest(EventTestBase):
|
|||||||
{'key': 'trait_A', 'string': 'my_Zoo_text'}]
|
{'key': 'trait_A', 'string': 'my_Zoo_text'}]
|
||||||
event_filter = storage.EventFilter(self.start, self.end,
|
event_filter = storage.EventFilter(self.start, self.end,
|
||||||
traits_filter=trait_filters)
|
traits_filter=trait_filters)
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(0, len(events))
|
self.assertEqual(0, len(events))
|
||||||
|
|
||||||
def test_get_event_types(self):
|
def test_get_event_types(self):
|
||||||
@ -2885,9 +2886,8 @@ class GetEventTest(EventTestBase):
|
|||||||
|
|
||||||
def test_get_all_traits(self):
|
def test_get_all_traits(self):
|
||||||
traits = self.conn.get_traits("Foo")
|
traits = self.conn.get_traits("Foo")
|
||||||
traits = [t for t in traits]
|
traits = sorted([t for t in traits], key=operator.attrgetter('dtype'))
|
||||||
self.assertEqual(8, len(traits))
|
self.assertEqual(8, len(traits))
|
||||||
|
|
||||||
trait = traits[0]
|
trait = traits[0]
|
||||||
self.assertEqual("trait_A", trait.name)
|
self.assertEqual("trait_A", trait.name)
|
||||||
self.assertEqual(models.Trait.TEXT_TYPE, trait.dtype)
|
self.assertEqual(models.Trait.TEXT_TYPE, trait.dtype)
|
||||||
@ -2896,7 +2896,7 @@ class GetEventTest(EventTestBase):
|
|||||||
new_events = [models.Event("id_notraits", "NoTraits", self.start, [])]
|
new_events = [models.Event("id_notraits", "NoTraits", self.start, [])]
|
||||||
bad_events = self.conn.record_events(new_events)
|
bad_events = self.conn.record_events(new_events)
|
||||||
event_filter = storage.EventFilter(self.start, self.end, "NoTraits")
|
event_filter = storage.EventFilter(self.start, self.end, "NoTraits")
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(0, len(bad_events))
|
self.assertEqual(0, len(bad_events))
|
||||||
self.assertEqual(1, len(events))
|
self.assertEqual(1, len(events))
|
||||||
self.assertEqual(events[0].message_id, "id_notraits")
|
self.assertEqual(events[0].message_id, "id_notraits")
|
||||||
@ -2905,7 +2905,7 @@ class GetEventTest(EventTestBase):
|
|||||||
|
|
||||||
def test_simple_get_no_filters(self):
|
def test_simple_get_no_filters(self):
|
||||||
event_filter = storage.EventFilter(None, None, None)
|
event_filter = storage.EventFilter(None, None, None)
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(6, len(events))
|
self.assertEqual(6, len(events))
|
||||||
|
|
||||||
def test_get_by_message_id(self):
|
def test_get_by_message_id(self):
|
||||||
@ -2916,7 +2916,7 @@ class GetEventTest(EventTestBase):
|
|||||||
|
|
||||||
bad_events = self.conn.record_events(new_events)
|
bad_events = self.conn.record_events(new_events)
|
||||||
event_filter = storage.EventFilter(message_id="id_testid")
|
event_filter = storage.EventFilter(message_id="id_testid")
|
||||||
events = self.conn.get_events(event_filter)
|
events = [event for event in self.conn.get_events(event_filter)]
|
||||||
self.assertEqual(0, len(bad_events))
|
self.assertEqual(0, len(bad_events))
|
||||||
self.assertEqual(1, len(events))
|
self.assertEqual(1, len(events))
|
||||||
event = events[0]
|
event = events[0]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user