Merge "Add cumulative and gauge to aggregator transformer"
This commit is contained in:
commit
c638d043b1
@ -1219,6 +1219,42 @@ class BasePipelineTestCase(test.BaseTestCase):
|
|||||||
self.assertEqual(expected_length, len(publisher.samples))
|
self.assertEqual(expected_length, len(publisher.samples))
|
||||||
return sorted(publisher.samples, key=lambda s: s.volume)
|
return sorted(publisher.samples, key=lambda s: s.volume)
|
||||||
|
|
||||||
|
def test_aggregator_meter_type(self):
|
||||||
|
volumes = [1.0, 2.0, 3.0]
|
||||||
|
transformer_cfg = [
|
||||||
|
{
|
||||||
|
'name': 'aggregator',
|
||||||
|
'parameters': {'size': len(volumes) * len(sample.TYPES)}
|
||||||
|
},
|
||||||
|
]
|
||||||
|
self._set_pipeline_cfg('transformers', transformer_cfg)
|
||||||
|
self._set_pipeline_cfg('counters',
|
||||||
|
['testgauge', 'testcumulative', 'testdelta'])
|
||||||
|
counters = []
|
||||||
|
for sample_type in sample.TYPES:
|
||||||
|
for volume in volumes:
|
||||||
|
counters.append(sample.Sample(
|
||||||
|
name='test' + sample_type,
|
||||||
|
type=sample_type,
|
||||||
|
volume=volume,
|
||||||
|
unit='B',
|
||||||
|
user_id='test_user',
|
||||||
|
project_id='test_proj',
|
||||||
|
resource_id='test_resource',
|
||||||
|
timestamp=timeutils.utcnow().isoformat(),
|
||||||
|
resource_metadata={'version': '1.0'}
|
||||||
|
))
|
||||||
|
|
||||||
|
pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg,
|
||||||
|
self.transformer_manager)
|
||||||
|
pipe = pipeline_manager.pipelines[0]
|
||||||
|
|
||||||
|
pipe.publish_samples(None, counters)
|
||||||
|
pipe.flush(None)
|
||||||
|
publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||||
|
actual = sorted(s.volume for s in publisher.samples)
|
||||||
|
self.assertEqual([2.0, 3.0, 6.0], actual)
|
||||||
|
|
||||||
def test_aggregator_input_validation(self):
|
def test_aggregator_input_validation(self):
|
||||||
aggregator = conversions.AggregatorTransformer("1", "15", None,
|
aggregator = conversions.AggregatorTransformer("1", "15", None,
|
||||||
None, None)
|
None, None)
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import collections
|
||||||
import re
|
import re
|
||||||
|
|
||||||
import six
|
import six
|
||||||
@ -165,6 +166,7 @@ class AggregatorTransformer(ScalingTransformer):
|
|||||||
**kwargs):
|
**kwargs):
|
||||||
super(AggregatorTransformer, self).__init__(**kwargs)
|
super(AggregatorTransformer, self).__init__(**kwargs)
|
||||||
self.samples = {}
|
self.samples = {}
|
||||||
|
self.counts = collections.defaultdict(int)
|
||||||
self.size = int(size) if size else None
|
self.size = int(size) if size else None
|
||||||
self.retention_time = float(retention_time) if retention_time else None
|
self.retention_time = float(retention_time) if retention_time else None
|
||||||
self.initial_timestamp = None
|
self.initial_timestamp = None
|
||||||
@ -200,23 +202,27 @@ class AggregatorTransformer(ScalingTransformer):
|
|||||||
# NOTE(sileht): it assumes, a meter always have the same unit/type
|
# NOTE(sileht): it assumes, a meter always have the same unit/type
|
||||||
return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
|
return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
|
||||||
|
|
||||||
def handle_sample(self, context, sample):
|
def handle_sample(self, context, sample_):
|
||||||
if not self.initial_timestamp:
|
if not self.initial_timestamp:
|
||||||
self.initial_timestamp = timeutils.parse_isotime(sample.timestamp)
|
self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
|
||||||
|
|
||||||
self.aggregated_samples += 1
|
self.aggregated_samples += 1
|
||||||
key = self._get_unique_key(sample)
|
key = self._get_unique_key(sample_)
|
||||||
|
self.counts[key] += 1
|
||||||
if key not in self.samples:
|
if key not in self.samples:
|
||||||
self.samples[key] = self._convert(sample)
|
self.samples[key] = self._convert(sample_)
|
||||||
if self.merged_attribute_policy[
|
if self.merged_attribute_policy[
|
||||||
'resource_metadata'] == 'drop':
|
'resource_metadata'] == 'drop':
|
||||||
self.samples[key].resource_metadata = {}
|
self.samples[key].resource_metadata = {}
|
||||||
else:
|
else:
|
||||||
self.samples[key].volume += self._scale(sample)
|
if sample_.type == sample.TYPE_CUMULATIVE:
|
||||||
|
self.samples[key].volume = self._scale(sample_)
|
||||||
|
else:
|
||||||
|
self.samples[key].volume += self._scale(sample_)
|
||||||
for field in self.merged_attribute_policy:
|
for field in self.merged_attribute_policy:
|
||||||
if self.merged_attribute_policy[field] == 'last':
|
if self.merged_attribute_policy[field] == 'last':
|
||||||
setattr(self.samples[key], field,
|
setattr(self.samples[key], field,
|
||||||
getattr(sample, field))
|
getattr(sample_, field))
|
||||||
|
|
||||||
def flush(self, context):
|
def flush(self, context):
|
||||||
if not self.initial_timestamp:
|
if not self.initial_timestamp:
|
||||||
@ -228,7 +234,13 @@ class AggregatorTransformer(ScalingTransformer):
|
|||||||
full = self.aggregated_samples >= self.size
|
full = self.aggregated_samples >= self.size
|
||||||
if full or expired:
|
if full or expired:
|
||||||
x = self.samples.values()
|
x = self.samples.values()
|
||||||
self.samples = {}
|
# gauge aggregates need to be averages
|
||||||
|
for s in x:
|
||||||
|
if s.type == sample.TYPE_GAUGE:
|
||||||
|
key = self._get_unique_key(s)
|
||||||
|
s.volume /= self.counts[key]
|
||||||
|
self.samples.clear()
|
||||||
|
self.counts.clear()
|
||||||
self.aggregated_samples = 0
|
self.aggregated_samples = 0
|
||||||
self.initial_timestamp = None
|
self.initial_timestamp = None
|
||||||
return x
|
return x
|
||||||
|
Loading…
x
Reference in New Issue
Block a user