Merge "pipeline: remove transformer support"
This commit is contained in:
commit
494d17f350
@ -91,33 +91,22 @@ class Sink(object):
|
||||
Each sink config is concerned *only* with the transformation rules
|
||||
and publication conduits for data.
|
||||
|
||||
In effect, a sink describes a chain of handlers. The chain starts
|
||||
with zero or more transformers and ends with one or more publishers.
|
||||
|
||||
The first transformer in the chain is passed data from the
|
||||
corresponding source, takes some action such as deriving rate of
|
||||
change, performing unit conversion, or aggregating, before passing
|
||||
the modified data to next step.
|
||||
|
||||
The subsequent transformers, if any, handle the data similarly.
|
||||
In effect, a sink describes a chain of handlers. The chain ends with one or
|
||||
more publishers.
|
||||
|
||||
At the end of the chain, publishers publish the data. The exact
|
||||
publishing method depends on publisher type, for example, pushing
|
||||
into data storage via the message bus providing guaranteed delivery,
|
||||
or for loss-tolerant data UDP may be used.
|
||||
|
||||
If no transformers are included in the chain, the publishers are
|
||||
passed data directly from the sink which are published unchanged.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, cfg, transformer_manager, publisher_manager):
|
||||
def __init__(self, conf, cfg, publisher_manager):
|
||||
self.conf = conf
|
||||
self.cfg = cfg
|
||||
|
||||
try:
|
||||
self.name = cfg['name']
|
||||
# It's legal to have no transformer specified
|
||||
self.transformer_cfg = cfg.get('transformers') or []
|
||||
except KeyError as err:
|
||||
raise PipelineException(
|
||||
"Required field %s not specified" % err.args[0], cfg)
|
||||
@ -138,30 +127,10 @@ class Sink(object):
|
||||
exc_info=True)
|
||||
|
||||
self.multi_publish = True if len(self.publishers) > 1 else False
|
||||
self.transformers = self._setup_transformers(cfg, transformer_manager)
|
||||
|
||||
def __str__(self):
|
||||
return self.name
|
||||
|
||||
def _setup_transformers(self, cfg, transformer_manager):
|
||||
transformers = []
|
||||
for transformer in self.transformer_cfg:
|
||||
parameter = transformer['parameters'] or {}
|
||||
try:
|
||||
ext = transformer_manager[transformer['name']]
|
||||
except KeyError:
|
||||
raise PipelineException(
|
||||
"No transformer named %s loaded" % transformer['name'],
|
||||
cfg)
|
||||
transformers.append(ext.plugin(**parameter))
|
||||
LOG.info(
|
||||
"Pipeline %(pipeline)s: Setup transformer instance %(name)s "
|
||||
"with parameter %(param)s" % ({'pipeline': self,
|
||||
'name': transformer['name'],
|
||||
'param': parameter}))
|
||||
|
||||
return transformers
|
||||
|
||||
@staticmethod
|
||||
def flush():
|
||||
"""Flush data after all events have been injected to pipeline."""
|
||||
@ -220,7 +189,7 @@ class PipelineManager(agent.ConfigManagerBase):
|
||||
|
||||
NOTIFICATION_IPC = 'ceilometer_ipc'
|
||||
|
||||
def __init__(self, conf, cfg_file, transformer_manager):
|
||||
def __init__(self, conf, cfg_file):
|
||||
"""Setup the pipelines according to config.
|
||||
|
||||
The configuration is supported as follows:
|
||||
@ -244,13 +213,6 @@ class PipelineManager(agent.ConfigManagerBase):
|
||||
},
|
||||
],
|
||||
"sinks": [{"name": sink_1,
|
||||
"transformers": [
|
||||
{"name": "Transformer_1",
|
||||
"parameters": {"p1": "value"}},
|
||||
|
||||
{"name": "Transformer_2",
|
||||
"parameters": {"p1": "value"}},
|
||||
],
|
||||
"publishers": ["publisher_1", "publisher_2"]
|
||||
},
|
||||
{"name": sink_2,
|
||||
@ -268,8 +230,6 @@ class PipelineManager(agent.ConfigManagerBase):
|
||||
"excluded meter names", wildcard and "excluded meter names", or
|
||||
only wildcard.
|
||||
|
||||
Transformer's name is plugin name in setup.cfg.
|
||||
|
||||
Publisher's name is plugin name in setup.cfg
|
||||
|
||||
"""
|
||||
@ -303,7 +263,6 @@ class PipelineManager(agent.ConfigManagerBase):
|
||||
else:
|
||||
unique_names.add(name)
|
||||
sinks[s['name']] = self.pm_sink(self.conf, s,
|
||||
transformer_manager,
|
||||
publisher_manager)
|
||||
unique_names.clear()
|
||||
|
||||
|
@ -5,92 +5,7 @@ sources:
|
||||
- "*"
|
||||
sinks:
|
||||
- meter_sink
|
||||
- name: cpu_source
|
||||
meters:
|
||||
- "cpu"
|
||||
sinks:
|
||||
- cpu_sink
|
||||
- cpu_delta_sink
|
||||
- name: disk_source
|
||||
meters:
|
||||
- "disk.read.bytes"
|
||||
- "disk.read.requests"
|
||||
- "disk.write.bytes"
|
||||
- "disk.write.requests"
|
||||
- "disk.device.read.bytes"
|
||||
- "disk.device.read.requests"
|
||||
- "disk.device.write.bytes"
|
||||
- "disk.device.write.requests"
|
||||
sinks:
|
||||
- disk_sink
|
||||
- name: network_source
|
||||
meters:
|
||||
- "network.incoming.bytes"
|
||||
- "network.incoming.packets"
|
||||
- "network.outgoing.bytes"
|
||||
- "network.outgoing.packets"
|
||||
sinks:
|
||||
- network_sink
|
||||
sinks:
|
||||
- name: meter_sink
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
||||
# All these transformers are deprecated, and will be removed in the future, don't use them.
|
||||
- name: cpu_sink
|
||||
transformers:
|
||||
- name: "rate_of_change"
|
||||
parameters:
|
||||
target:
|
||||
name: "cpu_util"
|
||||
unit: "%"
|
||||
type: "gauge"
|
||||
max: 100
|
||||
scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
||||
# All these transformers are deprecated, and will be removed in the future, don't use them.
|
||||
- name: cpu_delta_sink
|
||||
transformers:
|
||||
- name: "delta"
|
||||
parameters:
|
||||
target:
|
||||
name: "cpu.delta"
|
||||
growth_only: True
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
||||
# All these transformers are deprecated, and will be removed in the future, don't use them.
|
||||
- name: disk_sink
|
||||
transformers:
|
||||
- name: "rate_of_change"
|
||||
parameters:
|
||||
source:
|
||||
map_from:
|
||||
name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)"
|
||||
unit: "(B|request)"
|
||||
target:
|
||||
map_to:
|
||||
name: "\\1.\\2.\\3.rate"
|
||||
unit: "\\1/s"
|
||||
type: "gauge"
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
||||
# All these transformers are deprecated, and will be removed in the future, don't use them.
|
||||
- name: network_sink
|
||||
transformers:
|
||||
- name: "rate_of_change"
|
||||
parameters:
|
||||
source:
|
||||
map_from:
|
||||
name: "network\\.(incoming|outgoing)\\.(bytes|packets)"
|
||||
unit: "(B|packet)"
|
||||
target:
|
||||
map_to:
|
||||
name: "network.\\1.\\2.rate"
|
||||
unit: "\\1/s"
|
||||
type: "gauge"
|
||||
publishers:
|
||||
- gnocchi://
|
||||
|
@ -126,7 +126,7 @@ class EventPipelineManager(base.PipelineManager):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(EventPipelineManager, self).__init__(
|
||||
conf, conf.event_pipeline_cfg_file, {})
|
||||
conf, conf.event_pipeline_cfg_file)
|
||||
|
||||
def get_main_endpoints(self):
|
||||
return [EventEndpoint(self.conf, self.publisher())]
|
||||
|
@ -73,74 +73,25 @@ class SampleSource(base.PipelineSource):
|
||||
|
||||
class SampleSink(base.Sink):
|
||||
|
||||
def _transform_sample(self, start, sample):
|
||||
try:
|
||||
for transformer in self.transformers[start:]:
|
||||
sample = transformer.handle_sample(sample)
|
||||
if not sample:
|
||||
LOG.debug(
|
||||
"Pipeline %(pipeline)s: Sample dropped by "
|
||||
"transformer %(trans)s", {'pipeline': self,
|
||||
'trans': transformer})
|
||||
return
|
||||
return sample
|
||||
except Exception:
|
||||
LOG.error("Pipeline %(pipeline)s: Exit after error "
|
||||
"from transformer %(trans)s "
|
||||
"for %(smp)s" % {'pipeline': self,
|
||||
'trans': transformer,
|
||||
'smp': sample},
|
||||
exc_info=True)
|
||||
|
||||
def _publish_samples(self, start, samples):
|
||||
def publish_samples(self, samples):
|
||||
"""Push samples into pipeline for publishing.
|
||||
|
||||
:param start: The first transformer that the sample will be injected.
|
||||
This is mainly for flush() invocation that transformer
|
||||
may emit samples.
|
||||
:param samples: Sample list.
|
||||
|
||||
"""
|
||||
|
||||
transformed_samples = []
|
||||
if not self.transformers:
|
||||
transformed_samples = samples
|
||||
else:
|
||||
for sample in samples:
|
||||
LOG.debug(
|
||||
"Pipeline %(pipeline)s: Transform sample "
|
||||
"%(smp)s from %(trans)s transformer", {'pipeline': self,
|
||||
'smp': sample,
|
||||
'trans': start})
|
||||
sample = self._transform_sample(start, sample)
|
||||
if sample:
|
||||
transformed_samples.append(sample)
|
||||
|
||||
if transformed_samples:
|
||||
if samples:
|
||||
for p in self.publishers:
|
||||
try:
|
||||
p.publish_samples(transformed_samples)
|
||||
p.publish_samples(samples)
|
||||
except Exception:
|
||||
LOG.error("Pipeline %(pipeline)s: Continue after "
|
||||
"error from publisher %(pub)s"
|
||||
% {'pipeline': self, 'pub': p},
|
||||
exc_info=True)
|
||||
|
||||
def publish_samples(self, samples):
|
||||
self._publish_samples(0, samples)
|
||||
|
||||
def flush(self):
|
||||
"""Flush data after all samples have been injected to pipeline."""
|
||||
|
||||
for (i, transformer) in enumerate(self.transformers):
|
||||
try:
|
||||
self._publish_samples(i + 1,
|
||||
list(transformer.flush()))
|
||||
except Exception:
|
||||
LOG.error("Pipeline %(pipeline)s: Error "
|
||||
"flushing transformer %(trans)s"
|
||||
% {'pipeline': self, 'trans': transformer},
|
||||
exc_info=True)
|
||||
@staticmethod
|
||||
def flush():
|
||||
pass
|
||||
|
||||
|
||||
class SamplePipeline(base.Pipeline):
|
||||
@ -195,11 +146,7 @@ class SamplePipelineManager(base.PipelineManager):
|
||||
|
||||
def __init__(self, conf):
|
||||
super(SamplePipelineManager, self).__init__(
|
||||
conf, conf.pipeline_cfg_file, self.get_transform_manager())
|
||||
|
||||
@staticmethod
|
||||
def get_transform_manager():
|
||||
return extension.ExtensionManager('ceilometer.transformer')
|
||||
conf, conf.pipeline_cfg_file)
|
||||
|
||||
def get_main_endpoints(self):
|
||||
exts = extension.ExtensionManager(
|
||||
|
@ -86,8 +86,6 @@ resources:
|
||||
vcpus:
|
||||
cpu:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
cpu.delta:
|
||||
cpu_util:
|
||||
cpu_l3_cache:
|
||||
disk.root.size:
|
||||
disk.ephemeral.size:
|
||||
@ -132,8 +130,6 @@ resources:
|
||||
|
||||
- resource_type: instance_network_interface
|
||||
metrics:
|
||||
network.outgoing.packets.rate:
|
||||
network.incoming.packets.rate:
|
||||
network.outgoing.packets:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
network.incoming.packets:
|
||||
@ -146,8 +142,6 @@ resources:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
network.incoming.packets.error:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
network.outgoing.bytes.rate:
|
||||
network.incoming.bytes.rate:
|
||||
network.outgoing.bytes:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
network.incoming.bytes:
|
||||
@ -160,16 +154,12 @@ resources:
|
||||
metrics:
|
||||
disk.device.read.requests:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
disk.device.read.requests.rate:
|
||||
disk.device.write.requests:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
disk.device.write.requests.rate:
|
||||
disk.device.read.bytes:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
disk.device.read.bytes.rate:
|
||||
disk.device.write.bytes:
|
||||
archive_policy_name: ceilometer-low-rate
|
||||
disk.device.write.bytes.rate:
|
||||
disk.device.latency:
|
||||
disk.device.read.latency:
|
||||
disk.device.write.latency:
|
||||
|
@ -63,7 +63,6 @@ class HttpPublisher(publisher.ConfigPublisherBase):
|
||||
the sinks like the following:
|
||||
|
||||
- name: event_sink
|
||||
transformers:
|
||||
publishers:
|
||||
- http://host:80/path?timeout=1&max_retries=2
|
||||
|
||||
|
@ -102,7 +102,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
|
||||
def publish_samples(self, samples):
|
||||
"""Publish samples on RPC.
|
||||
|
||||
:param samples: Samples from pipeline after transformation.
|
||||
:param samples: Samples from pipeline.
|
||||
|
||||
"""
|
||||
|
||||
@ -174,7 +174,7 @@ class MessagingPublisher(publisher.ConfigPublisherBase):
|
||||
def publish_events(self, events):
|
||||
"""Send an event message for publishing
|
||||
|
||||
:param events: events from pipeline after transformation
|
||||
:param events: events from pipeline.
|
||||
"""
|
||||
ev_list = [utils.message_from_event(
|
||||
event, self.conf.publisher.telemetry_secret) for event in events]
|
||||
@ -218,7 +218,6 @@ class NotifierPublisher(MessagingPublisher):
|
||||
- notifier_sink
|
||||
sinks:
|
||||
- name: notifier_sink
|
||||
transformers:
|
||||
publishers:
|
||||
- notifier://[notifier_ip]:[notifier_port]?topic=[topic]&
|
||||
driver=driver&max_retry=100
|
||||
|
@ -36,7 +36,6 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
|
||||
- zaqar_sink
|
||||
sinks:
|
||||
- name: zaqar_sink
|
||||
transformers:
|
||||
publishers:
|
||||
- zaqar://?queue=meter_queue&ttl=1200
|
||||
|
||||
@ -63,7 +62,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
|
||||
def publish_samples(self, samples):
|
||||
"""Send a metering message for publishing
|
||||
|
||||
:param samples: Samples from pipeline after transformation
|
||||
:param samples: Samples from pipeline.
|
||||
"""
|
||||
queue = self.client.queue(self.queue_name)
|
||||
messages = [{'body': sample.as_dict(), 'ttl': self.ttl}
|
||||
@ -73,7 +72,7 @@ class ZaqarPublisher(publisher.ConfigPublisherBase):
|
||||
def publish_events(self, events):
|
||||
"""Send an event message for publishing
|
||||
|
||||
:param events: events from pipeline after transformation
|
||||
:param events: events from pipeline.
|
||||
"""
|
||||
queue = self.client.queue(self.queue_name)
|
||||
messages = [{'body': event.serialize(), 'ttl': self.ttl}
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -677,7 +677,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
@ -720,7 +719,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
@ -742,7 +740,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
@ -771,7 +768,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
@ -812,7 +808,6 @@ class TestPollingAgent(BaseAgent):
|
||||
'sinks': ['test_sink']}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ["test"]}]
|
||||
}
|
||||
self.setup_polling(poll_cfg)
|
||||
|
@ -12,9 +12,6 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import yaml
|
||||
|
||||
from ceilometer.pipeline import base
|
||||
from ceilometer.pipeline import sample as pipeline
|
||||
from ceilometer import sample
|
||||
@ -27,7 +24,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
'meters': ['a'],
|
||||
'sinks': ['test_sink']}
|
||||
sink = {'name': 'test_sink',
|
||||
'transformers': [{'name': 'update', 'parameters': {}}],
|
||||
'publishers': ['test://']}
|
||||
self.pipeline_cfg = {'sources': [source], 'sinks': [sink]}
|
||||
|
||||
@ -39,13 +35,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
})
|
||||
self.pipeline_cfg['sinks'].append({
|
||||
'name': 'second_sink',
|
||||
'transformers': [{
|
||||
'name': 'update',
|
||||
'parameters':
|
||||
{
|
||||
'append_name': '_new',
|
||||
}
|
||||
}],
|
||||
'publishers': ['new'],
|
||||
})
|
||||
|
||||
@ -57,13 +46,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
})
|
||||
self.pipeline_cfg['sinks'].append({
|
||||
'name': 'second_sink',
|
||||
'transformers': [{
|
||||
'name': 'update',
|
||||
'parameters':
|
||||
{
|
||||
'append_name': '_new',
|
||||
}
|
||||
}],
|
||||
'publishers': ['except'],
|
||||
})
|
||||
|
||||
@ -113,13 +95,6 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
self._set_pipeline_cfg('meters', meter_cfg)
|
||||
self.pipeline_cfg['sinks'].append({
|
||||
'name': 'second_sink',
|
||||
'transformers': [{
|
||||
'name': 'update',
|
||||
'parameters':
|
||||
{
|
||||
'append_name': '_new',
|
||||
}
|
||||
}],
|
||||
'publishers': ['new'],
|
||||
})
|
||||
self.pipeline_cfg['sources'][0]['sinks'].append('second_sink')
|
||||
@ -150,12 +125,11 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
str(pipeline_manager.pipelines[1]))
|
||||
test_publisher = pipeline_manager.pipelines[0].publishers[0]
|
||||
new_publisher = pipeline_manager.pipelines[1].publishers[0]
|
||||
for publisher, sfx in [(test_publisher, '_update'),
|
||||
(new_publisher, '_new')]:
|
||||
for publisher in (test_publisher, new_publisher):
|
||||
self.assertEqual(2, len(publisher.samples))
|
||||
self.assertEqual(2, publisher.calls)
|
||||
self.assertEqual('a' + sfx, getattr(publisher.samples[0], "name"))
|
||||
self.assertEqual('b' + sfx, getattr(publisher.samples[1], "name"))
|
||||
self.assertEqual('a', getattr(publisher.samples[0], "name"))
|
||||
self.assertEqual('b', getattr(publisher.samples[1], "name"))
|
||||
|
||||
def test_multiple_sources_with_single_sink(self):
|
||||
self.pipeline_cfg['sources'].append({
|
||||
@ -193,68 +167,8 @@ class TestDecoupledPipeline(pipeline_base.BasePipelineTestCase):
|
||||
for publisher in [test_publisher, another_publisher]:
|
||||
self.assertEqual(2, len(publisher.samples))
|
||||
self.assertEqual(2, publisher.calls)
|
||||
self.assertEqual('a_update', getattr(publisher.samples[0], "name"))
|
||||
self.assertEqual('b_update', getattr(publisher.samples[1], "name"))
|
||||
|
||||
transformed_samples = self.TransformerClass.samples
|
||||
self.assertEqual(2, len(transformed_samples))
|
||||
self.assertEqual(['a', 'b'],
|
||||
[getattr(s, 'name') for s in transformed_samples])
|
||||
|
||||
def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index,
|
||||
meters, units):
|
||||
with open('ceilometer/pipeline/data/pipeline.yaml') as fap:
|
||||
data = fap.read()
|
||||
pipeline_cfg = yaml.safe_load(data)
|
||||
for s in pipeline_cfg['sinks']:
|
||||
s['publishers'] = ['test://']
|
||||
name = self.cfg2file(pipeline_cfg)
|
||||
self.CONF.set_override('pipeline_cfg_file', name)
|
||||
pipeline_manager = pipeline.SamplePipelineManager(self.CONF)
|
||||
pipe = pipeline_manager.pipelines[index]
|
||||
self._do_test_rate_of_change_mapping(pipe, meters, units)
|
||||
|
||||
def test_rate_of_change_boilerplate_disk_read_cfg(self):
|
||||
meters = ('disk.read.bytes', 'disk.read.requests')
|
||||
units = ('B', 'request')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_disk_write_cfg(self):
|
||||
meters = ('disk.write.bytes', 'disk.write.requests')
|
||||
units = ('B', 'request')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_network_incoming_cfg(self):
|
||||
meters = ('network.incoming.bytes', 'network.incoming.packets')
|
||||
units = ('B', 'packet')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self):
|
||||
meters = ('disk.device.read.bytes', 'disk.device.read.requests')
|
||||
units = ('B', 'request')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self):
|
||||
meters = ('disk.device.write.bytes', 'disk.device.write.requests')
|
||||
units = ('B', 'request')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3,
|
||||
meters,
|
||||
units)
|
||||
|
||||
def test_rate_of_change_boilerplate_network_outgoing_cfg(self):
|
||||
meters = ('network.outgoing.bytes', 'network.outgoing.packets')
|
||||
units = ('B', 'packet')
|
||||
self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(4,
|
||||
meters,
|
||||
units)
|
||||
self.assertEqual('a', getattr(publisher.samples[0], "name"))
|
||||
self.assertEqual('b', getattr(publisher.samples[1], "name"))
|
||||
|
||||
def test_duplicated_sinks_names(self):
|
||||
self.pipeline_cfg['sinks'].append({
|
||||
|
@ -141,7 +141,6 @@ class BaseRealNotification(BaseNotificationTest):
|
||||
}],
|
||||
'sinks': [{
|
||||
'name': 'test_sink',
|
||||
'transformers': [],
|
||||
'publishers': ['test://']
|
||||
}]
|
||||
})
|
||||
|
@ -1,115 +0,0 @@
|
||||
#
|
||||
# Copyright 2016 IBM Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import copy
|
||||
import datetime
|
||||
|
||||
from oslo_utils import timeutils
|
||||
from oslotest import base
|
||||
|
||||
from ceilometer import sample
|
||||
from ceilometer.transformer import conversions
|
||||
|
||||
|
||||
class AggregatorTransformerTestCase(base.BaseTestCase):
|
||||
SAMPLE = sample.Sample(
|
||||
name='cpu',
|
||||
type=sample.TYPE_CUMULATIVE,
|
||||
unit='ns',
|
||||
volume='1234567',
|
||||
user_id='56c5692032f34041900342503fecab30',
|
||||
project_id='ac9494df2d9d4e709bac378cceabaf23',
|
||||
resource_id='1ca738a1-c49c-4401-8346-5c60ebdb03f4',
|
||||
timestamp="2015-10-29 14:12:15.485877+00:00",
|
||||
resource_metadata={}
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(AggregatorTransformerTestCase, self).setUp()
|
||||
self._sample_offset = 0
|
||||
|
||||
def test_init_input_validation(self):
|
||||
aggregator = conversions.AggregatorTransformer("2", "15", None,
|
||||
None, None)
|
||||
self.assertEqual(2, aggregator.size)
|
||||
self.assertEqual(15, aggregator.retention_time)
|
||||
|
||||
def test_init_no_size_or_rention_time(self):
|
||||
aggregator = conversions.AggregatorTransformer()
|
||||
self.assertEqual(1, aggregator.size)
|
||||
self.assertIsNone(aggregator.retention_time)
|
||||
|
||||
def test_init_size_zero(self):
|
||||
aggregator = conversions.AggregatorTransformer(size="0")
|
||||
self.assertEqual(1, aggregator.size)
|
||||
self.assertIsNone(aggregator.retention_time)
|
||||
|
||||
def test_init_input_validation_size_invalid(self):
|
||||
self.assertRaises(ValueError, conversions.AggregatorTransformer,
|
||||
"abc", "15", None, None, None)
|
||||
|
||||
def test_init_input_validation_retention_time_invalid(self):
|
||||
self.assertRaises(ValueError, conversions.AggregatorTransformer,
|
||||
"2", "abc", None, None, None)
|
||||
|
||||
def test_init_no_timestamp(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None)
|
||||
self.assertEqual("first", aggregator.timestamp)
|
||||
|
||||
def test_init_timestamp_none(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None, None)
|
||||
self.assertEqual("first", aggregator.timestamp)
|
||||
|
||||
def test_init_timestamp_first(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None, "first")
|
||||
self.assertEqual("first", aggregator.timestamp)
|
||||
|
||||
def test_init_timestamp_last(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None, "last")
|
||||
self.assertEqual("last", aggregator.timestamp)
|
||||
|
||||
def test_init_timestamp_invalid(self):
|
||||
aggregator = conversions.AggregatorTransformer("1", "1", None,
|
||||
None, None,
|
||||
"invalid_option")
|
||||
self.assertEqual("first", aggregator.timestamp)
|
||||
|
||||
def test_size_unbounded(self):
|
||||
aggregator = conversions.AggregatorTransformer(size="0",
|
||||
retention_time="300")
|
||||
self._insert_sample_data(aggregator)
|
||||
|
||||
samples = aggregator.flush()
|
||||
|
||||
self.assertEqual([], samples)
|
||||
|
||||
def test_size_bounded(self):
|
||||
aggregator = conversions.AggregatorTransformer(size="100")
|
||||
self._insert_sample_data(aggregator)
|
||||
|
||||
samples = aggregator.flush()
|
||||
|
||||
self.assertEqual(100, len(samples))
|
||||
|
||||
def _insert_sample_data(self, aggregator):
|
||||
for _ in range(100):
|
||||
sample = copy.copy(self.SAMPLE)
|
||||
sample.resource_id = sample.resource_id + str(self._sample_offset)
|
||||
sample.timestamp = datetime.datetime.isoformat(timeutils.utcnow())
|
||||
aggregator.handle_sample(sample)
|
||||
self._sample_offset += 1
|
@ -1,73 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Intel Corp.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import collections
|
||||
|
||||
import six
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class TransformerBase(object):
|
||||
"""Base class for plugins that transform the sample."""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Setup transformer.
|
||||
|
||||
Each time a transformed is involved in a pipeline, a new transformer
|
||||
instance is created and chained into the pipeline. i.e. transformer
|
||||
instance is per pipeline. This helps if transformer need keep some
|
||||
cache and per-pipeline information.
|
||||
|
||||
:param kwargs: The parameters that are defined in pipeline config file.
|
||||
"""
|
||||
super(TransformerBase, self).__init__()
|
||||
|
||||
@abc.abstractmethod
|
||||
def handle_sample(self, sample):
|
||||
"""Transform a sample.
|
||||
|
||||
:param sample: A sample.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def flush():
|
||||
"""Flush samples cached previously."""
|
||||
return []
|
||||
|
||||
|
||||
class Namespace(object):
|
||||
"""Encapsulates the namespace.
|
||||
|
||||
Encapsulation is done by wrapping the evaluation of the configured rule.
|
||||
This allows nested dicts to be accessed in the attribute style,
|
||||
and missing attributes to yield false when used in a boolean expression.
|
||||
"""
|
||||
def __init__(self, seed):
|
||||
self.__dict__ = collections.defaultdict(lambda: Namespace({}))
|
||||
self.__dict__.update(seed)
|
||||
for k, v in six.iteritems(self.__dict__):
|
||||
if isinstance(v, dict):
|
||||
self.__dict__[k] = Namespace(v)
|
||||
|
||||
def __getattr__(self, attr):
|
||||
return self.__dict__[attr]
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.__dict__[key]
|
||||
|
||||
def __nonzero__(self):
|
||||
return len(self.__dict__) > 0
|
||||
__bool__ = __nonzero__
|
@ -1,42 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Julien Danjou
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from ceilometer import transformer
|
||||
|
||||
|
||||
class TransformerAccumulator(transformer.TransformerBase):
|
||||
"""Transformer that accumulates samples until a threshold.
|
||||
|
||||
And then flushes them out into the wild.
|
||||
"""
|
||||
|
||||
def __init__(self, size=1, **kwargs):
|
||||
if size >= 1:
|
||||
self.samples = []
|
||||
self.size = size
|
||||
super(TransformerAccumulator, self).__init__(**kwargs)
|
||||
|
||||
def handle_sample(self, sample):
|
||||
if self.size >= 1:
|
||||
self.samples.append(sample)
|
||||
else:
|
||||
return sample
|
||||
|
||||
def flush(self):
|
||||
if len(self.samples) >= self.size:
|
||||
x = self.samples
|
||||
self.samples = []
|
||||
return x
|
||||
return []
|
@ -1,157 +0,0 @@
|
||||
#
|
||||
# Copyright 2014 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import keyword
|
||||
import math
|
||||
import re
|
||||
|
||||
from oslo_log import log
|
||||
import six
|
||||
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer import sample
|
||||
from ceilometer import transformer
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class ArithmeticTransformer(transformer.TransformerBase):
|
||||
"""Multi meter arithmetic transformer.
|
||||
|
||||
Transformer that performs arithmetic operations
|
||||
over one or more meters and/or their metadata.
|
||||
"""
|
||||
|
||||
meter_name_re = re.compile(r'\$\(([\w\.\-]+)\)')
|
||||
|
||||
def __init__(self, target=None, **kwargs):
|
||||
super(ArithmeticTransformer, self).__init__(**kwargs)
|
||||
target = target or {}
|
||||
self.target = target
|
||||
self.expr = target.get('expr', '')
|
||||
self.expr_escaped, self.escaped_names = self.parse_expr(self.expr)
|
||||
self.required_meters = list(self.escaped_names.values())
|
||||
self.misconfigured = len(self.required_meters) == 0
|
||||
if not self.misconfigured:
|
||||
self.reference_meter = self.required_meters[0]
|
||||
# convert to set for more efficient contains operation
|
||||
self.required_meters = set(self.required_meters)
|
||||
self.cache = collections.defaultdict(dict)
|
||||
self.latest_timestamp = None
|
||||
else:
|
||||
LOG.warning(_('Arithmetic transformer must use at least one'
|
||||
' meter in expression \'%s\''), self.expr)
|
||||
|
||||
def _update_cache(self, _sample):
|
||||
"""Update the cache with the latest sample."""
|
||||
escaped_name = self.escaped_names.get(_sample.name, '')
|
||||
if escaped_name not in self.required_meters:
|
||||
return
|
||||
self.cache[_sample.resource_id][escaped_name] = _sample
|
||||
|
||||
def _check_requirements(self, resource_id):
|
||||
"""Check if all the required meters are available in the cache."""
|
||||
return len(self.cache[resource_id]) == len(self.required_meters)
|
||||
|
||||
def _calculate(self, resource_id):
|
||||
"""Evaluate the expression and return a new sample if successful."""
|
||||
ns_dict = dict((m, s.as_dict()) for m, s
|
||||
in six.iteritems(self.cache[resource_id]))
|
||||
ns = transformer.Namespace(ns_dict)
|
||||
try:
|
||||
new_volume = eval(self.expr_escaped, {}, ns)
|
||||
if math.isnan(new_volume):
|
||||
raise ArithmeticError(_('Expression evaluated to '
|
||||
'a NaN value!'))
|
||||
|
||||
reference_sample = self.cache[resource_id][self.reference_meter]
|
||||
return sample.Sample(
|
||||
name=self.target.get('name', reference_sample.name),
|
||||
unit=self.target.get('unit', reference_sample.unit),
|
||||
type=self.target.get('type', reference_sample.type),
|
||||
volume=float(new_volume),
|
||||
user_id=reference_sample.user_id,
|
||||
project_id=reference_sample.project_id,
|
||||
resource_id=reference_sample.resource_id,
|
||||
timestamp=self.latest_timestamp,
|
||||
resource_metadata=reference_sample.resource_metadata
|
||||
)
|
||||
except Exception as e:
|
||||
LOG.warning(_('Unable to evaluate expression %(expr)s: %(exc)s'),
|
||||
{'expr': self.expr, 'exc': e})
|
||||
|
||||
def handle_sample(self, _sample):
|
||||
self._update_cache(_sample)
|
||||
self.latest_timestamp = _sample.timestamp
|
||||
|
||||
def flush(self):
|
||||
new_samples = []
|
||||
if not self.misconfigured:
|
||||
# When loop self.cache, the dict could not be change by others.
|
||||
# If changed, will raise "RuntimeError: dictionary changed size
|
||||
# during iteration". so we make a tmp copy and just loop it.
|
||||
tmp_cache = copy.copy(self.cache)
|
||||
for resource_id in tmp_cache:
|
||||
if self._check_requirements(resource_id):
|
||||
new_samples.append(self._calculate(resource_id))
|
||||
if resource_id in self.cache:
|
||||
self.cache.pop(resource_id)
|
||||
return new_samples
|
||||
|
||||
@classmethod
|
||||
def parse_expr(cls, expr):
|
||||
"""Transforms meter names in the expression into valid identifiers.
|
||||
|
||||
:param expr: unescaped expression
|
||||
:return: A tuple of the escaped expression and a dict representing
|
||||
the translation of meter names into Python identifiers
|
||||
"""
|
||||
|
||||
class Replacer(object):
|
||||
"""Replaces matched meter names with escaped names.
|
||||
|
||||
If the meter name is not followed by parameter access in the
|
||||
expression, it defaults to accessing the 'volume' parameter.
|
||||
"""
|
||||
|
||||
def __init__(self, original_expr):
|
||||
self.original_expr = original_expr
|
||||
self.escaped_map = {}
|
||||
|
||||
def __call__(self, match):
|
||||
meter_name = match.group(1)
|
||||
escaped_name = self.escape(meter_name)
|
||||
self.escaped_map[meter_name] = escaped_name
|
||||
|
||||
if (match.end(0) == len(self.original_expr) or
|
||||
self.original_expr[match.end(0)] != '.'):
|
||||
escaped_name += '.volume'
|
||||
return escaped_name
|
||||
|
||||
@staticmethod
|
||||
def escape(name):
|
||||
has_dot = '.' in name
|
||||
if has_dot:
|
||||
name = name.replace('.', '_')
|
||||
|
||||
if has_dot or name.endswith('ESC') or name in keyword.kwlist:
|
||||
name = "_" + name + '_ESC'
|
||||
return name
|
||||
|
||||
replacer = Replacer(expr)
|
||||
expr = re.sub(cls.meter_name_re, replacer, expr)
|
||||
return expr, replacer.escaped_map
|
@ -1,344 +0,0 @@
|
||||
#
|
||||
# Copyright 2013 Red Hat, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import re
|
||||
|
||||
from oslo_log import log
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
from ceilometer.i18n import _
|
||||
from ceilometer import sample
|
||||
from ceilometer import transformer
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseConversionTransformer(transformer.TransformerBase):
|
||||
"""Transformer to derive conversion."""
|
||||
|
||||
def __init__(self, source=None, target=None, **kwargs):
|
||||
"""Initialize transformer with configured parameters.
|
||||
|
||||
:param source: dict containing source sample unit
|
||||
:param target: dict containing target sample name, type,
|
||||
unit and scaling factor (a missing value
|
||||
connotes no change)
|
||||
"""
|
||||
self.source = source or {}
|
||||
self.target = target or {}
|
||||
super(BaseConversionTransformer, self).__init__(**kwargs)
|
||||
|
||||
def _map(self, s, attr):
|
||||
"""Apply the name or unit mapping if configured."""
|
||||
mapped = None
|
||||
from_ = self.source.get('map_from')
|
||||
to_ = self.target.get('map_to')
|
||||
if from_ and to_:
|
||||
if from_.get(attr) and to_.get(attr):
|
||||
try:
|
||||
mapped = re.sub(from_[attr], to_[attr], getattr(s, attr))
|
||||
except Exception:
|
||||
pass
|
||||
return mapped or self.target.get(attr, getattr(s, attr))
|
||||
|
||||
|
||||
class DeltaTransformer(BaseConversionTransformer):
|
||||
"""Transformer based on the delta of a sample volume."""
|
||||
|
||||
def __init__(self, target=None, growth_only=False, **kwargs):
|
||||
"""Initialize transformer with configured parameters.
|
||||
|
||||
:param growth_only: capture only positive deltas
|
||||
"""
|
||||
super(DeltaTransformer, self).__init__(target=target, **kwargs)
|
||||
self.growth_only = growth_only
|
||||
self.cache = {}
|
||||
|
||||
def handle_sample(self, s):
|
||||
"""Handle a sample, converting if necessary."""
|
||||
key = s.name + s.resource_id
|
||||
prev = self.cache.get(key)
|
||||
timestamp = timeutils.parse_isotime(s.timestamp)
|
||||
self.cache[key] = (s.volume, timestamp)
|
||||
|
||||
if prev:
|
||||
prev_volume = prev[0]
|
||||
prev_timestamp = prev[1]
|
||||
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
|
||||
# disallow violations of the arrow of time
|
||||
if time_delta < 0:
|
||||
LOG.warning('Dropping out of time order sample: %s', (s,))
|
||||
# Reset the cache to the newer sample.
|
||||
self.cache[key] = prev
|
||||
return None
|
||||
volume_delta = s.volume - prev_volume
|
||||
if self.growth_only and volume_delta < 0:
|
||||
LOG.warning('Negative delta detected, dropping value')
|
||||
s = None
|
||||
else:
|
||||
s = self._convert(s, volume_delta)
|
||||
LOG.debug('Converted to: %s', s)
|
||||
else:
|
||||
LOG.warning('Dropping sample with no predecessor: %s', (s,))
|
||||
s = None
|
||||
return s
|
||||
|
||||
def _convert(self, s, delta):
|
||||
"""Transform the appropriate sample fields."""
|
||||
return sample.Sample(
|
||||
name=self._map(s, 'name'),
|
||||
unit=s.unit,
|
||||
type=sample.TYPE_DELTA,
|
||||
volume=delta,
|
||||
user_id=s.user_id,
|
||||
project_id=s.project_id,
|
||||
resource_id=s.resource_id,
|
||||
timestamp=s.timestamp,
|
||||
resource_metadata=s.resource_metadata
|
||||
)
|
||||
|
||||
|
||||
class ScalingTransformer(BaseConversionTransformer):
|
||||
"""Transformer to apply a scaling conversion."""
|
||||
|
||||
def __init__(self, source=None, target=None, **kwargs):
|
||||
"""Initialize transformer with configured parameters.
|
||||
|
||||
:param source: dict containing source sample unit
|
||||
:param target: dict containing target sample name, type,
|
||||
unit and scaling factor (a missing value
|
||||
connotes no change)
|
||||
"""
|
||||
super(ScalingTransformer, self).__init__(source=source, target=target,
|
||||
**kwargs)
|
||||
self.scale = self.target.get('scale')
|
||||
self.max = self.target.get('max')
|
||||
LOG.debug('scaling conversion transformer with source:'
|
||||
' %(source)s target: %(target)s:', {'source': self.source,
|
||||
'target': self.target})
|
||||
|
||||
def _scale(self, s):
|
||||
"""Apply the scaling factor.
|
||||
|
||||
Either a straight multiplicative factor or else a string to be eval'd.
|
||||
"""
|
||||
ns = transformer.Namespace(s.as_dict())
|
||||
|
||||
scale = self.scale
|
||||
return ((eval(scale, {}, ns) if isinstance(scale, six.string_types)
|
||||
else s.volume * scale) if scale else s.volume)
|
||||
|
||||
def _convert(self, s, growth=1):
|
||||
"""Transform the appropriate sample fields."""
|
||||
volume = self._scale(s) * growth
|
||||
return sample.Sample(
|
||||
name=self._map(s, 'name'),
|
||||
unit=self._map(s, 'unit'),
|
||||
type=self.target.get('type', s.type),
|
||||
volume=min(volume, self.max) if self.max else volume,
|
||||
user_id=s.user_id,
|
||||
project_id=s.project_id,
|
||||
resource_id=s.resource_id,
|
||||
timestamp=s.timestamp,
|
||||
resource_metadata=s.resource_metadata
|
||||
)
|
||||
|
||||
def handle_sample(self, s):
|
||||
"""Handle a sample, converting if necessary."""
|
||||
LOG.debug('handling sample %s', s)
|
||||
if self.source.get('unit', s.unit) == s.unit:
|
||||
s = self._convert(s)
|
||||
LOG.debug('converted to: %s', s)
|
||||
return s
|
||||
|
||||
|
||||
class RateOfChangeTransformer(ScalingTransformer):
|
||||
"""Transformer based on the rate of change of a sample volume.
|
||||
|
||||
For example, taking the current and previous volumes of a cumulative sample
|
||||
and producing a gauge value based on the proportion of some maximum used.
|
||||
"""
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Initialize transformer with configured parameters."""
|
||||
super(RateOfChangeTransformer, self).__init__(**kwargs)
|
||||
self.cache = {}
|
||||
self.scale = self.scale or '1'
|
||||
|
||||
def handle_sample(self, s):
|
||||
"""Handle a sample, converting if necessary."""
|
||||
LOG.debug('handling sample %s', s)
|
||||
key = s.name + s.resource_id
|
||||
prev = self.cache.get(key)
|
||||
timestamp = timeutils.parse_isotime(s.timestamp)
|
||||
self.cache[key] = (s.volume, timestamp, s.monotonic_time)
|
||||
|
||||
if prev:
|
||||
prev_volume = prev[0]
|
||||
prev_timestamp = prev[1]
|
||||
prev_monotonic_time = prev[2]
|
||||
if (prev_monotonic_time is not None and
|
||||
s.monotonic_time is not None):
|
||||
# NOTE(sileht): Prefer high precision timer
|
||||
time_delta = s.monotonic_time - prev_monotonic_time
|
||||
else:
|
||||
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
|
||||
# disallow violations of the arrow of time
|
||||
if time_delta < 0:
|
||||
LOG.warning(_('dropping out of time order sample: %s'), (s,))
|
||||
# Reset the cache to the newer sample.
|
||||
self.cache[key] = prev
|
||||
return None
|
||||
# we only allow negative volume deltas for noncumulative
|
||||
# samples, whereas for cumulative we assume that a reset has
|
||||
# occurred in the interim so that the current volume gives a
|
||||
# lower bound on growth
|
||||
volume_delta = (s.volume - prev_volume
|
||||
if (prev_volume <= s.volume or
|
||||
s.type != sample.TYPE_CUMULATIVE)
|
||||
else s.volume)
|
||||
rate_of_change = ((1.0 * volume_delta / time_delta)
|
||||
if time_delta else 0.0)
|
||||
|
||||
s = self._convert(s, rate_of_change)
|
||||
LOG.debug('converted to: %s', s)
|
||||
else:
|
||||
LOG.warning(_('dropping sample with no predecessor: %s'),
|
||||
(s,))
|
||||
s = None
|
||||
return s
|
||||
|
||||
|
||||
class AggregatorTransformer(ScalingTransformer):
|
||||
"""Transformer that aggregates samples.
|
||||
|
||||
Aggregation goes until a threshold or/and a retention_time, and then
|
||||
flushes them out into the wild.
|
||||
|
||||
Example:
|
||||
To aggregate sample by resource_metadata and keep the
|
||||
resource_metadata of the latest received sample;
|
||||
|
||||
AggregatorTransformer(retention_time=60, resource_metadata='last')
|
||||
|
||||
To aggregate sample by user_id and resource_metadata and keep the
|
||||
user_id of the first received sample and drop the resource_metadata.
|
||||
|
||||
AggregatorTransformer(size=15, user_id='first',
|
||||
resource_metadata='drop')
|
||||
|
||||
To keep the timestamp of the last received sample rather
|
||||
than the first:
|
||||
|
||||
AggregatorTransformer(timestamp="last")
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, size=1, retention_time=None,
|
||||
project_id=None, user_id=None, resource_metadata="last",
|
||||
timestamp="first", **kwargs):
|
||||
super(AggregatorTransformer, self).__init__(**kwargs)
|
||||
self.samples = {}
|
||||
self.counts = collections.defaultdict(int)
|
||||
self.size = int(size) if size else None
|
||||
self.retention_time = float(retention_time) if retention_time else None
|
||||
if not (self.size or self.retention_time):
|
||||
self.size = 1
|
||||
|
||||
if timestamp in ["first", "last"]:
|
||||
self.timestamp = timestamp
|
||||
else:
|
||||
self.timestamp = "first"
|
||||
|
||||
self.initial_timestamp = None
|
||||
self.aggregated_samples = 0
|
||||
|
||||
self.key_attributes = []
|
||||
self.merged_attribute_policy = {}
|
||||
|
||||
self._init_attribute('project_id', project_id)
|
||||
self._init_attribute('user_id', user_id)
|
||||
self._init_attribute('resource_metadata', resource_metadata,
|
||||
is_droppable=True, mandatory=True)
|
||||
|
||||
def _init_attribute(self, name, value, is_droppable=False,
|
||||
mandatory=False):
|
||||
drop = ['drop'] if is_droppable else []
|
||||
if value or mandatory:
|
||||
if value not in ['last', 'first'] + drop:
|
||||
LOG.warning('%s is unknown (%s), using last' % (name, value))
|
||||
value = 'last'
|
||||
self.merged_attribute_policy[name] = value
|
||||
else:
|
||||
self.key_attributes.append(name)
|
||||
|
||||
def _get_unique_key(self, s):
|
||||
# NOTE(arezmerita): in samples generated by ceilometer middleware,
|
||||
# when accessing without authentication publicly readable/writable
|
||||
# swift containers, the project_id and the user_id are missing.
|
||||
# They will be replaced by <undefined> for unique key construction.
|
||||
keys = ['<undefined>' if getattr(s, f) is None else getattr(s, f)
|
||||
for f in self.key_attributes]
|
||||
non_aggregated_keys = "-".join(keys)
|
||||
# NOTE(sileht): it assumes, a meter always have the same unit/type
|
||||
return "%s-%s-%s" % (s.name, s.resource_id, non_aggregated_keys)
|
||||
|
||||
def handle_sample(self, sample_):
|
||||
if not self.initial_timestamp:
|
||||
self.initial_timestamp = timeutils.parse_isotime(sample_.timestamp)
|
||||
|
||||
self.aggregated_samples += 1
|
||||
key = self._get_unique_key(sample_)
|
||||
self.counts[key] += 1
|
||||
if key not in self.samples:
|
||||
self.samples[key] = self._convert(sample_)
|
||||
if self.merged_attribute_policy[
|
||||
'resource_metadata'] == 'drop':
|
||||
self.samples[key].resource_metadata = {}
|
||||
else:
|
||||
if self.timestamp == "last":
|
||||
self.samples[key].timestamp = sample_.timestamp
|
||||
if sample_.type == sample.TYPE_CUMULATIVE:
|
||||
self.samples[key].volume = self._scale(sample_)
|
||||
else:
|
||||
self.samples[key].volume += self._scale(sample_)
|
||||
for field in self.merged_attribute_policy:
|
||||
if self.merged_attribute_policy[field] == 'last':
|
||||
setattr(self.samples[key], field,
|
||||
getattr(sample_, field))
|
||||
|
||||
def flush(self):
|
||||
if not self.initial_timestamp:
|
||||
return []
|
||||
|
||||
expired = (self.retention_time and
|
||||
timeutils.is_older_than(self.initial_timestamp,
|
||||
self.retention_time))
|
||||
full = self.size and self.aggregated_samples >= self.size
|
||||
if full or expired:
|
||||
x = list(self.samples.values())
|
||||
# gauge aggregates need to be averages
|
||||
for s in x:
|
||||
if s.type == sample.TYPE_GAUGE:
|
||||
key = self._get_unique_key(s)
|
||||
s.volume /= self.counts[key]
|
||||
self.samples.clear()
|
||||
self.counts.clear()
|
||||
self.aggregated_samples = 0
|
||||
self.initial_timestamp = None
|
||||
return x
|
||||
return []
|
@ -405,6 +405,9 @@ if is_service_enabled ceilometer; then
|
||||
start_ceilometer
|
||||
elif [[ "$1" == "stack" && "$2" == "test-config" ]]; then
|
||||
iniset $TEMPEST_CONFIG telemetry alarm_granularity $CEILOMETER_ALARM_GRANULARITY
|
||||
iniset $TEMPEST_CONFIG telemetry alarm_threshold 10000000000
|
||||
iniset $TEMPEST_CONFIG telemetry alarm_metric_name cpu
|
||||
iniset $TEMPEST_CONFIG telemetry alarm_aggregation_method rate:mean
|
||||
fi
|
||||
|
||||
if [[ "$1" == "unstack" ]]; then
|
||||
|
@ -19,7 +19,11 @@ fi
|
||||
# Gnocchi default archive_policy for Ceilometer
|
||||
# TODO(sileht): when Gnocchi 4.0 is out use the tarball instead
|
||||
GNOCCHI_GIT_PATH=${GNOCCHI_GIT_PATH:-git+https://github.com/gnocchixyz/gnocchi#egg=gnocchi}
|
||||
GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low}
|
||||
if [ -n "$GNOCCHI_ARCHIVE_POLICY_TEMPEST" ]; then
|
||||
GNOCCHI_ARCHIVE_POLICY=$GNOCCHI_ARCHIVE_POLICY_TEMPEST
|
||||
else
|
||||
GNOCCHI_ARCHIVE_POLICY=${GNOCCHI_ARCHIVE_POLICY:-ceilometer-low}
|
||||
fi
|
||||
GNOCCHI_CONF_DIR=${GNOCCHI_CONF_DIR:-/etc/gnocchi}
|
||||
GNOCCHI_CONF=${GNOCCHI_CONF:-${GNOCCHI_CONF_DIR}/gnocchi.conf}
|
||||
GNOCCHI_COORDINATOR_URL=${CEILOMETER_COORDINATOR_URL:-redis://localhost:6379}
|
||||
|
@ -103,14 +103,6 @@ The following meters are collected for OpenStack Compute.
|
||||
| cpu | Cumu\ | ns | instance | Pollster | Libvirt,| CPU time used |
|
||||
| | lative| | ID | | Hyper-V | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| cpu.delta | Delta | ns | instance | Pollster | Libvirt,| CPU time used s\ |
|
||||
| | | | ID | | Hyper-V | ince previous d\ |
|
||||
| | | | | | | atapoint |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| cpu_util | Gauge | % | instance | Pollster | LibVirt,| Average CPU |
|
||||
| | | | ID | | vSphere,| utilization |
|
||||
| | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| vcpus | Gauge | vcpu | instance | Notific\ | Libvirt,| Number of virtual|
|
||||
| | | | ID | ation | Hyper-V | CPUs allocated to|
|
||||
| | | | | | | the instance |
|
||||
@ -118,17 +110,9 @@ The following meters are collected for OpenStack Compute.
|
||||
| disk.read\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of read |
|
||||
| .requests | ative | uest | ID | | Hyper-V | requests |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.read\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of |
|
||||
| .requests\| | est/s| ID | | Hyper-V,| read requests |
|
||||
| .rate | | | | | vSphere | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.writ\| Cumul\| req\ | instance | Pollster | Libvirt,| Number of write |
|
||||
| e.requests| ative | uest | ID | | Hyper-V | requests |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.writ\| Gauge | requ\| instance | Pollster | Libvirt,| Average rate of |
|
||||
| e.request\| | est/s| ID | | Hyper-V,| write requests |
|
||||
| s.rate | | | | | vSphere | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.read\| Cumu\ | B | instance | Pollster | Libvirt,| Volume of reads |
|
||||
| .bytes | lative| | ID | | Hyper-V | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
@ -149,38 +133,18 @@ The following meters are collected for OpenStack Compute.
|
||||
| ice.read\ | lative| uest | | | Hyper-V | requests |
|
||||
| .requests | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of |
|
||||
| ice.read\ | | est/s| | | Hyper-V,| read requests |
|
||||
| .requests\| | | | | vSphere | |
|
||||
| .rate | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Cumu\ | req\ | disk ID | Pollster | Libvirt,| Number of write |
|
||||
| ice.write\| lative| uest | | | Hyper-V | requests |
|
||||
| .requests | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Gauge | requ\| disk ID | Pollster | Libvirt,| Average rate of |
|
||||
| ice.write\| | est/s| | | Hyper-V,| write requests |
|
||||
| .requests\| | | | | vSphere | |
|
||||
| .rate | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of reads |
|
||||
| ice.read\ | lative| | | | Hyper-V | |
|
||||
| .bytes | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of |
|
||||
| ice.read\ | | | | | Hyper-V,| reads |
|
||||
| .bytes | | | | | vSphere | |
|
||||
| .rate | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Cumu\ | B | disk ID | Pollster | Libvirt,| Volume of writes |
|
||||
| ice.write\| lative| | | | Hyper-V | |
|
||||
| .bytes | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.dev\ | Gauge | B/s | disk ID | Pollster | Libvirt,| Average rate of |
|
||||
| ice.write\| | | | | Hyper-V,| writes |
|
||||
| .bytes | | | | | vSphere | |
|
||||
| .rate | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| disk.root\| Gauge | GB | instance | Notific\ | Libvirt,| Size of root disk|
|
||||
| .size | | | ID | ation | Hyper-V | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
@ -236,38 +200,18 @@ The following meters are collected for OpenStack Compute.
|
||||
| incoming.\| lative| | ID | | Hyper-V | incoming bytes |
|
||||
| bytes | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of |
|
||||
| incoming.\| | | ID | | Hyper-V,| incoming bytes |
|
||||
| bytes.rate| | | | | vSphere,| |
|
||||
| | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Cumu\ | B | interface| Pollster | Libvirt,| Number of |
|
||||
| outgoing\ | lative| | ID | | Hyper-V | outgoing bytes |
|
||||
| .bytes | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Gauge | B/s | interface| Pollster | Libvirt,| Average rate of |
|
||||
| outgoing.\| | | ID | | Hyper-V,| outgoing bytes |
|
||||
| bytes.rate| | | | | vSphere,| |
|
||||
| | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of |
|
||||
| incoming\ | lative| ket | ID | | Hyper-V | incoming packets |
|
||||
| .packets | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Gauge | pack\| interface| Pollster | Libvirt,| Average rate of |
|
||||
| incoming\ | | et/s | ID | | Hyper-V,| incoming packets |
|
||||
| .packets\ | | | | | vSphere,| |
|
||||
| .rate | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Cumu\ | pac\ | interface| Pollster | Libvirt,| Number of |
|
||||
| outgoing\ | lative| ket | ID | | Hyper-V | outgoing packets |
|
||||
| .packets | | | | | | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| network.\ | Gauge | pac\ | interface| Pollster | Libvirt,| Average rate of |
|
||||
| outgoing\ | | ket/s| ID | | Hyper-V,| outgoing packets |
|
||||
| .packets\ | | | | | vSphere,| |
|
||||
| .rate | | | | | XenAPI | |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| **Meters added in the Newton release** |
|
||||
+-----------+-------+------+----------+----------+---------+------------------+
|
||||
| cpu_l3_c\ | Gauge | B | instance | Pollster | Libvirt | L3 cache used b\ |
|
||||
@ -354,50 +298,6 @@ The following meters are collected for OpenStack Compute.
|
||||
To enable libvirt ``disk.*`` support when running on RBD-backed shared
|
||||
storage, you need to install libvirt version 1.2.16+.
|
||||
|
||||
The Telemetry service supports creating new meters by using transformers, but
|
||||
this is deprecated and discouraged to use. Among the meters gathered from
|
||||
libvirt and Hyper-V, there are a few which are derived from other meters. The
|
||||
list of meters that are created by using the ``rate_of_change`` transformer
|
||||
from the above table is the following:
|
||||
|
||||
- cpu_util
|
||||
|
||||
- cpu.delta
|
||||
|
||||
- disk.read.requests.rate
|
||||
|
||||
- disk.write.requests.rate
|
||||
|
||||
- disk.read.bytes.rate
|
||||
|
||||
- disk.write.bytes.rate
|
||||
|
||||
- disk.device.read.requests.rate
|
||||
|
||||
- disk.device.write.requests.rate
|
||||
|
||||
- disk.device.read.bytes.rate
|
||||
|
||||
- disk.device.write.bytes.rate
|
||||
|
||||
- network.incoming.bytes.rate
|
||||
|
||||
- network.outgoing.bytes.rate
|
||||
|
||||
- network.incoming.packets.rate
|
||||
|
||||
- network.outgoing.packets.rate
|
||||
|
||||
.. note::
|
||||
|
||||
If storing data in Gnocchi, derived rate_of_change metrics are also
|
||||
computed using Gnocchi in addition to Ceilometer transformers. It avoids
|
||||
missing data when Ceilometer services restart.
|
||||
To minimize Ceilometer memory requirements transformers can be disabled.
|
||||
These ``rate_of_change`` meters are deprecated and will be removed in
|
||||
default Ceilometer configuration in future release.
|
||||
|
||||
|
||||
OpenStack Compute is capable of collecting ``CPU`` related meters from
|
||||
the compute host machines. In order to use that you need to set the
|
||||
``compute_monitors`` option to ``cpu.virt_driver`` in the
|
||||
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
upgrade:
|
||||
- |
|
||||
The support for transformers has been removed from the pipeline.
|
@ -222,14 +222,6 @@ ceilometer.compute.virt =
|
||||
ceilometer.hardware.inspectors =
|
||||
snmp = ceilometer.hardware.inspector.snmp:SNMPInspector
|
||||
|
||||
ceilometer.transformer =
|
||||
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
|
||||
delta = ceilometer.transformer.conversions:DeltaTransformer
|
||||
unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
|
||||
rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
|
||||
aggregator = ceilometer.transformer.conversions:AggregatorTransformer
|
||||
arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
|
||||
|
||||
ceilometer.sample.publisher =
|
||||
test = ceilometer.publisher.test:TestPublisher
|
||||
notifier = ceilometer.publisher.messaging:SampleNotifierPublisher
|
||||
|
Loading…
x
Reference in New Issue
Block a user