Add cumulative and gauge support to the aggregator transformer

Currently the aggregator transformer simply sums the volumes of all aggregated samples, which is only correct for delta meters. This patch improves it so that it returns only the latest volume for cumulative meters and the average of the volumes for gauge meters.

DocImpact: the aggregator transformer now works for gauge and cumulative meters as well.
Change-Id: I172a7298dc063ff36b2a209df25d6650debb5a14
Closes-Bug: #1342668
This commit is contained in:
parent b6cf110df8
commit 454eba9e9e
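Before the diff itself, a minimal, self-contained Python sketch of the aggregation semantics this patch introduces: keep a running sum for delta meters, keep only the latest value for cumulative meters, and average gauge meters at flush time. This is illustrative only; the names MiniAggregator, add and flush are made up for the example and are not part of Ceilometer.

    import collections

    class MiniAggregator(object):
        """Illustrative only: mirrors the per-meter-type behaviour the patch
        adds to AggregatorTransformer, without using any Ceilometer code."""

        def __init__(self):
            self.volumes = {}                            # key -> running volume
            self.types = {}                              # key -> meter type
            self.counts = collections.defaultdict(int)   # key -> samples merged

        def add(self, key, meter_type, volume):
            self.counts[key] += 1
            self.types[key] = meter_type
            if meter_type == 'cumulative':
                # cumulative meters: the latest reading replaces the old one
                self.volumes[key] = volume
            else:
                # gauge and delta meters: keep a running sum
                self.volumes[key] = self.volumes.get(key, 0.0) + volume

        def flush(self):
            result = {}
            for key, volume in self.volumes.items():
                if self.types[key] == 'gauge':
                    # gauge meters: turn the accumulated sum into an average
                    volume /= self.counts[key]
                result[key] = volume
            self.volumes.clear()
            self.counts.clear()
            return result

Feeding the volumes 1.0, 2.0 and 3.0 to one key of each type and flushing yields 2.0 (gauge average), 3.0 (latest cumulative reading) and 6.0 (delta sum), which is exactly what the new test below asserts.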
@@ -1207,6 +1207,42 @@ class BasePipelineTestCase(test.BaseTestCase):
         self.assertEqual(expected_length, len(publisher.samples))
         return sorted(publisher.samples, key=lambda s: s.volume)
 
+    def test_aggregator_meter_type(self):
+        volumes = [1.0, 2.0, 3.0]
+        transformer_cfg = [
+            {
+                'name': 'aggregator',
+                'parameters': {'size': len(volumes) * len(sample.TYPES)}
+            },
+        ]
+        self._set_pipeline_cfg('transformers', transformer_cfg)
+        self._set_pipeline_cfg('counters',
+                               ['testgauge', 'testcumulative', 'testdelta'])
+        counters = []
+        for sample_type in sample.TYPES:
+            for volume in volumes:
+                counters.append(sample.Sample(
+                    name='test' + sample_type,
+                    type=sample_type,
+                    volume=volume,
+                    unit='B',
+                    user_id='test_user',
+                    project_id='test_proj',
+                    resource_id='test_resource',
+                    timestamp=timeutils.utcnow().isoformat(),
+                    resource_metadata={'version': '1.0'}
+                ))
+
+        pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
+                                                    self.transformer_manager)
+        pipe = pipeline_manager.pipelines[0]
+
+        pipe.publish_samples(None, counters)
+        pipe.flush(None)
+        publisher = pipeline_manager.pipelines[0].publishers[0]
+        actual = sorted(s.volume for s in publisher.samples)
+        self.assertEqual([2.0, 3.0, 6.0], actual)
+
     def test_aggregator_input_validation(self):
         aggregator = conversions.AggregatorTransformer("1", "15", None,
                                                        None, None)
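As a quick sanity check of the assertion above (plain Python, not part of the patch): with the input volumes [1.0, 2.0, 3.0] fed once per meter type, the three aggregated volumes sort to the gauge average, the latest cumulative reading and the delta sum.

    volumes = [1.0, 2.0, 3.0]
    expected = sorted([
        sum(volumes) / len(volumes),  # gauge      -> average of 1, 2, 3 = 2.0
        volumes[-1],                  # cumulative -> latest reading     = 3.0
        sum(volumes),                 # delta      -> running sum        = 6.0
    ])
    assert expected == [2.0, 3.0, 6.0]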
@@ -15,6 +15,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import collections
 import re
 
 from ceilometer.openstack.common.gettextutils import _
@@ -163,6 +164,7 @@ class AggregatorTransformer(ScalingTransformer):
                  **kwargs):
         super(AggregatorTransformer, self).__init__(**kwargs)
         self.samples = {}
+        self.counts = collections.defaultdict(int)
         self.size = int(size) if size else None
         self.retention_time = float(retention_time) if retention_time else None
         self.initial_timestamp = None
@@ -198,23 +200,27 @@ class AggregatorTransformer(ScalingTransformer):
         # NOTE(sileht): it assumes, a meter always have the same unit/type
         return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
 
-    def handle_sample(self, context, sample):
+    def handle_sample(self, context, sample_):
         if not self.initial_timestamp:
-            self.initial_timestamp = timeutils.parse_isotime(sample.timestamp)
+            self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
 
         self.aggregated_samples += 1
-        key = self._get_unique_key(sample)
+        key = self._get_unique_key(sample_)
+        self.counts[key] += 1
         if key not in self.samples:
-            self.samples[key] = self._convert(sample)
+            self.samples[key] = self._convert(sample_)
             if self.merged_attribute_policy[
                     'resource_metadata'] == 'drop':
                 self.samples[key].resource_metadata = {}
         else:
-            self.samples[key].volume += self._scale(sample)
+            if sample_.type == sample.TYPE_CUMULATIVE:
+                self.samples[key].volume = self._scale(sample_)
+            else:
+                self.samples[key].volume += self._scale(sample_)
             for field in self.merged_attribute_policy:
                 if self.merged_attribute_policy[field] == 'last':
                     setattr(self.samples[key], field,
-                            getattr(sample, field))
+                            getattr(sample_, field))
 
     def flush(self, context):
         expired = (self.retention_time and
@@ -223,7 +229,13 @@ class AggregatorTransformer(ScalingTransformer):
         full = self.aggregated_samples >= self.size
         if full or expired:
             x = self.samples.values()
-            self.samples = {}
+            # gauge aggregates need to be averages
+            for s in x:
+                if s.type == sample.TYPE_GAUGE:
+                    key = self._get_unique_key(s)
+                    s.volume /= self.counts[key]
+            self.samples.clear()
+            self.counts.clear()
             self.aggregated_samples = 0
             self.initial_timestamp = None
             return x