From d2ae9d6dc61868c55ccf0c2c94c16d76d126d109 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Thu, 2 Jul 2015 21:39:39 -0400 Subject: [PATCH] drop deprecated pipeline the old pipeline format was deprecated as of icehouse. this patch switches all tests to use current pipeline format and drops support for old pipeline. Change-Id: Ide53c1c5beab4a586324c4727dba3a9e200f8082 --- ceilometer/pipeline.py | 179 +++++++---------- ceilometer/tests/agent/agentbase.py | 193 +++++++++---------- ceilometer/tests/agent/test_manager.py | 28 +-- ceilometer/tests/pipeline_base.py | 16 +- ceilometer/tests/test_deprecated_pipeline.py | 135 ------------- ceilometer/tests/test_notification.py | 20 +- etc/ceilometer/deprecated_pipeline.yaml | 73 ------- 7 files changed, 199 insertions(+), 445 deletions(-) delete mode 100644 ceilometer/tests/test_deprecated_pipeline.py delete mode 100644 etc/ceilometer/deprecated_pipeline.yaml diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index 42e494087a..acd4b489c0 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -578,74 +578,44 @@ class PipelineManager(object): def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE): """Setup the pipelines according to config. - The configuration is supported in one of two forms: + The configuration is supported as follows: - 1. Deprecated: the source and sink configuration are conflated - as a list of consolidated pipelines. + Decoupled: the source and sink configuration are separately + specified before being linked together. This allows source- + specific configuration, such as resource discovery, to be + kept focused only on the fine-grained source while avoiding + the necessity for wide duplication of sink-related config. - The pipelines are defined as a list of dictionaries each - specifying the target samples, the transformers involved, - and the target publishers, for example: + The configuration is provided in the form of separate lists + of dictionaries defining sources and sinks, for example: - [{"name": pipeline_1, - "interval": interval_time, - "meters" : ["meter_1", "meter_2"], - "resources": ["resource_uri1", "resource_uri2"], - "transformers": [ - {"name": "Transformer_1", - "parameters": {"p1": "value"}}, + {"sources": [{"name": source_1, + "interval": interval_time, + "meters" : ["meter_1", "meter_2"], + "resources": ["resource_uri1", "resource_uri2"], + "sinks" : ["sink_1", "sink_2"] + }, + {"name": source_2, + "interval": interval_time, + "meters" : ["meter_3"], + "sinks" : ["sink_2"] + }, + ], + "sinks": [{"name": sink_1, + "transformers": [ + {"name": "Transformer_1", + "parameters": {"p1": "value"}}, - {"name": "Transformer_2", - "parameters": {"p1": "value"}}, - ], - "publishers": ["publisher_1", "publisher_2"] - }, - {"name": pipeline_2, - "interval": interval_time, - "meters" : ["meter_3"], - "publishers": ["publisher_3"] - }, - ] - - 2. Decoupled: the source and sink configuration are separately - specified before being linked together. This allows source- - specific configuration, such as resource discovery, to be - kept focused only on the fine-grained source while avoiding - the necessity for wide duplication of sink-related config. - - The configuration is provided in the form of separate lists - of dictionaries defining sources and sinks, for example: - - {"sources": [{"name": source_1, - "interval": interval_time, - "meters" : ["meter_1", "meter_2"], - "resources": ["resource_uri1", "resource_uri2"], - "sinks" : ["sink_1", "sink_2"] - }, - {"name": source_2, - "interval": interval_time, - "meters" : ["meter_3"], - "sinks" : ["sink_2"] - }, - ], - "sinks": [{"name": sink_1, - "transformers": [ - {"name": "Transformer_1", - "parameters": {"p1": "value"}}, - - {"name": "Transformer_2", - "parameters": {"p1": "value"}}, - ], - "publishers": ["publisher_1", "publisher_2"] - }, - {"name": sink_2, - "publishers": ["publisher_3"] - }, - ] - } - - The semantics of the common individual configuration elements - are identical in the deprecated and decoupled version. + {"name": "Transformer_2", + "parameters": {"p1": "value"}}, + ], + "publishers": ["publisher_1", "publisher_2"] + }, + {"name": sink_2, + "publishers": ["publisher_3"] + }, + ] + } The interval determines the cadence of sample injection into the pipeline where samples are produced under the direct control @@ -674,60 +644,47 @@ class PipelineManager(object): """ self.pipelines = [] - if 'sources' in cfg or 'sinks' in cfg: - if not ('sources' in cfg and 'sinks' in cfg): - raise PipelineException("Both sources & sinks are required", - cfg) - LOG.info(_('detected decoupled pipeline config format')) + if not ('sources' in cfg and 'sinks' in cfg): + raise PipelineException("Both sources & sinks are required", + cfg) + LOG.info(_('detected decoupled pipeline config format')) - unique_names = set() - sources = [] - for s in cfg.get('sources', []): - name = s.get('name') - if name in unique_names: - raise PipelineException("Duplicated source names: %s" % - name, self) - else: - unique_names.add(name) - sources.append(p_type['source'](s)) - unique_names.clear() + unique_names = set() + sources = [] + for s in cfg.get('sources', []): + name = s.get('name') + if name in unique_names: + raise PipelineException("Duplicated source names: %s" % + name, self) + else: + unique_names.add(name) + sources.append(p_type['source'](s)) + unique_names.clear() - sinks = {} - for s in cfg.get('sinks', []): - name = s.get('name') - if name in unique_names: - raise PipelineException("Duplicated sink names: %s" % - name, self) - else: - unique_names.add(name) - sinks[s['name']] = p_type['sink'](s, transformer_manager) - unique_names.clear() + sinks = {} + for s in cfg.get('sinks', []): + name = s.get('name') + if name in unique_names: + raise PipelineException("Duplicated sink names: %s" % + name, self) + else: + unique_names.add(name) + sinks[s['name']] = p_type['sink'](s, transformer_manager) + unique_names.clear() - for source in sources: - source.check_sinks(sinks) - for target in source.sinks: - pipe = p_type['pipeline'](source, sinks[target]) - if pipe.name in unique_names: - raise PipelineException( - "Duplicate pipeline name: %s. Ensure pipeline" - " names are unique. (name is the source and sink" - " names combined)" % pipe.name, cfg) - else: - unique_names.add(pipe.name) - self.pipelines.append(pipe) - unique_names.clear() - else: - LOG.warning(_('detected deprecated pipeline config format')) - for pipedef in cfg: - source = p_type['source'](pipedef) - sink = p_type['sink'](pipedef, transformer_manager) - pipe = p_type['pipeline'](source, sink) - if pipe.name in [p.name for p in self.pipelines]: + for source in sources: + source.check_sinks(sinks) + for target in source.sinks: + pipe = p_type['pipeline'](source, sinks[target]) + if pipe.name in unique_names: raise PipelineException( "Duplicate pipeline name: %s. Ensure pipeline" - " names are unique" % pipe.name, cfg) + " names are unique. (name is the source and sink" + " names combined)" % pipe.name, cfg) else: + unique_names.add(pipe.name) self.pipelines.append(pipe) + unique_names.clear() def publisher(self, context): """Build a new Publisher for these manager pipelines. diff --git a/ceilometer/tests/agent/agentbase.py b/ceilometer/tests/agent/agentbase.py index ce0face9c8..34e93f72da 100644 --- a/ceilometer/tests/agent/agentbase.py +++ b/ceilometer/tests/agent/agentbase.py @@ -232,14 +232,18 @@ class BaseAgentManagerTestCase(base.BaseTestCase): p_coord = self.mgr.partition_coordinator p_coord.extract_my_subset.side_effect = fake_subset self.mgr.tg = mock.MagicMock() - self.pipeline_cfg = [{ - 'name': "test_pipeline", - 'interval': 60, - 'counters': ['test'], - 'resources': ['test://'] if self.source_resources else [], - 'transformers': [], - 'publishers': ["test"], - }, ] + self.pipeline_cfg = { + 'sources': [{ + 'name': 'test_pipeline', + 'interval': 60, + 'meters': ['test'], + 'resources': ['test://'] if self.source_resources else [], + 'sinks': ['test_sink']}], + 'sinks': [{ + 'name': 'test_sink', + 'transformers': [], + 'publishers': ["test"]}] + } self.setup_pipeline() self.CONF = self.useFixture(fixture_config.Config()).conf self.CONF.set_override( @@ -294,7 +298,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.mgr.join_partitioning_groups() p_coord = self.mgr.partition_coordinator static_group_ids = [utils.hash_of_set(p['resources']) - for p in self.pipeline_cfg + for p in self.pipeline_cfg['sources'] if p['resources']] expected = [mock.call(self.mgr.construct_group_id(g)) for g in ['another_group', 'global'] + static_group_ids] @@ -308,7 +312,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertTrue(60 in polling_tasks.keys()) per_task_resources = polling_tasks[60].resources self.assertEqual(1, len(per_task_resources)) - self.assertEqual(set(self.pipeline_cfg[0]['resources']), + self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']), set(per_task_resources['test_pipeline-test'].get({}))) task = list(polling_tasks.values())[0] self.mgr.interval_task(task) @@ -317,13 +321,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertEqual(self.Pollster.test_data, pub.samples[0]) def test_setup_polling_tasks_multiple_interval(self): - self.pipeline_cfg.append({ - 'name': "test_pipeline_1", + self.pipeline_cfg['sources'].append({ + 'name': 'test_pipeline_1', 'interval': 10, - 'counters': ['test'], + 'meters': ['test'], 'resources': ['test://'] if self.source_resources else [], - 'transformers': [], - 'publishers': ["test"], + 'sinks': ['test_sink'] }) self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() @@ -332,27 +335,24 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertTrue(10 in polling_tasks.keys()) def test_setup_polling_tasks_mismatch_counter(self): - self.pipeline_cfg.append( - { - 'name': "test_pipeline_1", - 'interval': 10, - 'counters': ['test_invalid'], - 'resources': ['invalid://'], - 'transformers': [], - 'publishers': ["test"], - }) + self.pipeline_cfg['sources'].append({ + 'name': 'test_pipeline_1', + 'interval': 10, + 'meters': ['test_invalid'], + 'resources': ['invalid://'], + 'sinks': ['test_sink'] + }) polling_tasks = self.mgr.setup_polling_tasks() self.assertEqual(1, len(polling_tasks)) self.assertTrue(60 in polling_tasks.keys()) def test_setup_polling_task_same_interval(self): - self.pipeline_cfg.append({ - 'name': "test_pipeline_1", + self.pipeline_cfg['sources'].append({ + 'name': 'test_pipeline_1', 'interval': 60, - 'counters': ['testanother'], + 'meters': ['testanother'], 'resources': ['testanother://'] if self.source_resources else [], - 'transformers': [], - 'publishers': ["test"], + 'sinks': ['test_sink'] }) self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() @@ -362,31 +362,30 @@ class BaseAgentManagerTestCase(base.BaseTestCase): per_task_resources = polling_tasks[60].resources self.assertEqual(2, len(per_task_resources)) key = 'test_pipeline-test' - self.assertEqual(set(self.pipeline_cfg[0]['resources']), + self.assertEqual(set(self.pipeline_cfg['sources'][0]['resources']), set(per_task_resources[key].get({}))) key = 'test_pipeline_1-testanother' - self.assertEqual(set(self.pipeline_cfg[1]['resources']), + self.assertEqual(set(self.pipeline_cfg['sources'][1]['resources']), set(per_task_resources[key].get({}))) def test_interval_exception_isolation(self): - self.pipeline_cfg = [ - { - 'name': "test_pipeline_1", + self.pipeline_cfg = { + 'sources': [{ + 'name': 'test_pipeline_1', 'interval': 10, - 'counters': ['testexceptionanother'], + 'meters': ['testexceptionanother'], 'resources': ['test://'] if self.source_resources else [], + 'sinks': ['test_sink']}, + {'name': 'test_pipeline_2', + 'interval': 10, + 'meters': ['testexception'], + 'resources': ['test://'] if self.source_resources else [], + 'sinks': ['test_sink']}], + 'sinks': [{ + 'name': 'test_sink', 'transformers': [], - 'publishers': ["test"], - }, - { - 'name': "test_pipeline_2", - 'interval': 10, - 'counters': ['testexception'], - 'resources': ['test://'] if self.source_resources else [], - 'transformers': [], - 'publishers': ["test"], - }, - ] + 'publishers': ["test"]}] + } self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, self.transformer_manager) @@ -407,12 +406,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertTrue(mgr.tg.add_timer.called) def test_manager_exception_persistency(self): - self.pipeline_cfg.append({ - 'name': "test_pipeline_1", + self.pipeline_cfg['sources'].append({ + 'name': 'test_pipeline_1', 'interval': 60, - 'counters': ['testanother'], - 'transformers': [], - 'publishers': ["test"], + 'meters': ['testanother'], + 'sinks': ['test_sink'] }) self.setup_pipeline() @@ -431,10 +429,11 @@ class BaseAgentManagerTestCase(base.BaseTestCase): if static_resources: # just so we can test that static + pre_pipeline amalgamated # override per_pollster - self.pipeline_cfg[0]['discovery'] = ['testdiscoveryanother', - 'testdiscoverynonexistent', - 'testdiscoveryexception'] - self.pipeline_cfg[0]['resources'] = static_resources + self.pipeline_cfg['sources'][0]['discovery'] = [ + 'testdiscoveryanother', + 'testdiscoverynonexistent', + 'testdiscoveryexception'] + self.pipeline_cfg['sources'][0]['resources'] = static_resources self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -475,8 +474,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.PollsterAnother.discovery = 'testdiscovery' self.mgr.discovery_manager = self.create_discovery_manager() self.Discovery.resources = discovered_resources - self.pipeline_cfg[0]['counters'].append('testanother') - self.pipeline_cfg[0]['resources'] = [] + self.pipeline_cfg['sources'][0]['meters'].append('testanother') + self.pipeline_cfg['sources'][0]['resources'] = [] self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -491,11 +490,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.Discovery.resources = discovered_resources self.DiscoveryAnother.resources = [d[::-1] for d in discovered_resources] - self.pipeline_cfg[0]['discovery'] = ['testdiscovery', - 'testdiscoveryanother', - 'testdiscoverynonexistent', - 'testdiscoveryexception'] - self.pipeline_cfg[0]['resources'] = static_resources + self.pipeline_cfg['sources'][0]['discovery'] = [ + 'testdiscovery', 'testdiscoveryanother', + 'testdiscoverynonexistent', 'testdiscoveryexception'] + self.pipeline_cfg['sources'][0]['resources'] = static_resources self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -528,14 +526,18 @@ class BaseAgentManagerTestCase(base.BaseTestCase): # assert that the individual lists of static and discovered resources # for each pipeline with a common interval are passed to individual # pollsters matching each pipeline - self.pipeline_cfg[0]['resources'] = ['test://'] - self.pipeline_cfg[0]['discovery'] = ['testdiscovery'] - self.pipeline_cfg.append({ - 'name': "another_pipeline", + self.pipeline_cfg['sources'][0]['resources'] = ['test://'] + self.pipeline_cfg['sources'][0]['discovery'] = ['testdiscovery'] + self.pipeline_cfg['sources'].append({ + 'name': 'another_pipeline', 'interval': 60, - 'counters': ['test'], + 'meters': ['test'], 'resources': ['another://'], 'discovery': ['testdiscoveryanother'], + 'sinks': ['test_sink_new'] + }) + self.pipeline_cfg['sinks'].append({ + 'name': "test_sink_new", 'transformers': [], 'publishers': ["new"], }) @@ -561,8 +563,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.fail('unexpected sample resources %s' % samples) all_resources = set(test_resources) all_resources.update(another_resources) - expected_pipelines = {'test://': 'test_pipeline', - 'another://': 'another_pipeline'} + expected_pipelines = {'test://': 'test_pipeline:test_sink', + 'another://': 'another_pipeline:test_sink_new'} sunk_resources = [] for pipe_line in self.mgr.pipeline_manager.pipelines: self.assertEqual(1, len(pipe_line.publishers[0].samples)) @@ -582,12 +584,12 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.DiscoveryAnother.resources = ['discovered_3', 'discovered_4'] sources = [{'name': 'test_source_1', 'interval': 60, - 'counters': ['test'], + 'meters': ['test'], 'discovery': ['testdiscovery'], 'sinks': ['test_sink_1']}, {'name': 'test_source_2', 'interval': 60, - 'counters': ['testanother'], + 'meters': ['testanother'], 'discovery': ['testdiscoveryanother'], 'sinks': ['test_sink_2']}] sinks = [{'name': 'test_sink_1', @@ -614,7 +616,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.Discovery.resources = ['discovered_1', 'discovered_2'] sources = [{'name': 'test_source_1', 'interval': 60, - 'counters': ['test'], + 'meters': ['test'], 'discovery': ['testdiscovery'], 'sinks': ['test_sink_1', 'test_sink_2']}] sinks = [{'name': 'test_sink_1', @@ -637,11 +639,10 @@ class BaseAgentManagerTestCase(base.BaseTestCase): def test_discovery_partitioning(self): self.mgr.discovery_manager = self.create_discovery_manager() p_coord = self.mgr.partition_coordinator - self.pipeline_cfg[0]['discovery'] = ['testdiscovery', - 'testdiscoveryanother', - 'testdiscoverynonexistent', - 'testdiscoveryexception'] - self.pipeline_cfg[0]['resources'] = [] + self.pipeline_cfg['sources'][0]['discovery'] = [ + 'testdiscovery', 'testdiscoveryanother', + 'testdiscoverynonexistent', 'testdiscoveryexception'] + self.pipeline_cfg['sources'][0]['resources'] = [] self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -658,24 +659,22 @@ class BaseAgentManagerTestCase(base.BaseTestCase): p_coord = self.mgr.partition_coordinator static_resources = ['static_1', 'static_2'] static_resources2 = ['static_3', 'static_4'] - self.pipeline_cfg[0]['resources'] = static_resources - self.pipeline_cfg.append({ - 'name': "test_pipeline2", - 'interval': 60, - 'counters': ['test', 'test2'], - 'resources': static_resources2, - 'transformers': [], - 'publishers': ["test"], - }) + self.pipeline_cfg['sources'][0]['resources'] = static_resources + self.pipeline_cfg['sources'].append({ + 'name': 'test_pipeline2', + 'interval': 60, + 'meters': ['test', 'test2'], + 'resources': static_resources2, + 'sinks': ['test_sink'] + }) # have one pipeline without static resources defined - self.pipeline_cfg.append({ - 'name': "test_pipeline3", - 'interval': 60, - 'counters': ['test', 'test2'], - 'resources': [], - 'transformers': [], - 'publishers': ["test"], - }) + self.pipeline_cfg['sources'].append({ + 'name': 'test_pipeline3', + 'interval': 60, + 'meters': ['test', 'test2'], + 'resources': [], + 'sinks': ['test_sink'] + }) self.setup_pipeline() polling_tasks = self.mgr.setup_polling_tasks() self.mgr.interval_task(polling_tasks.get(60)) @@ -692,8 +691,8 @@ class BaseAgentManagerTestCase(base.BaseTestCase): self.assertIn(c, p_coord.extract_my_subset.call_args_list) def test_arithmetic_transformer(self): - self.pipeline_cfg[0]['counters'] = ['test', 'testanother'] - self.pipeline_cfg[0]['transformers'] = [ + self.pipeline_cfg['sources'][0]['meters'] = ['test', 'testanother'] + self.pipeline_cfg['sinks'][0]['transformers'] = [ {'name': 'arithmetic', 'parameters': { 'target': {'name': 'test_sum', @@ -714,7 +713,7 @@ class BaseAgentManagerTestCase(base.BaseTestCase): @mock.patch('ceilometer.tests.agent.agentbase.TestPollster.get_samples') def test_skip_polling_and_publish_with_no_resources( self, get_samples, LOG): - self.pipeline_cfg[0]['resources'] = [] + self.pipeline_cfg['sources'][0]['resources'] = [] self.setup_pipeline() polling_task = list(self.mgr.setup_polling_tasks().values())[0] pollster = list(polling_task.pollster_matches['test_pipeline'])[0] diff --git a/ceilometer/tests/agent/test_manager.py b/ceilometer/tests/agent/test_manager.py index cd53b4ad38..2b2fcd2912 100644 --- a/ceilometer/tests/agent/test_manager.py +++ b/ceilometer/tests/agent/test_manager.py @@ -213,16 +213,18 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): self.useFixture(mockpatch.Patch( 'keystoneclient.v2_0.client.Client', side_effect=Exception)) - self.pipeline_cfg = [ - { + self.pipeline_cfg = { + 'sources': [{ 'name': "test_keystone", 'interval': 10, - 'counters': ['testkeystone'], + 'meters': ['testkeystone'], 'resources': ['test://'] if self.source_resources else [], + 'sinks': ['test_sink']}], + 'sinks': [{ + 'name': 'test_sink', 'transformers': [], - 'publishers': ["test"], - }, - ] + 'publishers': ["test"]}] + } self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, self.transformer_manager) @@ -239,16 +241,18 @@ class TestRunTasks(agentbase.BaseAgentManagerTestCase): @mock.patch('ceilometer.agent.base.LOG') def test_polling_exception(self, LOG): source_name = 'test_pollingexception' - self.pipeline_cfg = [ - { + self.pipeline_cfg = { + 'sources': [{ 'name': source_name, 'interval': 10, - 'counters': ['testpollingexception'], + 'meters': ['testpollingexception'], 'resources': ['test://'] if self.source_resources else [], + 'sinks': ['test_sink']}], + 'sinks': [{ + 'name': 'test_sink', 'transformers': [], - 'publishers': ["test"], - }, - ] + 'publishers': ["test"]}] + } self.mgr.pipeline_manager = pipeline.PipelineManager( self.pipeline_cfg, self.transformer_manager) diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index 9f05ae7882..26ff1aeed4 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -750,16 +750,12 @@ class BasePipelineTestCase(base.BaseTestCase): getattr(publisher.samples[0], 'name')) def test_variable_counter(self): - self.pipeline_cfg = [{ - 'name': "test_pipeline", - 'interval': 5, - 'counters': ['a:*'], - 'transformers': [ - {'name': "update", - 'parameters': {}} - ], - 'publishers': ["test://"], - }, ] + transformer_cfg = [{ + 'name': "update", + 'parameters': {} + }] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['a:*']) pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) diff --git a/ceilometer/tests/test_deprecated_pipeline.py b/ceilometer/tests/test_deprecated_pipeline.py deleted file mode 100644 index 6d2f68a9b2..0000000000 --- a/ceilometer/tests/test_deprecated_pipeline.py +++ /dev/null @@ -1,135 +0,0 @@ -# -# Copyright 2014 Red Hat, Inc -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import yaml - -from ceilometer import pipeline -from ceilometer.tests import pipeline_base - - -class TestDeprecatedPipeline(pipeline_base.BasePipelineTestCase): - def _setup_pipeline_cfg(self): - self.pipeline_cfg = [{ - 'name': 'test_pipeline', - 'interval': 5, - 'counters': ['a'], - 'transformers': [ - {'name': 'update', - 'parameters': {}} - ], - 'publishers': ['test://'], - }, ] - - def _augment_pipeline_cfg(self): - self.pipeline_cfg.append({ - 'name': 'second_pipeline', - 'interval': 5, - 'counters': ['b'], - 'transformers': [{ - 'name': 'update', - 'parameters': - { - 'append_name': '_new', - } - }], - 'publishers': ['new'], - }) - - def _break_pipeline_cfg(self): - self.pipeline_cfg.append({ - 'name': 'second_pipeline', - 'interval': 5, - 'counters': ['b'], - 'transformers': [{ - 'name': 'update', - 'parameters': - { - 'append_name': '_new', - } - }], - 'publishers': ['except'], - }) - - def _dup_pipeline_name_cfg(self): - self.pipeline_cfg.append({ - 'name': 'test_pipeline', - 'interval': 5, - 'counters': ['b'], - 'transformers': [], - 'publishers': ['except'], - }) - - def _set_pipeline_cfg(self, field, value): - self.pipeline_cfg[0][field] = value - - def _extend_pipeline_cfg(self, field, value): - self.pipeline_cfg[0][field].extend(value) - - def _unset_pipeline_cfg(self, field): - del self.pipeline_cfg[0][field] - - def _do_test_rate_of_change_in_boilerplate_pipeline_cfg(self, index, - meters, units): - with open('etc/ceilometer/deprecated_pipeline.yaml') as fap: - data = fap.read() - pipeline_cfg = yaml.safe_load(data) - for p in pipeline_cfg: - p['publishers'] = ['test://'] - pipeline_manager = pipeline.PipelineManager(pipeline_cfg, - self.transformer_manager) - pipe = pipeline_manager.pipelines[index] - self._do_test_rate_of_change_mapping(pipe, meters, units) - - def test_rate_of_change_boilerplate_disk_read_cfg(self): - meters = ('disk.read.bytes', 'disk.read.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, - meters, - units) - - def test_rate_of_change_boilerplate_disk_write_cfg(self): - meters = ('disk.write.bytes', 'disk.write.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, - meters, - units) - - def test_rate_of_change_boilerplate_per_disk_device_read_cfg(self): - meters = ('disk.device.read.bytes', 'disk.device.read.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, - meters, - units) - - def test_rate_of_change_boilerplate_per_disk_device_write_cfg(self): - meters = ('disk.device.write.bytes', 'disk.device.write.requests') - units = ('B', 'request') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(2, - meters, - units) - - def test_rate_of_change_boilerplate_network_incoming_cfg(self): - meters = ('network.incoming.bytes', 'network.incoming.packets') - units = ('B', 'packet') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, - meters, - units) - - def test_rate_of_change_boilerplate_network_outgoing_cfg(self): - meters = ('network.outgoing.bytes', 'network.outgoing.packets') - units = ('B', 'packet') - self._do_test_rate_of_change_in_boilerplate_pipeline_cfg(3, - meters, - units) diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index 25588cf6c1..6827cc2c17 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -176,13 +176,19 @@ class BaseRealNotification(tests_base.BaseTestCase): self.CONF = self.useFixture(fixture_config.Config()).conf self.setup_messaging(self.CONF, 'nova') - pipeline = yaml.dump([{ - 'name': 'test_pipeline', - 'interval': 5, - 'counters': ['instance', 'memory'], - 'transformers': [], - 'publishers': ['test://'], - }]) + pipeline = yaml.dump({ + 'sources': [{ + 'name': 'test_pipeline', + 'interval': 5, + 'meters': ['instance', 'memory'], + 'sinks': ['test_sink'] + }], + 'sinks': [{ + 'name': 'test_sink', + 'transformers': [], + 'publishers': ['test://'] + }] + }) if six.PY3: pipeline = pipeline.encode('utf-8') self.expected_samples = 2 diff --git a/etc/ceilometer/deprecated_pipeline.yaml b/etc/ceilometer/deprecated_pipeline.yaml deleted file mode 100644 index 6e4597fc29..0000000000 --- a/etc/ceilometer/deprecated_pipeline.yaml +++ /dev/null @@ -1,73 +0,0 @@ ---- -- - name: meter_pipeline - interval: 600 - meters: - - "*" - resources: - transformers: - publishers: - - rpc:// -- - name: cpu_pipeline - interval: 600 - meters: - - "cpu" - transformers: - - name: "rate_of_change" - parameters: - target: - name: "cpu_util" - unit: "%" - type: "gauge" - scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" - publishers: - - rpc:// -- - name: disk_pipeline - interval: 600 - meters: - - "disk.read.bytes" - - "disk.read.requests" - - "disk.write.bytes" - - "disk.write.requests" - - "disk.device.read.bytes" - - "disk.device.read.requests" - - "disk.device.write.bytes" - - "disk.device.write.requests" - transformers: - - name: "rate_of_change" - parameters: - source: - map_from: - name: "(disk\\.device|disk)\\.(read|write)\\.(bytes|requests)" - unit: "(B|request)" - target: - map_to: - name: "\\1.\\2.\\3.rate" - unit: "\\1/s" - type: "gauge" - publishers: - - rpc:// -- - name: network_pipeline - interval: 600 - meters: - - "network.incoming.bytes" - - "network.incoming.packets" - - "network.outgoing.bytes" - - "network.outgoing.packets" - transformers: - - name: "rate_of_change" - parameters: - source: - map_from: - name: "network\\.(incoming|outgoing)\\.(bytes|packets)" - unit: "(B|packet)" - target: - map_to: - name: "network.\\1.\\2.rate" - unit: "\\1/s" - type: "gauge" - publishers: - - rpc://