Batching of event & meter dispatching over HTTP
batch_mode = True activates batching (default is False, for backward compatibility) Change-Id: I58a856f6cdf5367c1d44d278ac08c8ebda360dff
This commit is contained in:
parent
c61befb67a
commit
54e4cc77e0
@ -42,6 +42,9 @@ http_dispatcher_opts = [
|
||||
help='The path to a server certificate or directory if the '
|
||||
'system CAs are not used or if a self-signed certificate '
|
||||
'is used. Set to False to ignore SSL cert verification.'),
|
||||
cfg.BoolOpt('batch_mode',
|
||||
default=False,
|
||||
help='Indicates whether samples are published in a batch.'),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(http_dispatcher_opts, group="dispatcher_http")
|
||||
@ -70,6 +73,12 @@ class HttpDispatcher(dispatcher.MeterDispatcherBase,
|
||||
verify_ssl = True
|
||||
# SSL verification with specific CA or directory of certs
|
||||
#verify_ssl = /path/to/ca_certificate.crt
|
||||
|
||||
Instead of publishing events and meters as JSON objects in individual HTTP
|
||||
requests, they can be batched up and published as JSON arrays of objects::
|
||||
|
||||
[dispatcher_http]
|
||||
batch_mode = True
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
@ -79,6 +88,14 @@ class HttpDispatcher(dispatcher.MeterDispatcherBase,
|
||||
self.target = self.conf.dispatcher_http.target
|
||||
self.event_target = (self.conf.dispatcher_http.event_target or
|
||||
self.target)
|
||||
|
||||
if self.conf.dispatcher_http.batch_mode:
|
||||
self.post_event_data = self.post_event
|
||||
self.post_meter_data = self.post_meter
|
||||
else:
|
||||
self.post_event_data = self.post_individual_events
|
||||
self.post_meter_data = self.post_individual_meters
|
||||
|
||||
try:
|
||||
self.verify_ssl = strutils.bool_from_string(
|
||||
self.conf.dispatcher_http.verify_ssl, strict=True)
|
||||
@ -97,30 +114,30 @@ class HttpDispatcher(dispatcher.MeterDispatcherBase,
|
||||
if not isinstance(data, list):
|
||||
data = [data]
|
||||
|
||||
for meter in data:
|
||||
LOG.debug(
|
||||
'metering data %(counter_name)s '
|
||||
'for %(resource_id)s @ %(timestamp)s: %(counter_volume)s',
|
||||
{'counter_name': meter['counter_name'],
|
||||
'resource_id': meter['resource_id'],
|
||||
'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
|
||||
'counter_volume': meter['counter_volume']})
|
||||
try:
|
||||
# Every meter should be posted to the target
|
||||
meter_json = json.dumps(meter)
|
||||
LOG.trace('Meter Message: %s', meter_json)
|
||||
res = requests.post(self.target,
|
||||
data=meter_json,
|
||||
headers=self.headers,
|
||||
verify=self.verify_ssl,
|
||||
timeout=self.timeout)
|
||||
LOG.debug('Meter message posting finished with status code '
|
||||
'%d.', res.status_code)
|
||||
res.raise_for_status()
|
||||
except requests.exceptions.HTTPError:
|
||||
LOG.exception(_LE('Status Code: %(code)s. Failed to '
|
||||
'dispatch meter: %(meter)s'),
|
||||
{'code': res.status_code, 'meter': meter})
|
||||
self.post_meter_data(data)
|
||||
|
||||
def post_individual_meters(self, meters):
|
||||
for meter in meters:
|
||||
self.post_meter(meter)
|
||||
|
||||
def post_meter(self, meter):
|
||||
meter_json = json.dumps(meter)
|
||||
res = None
|
||||
try:
|
||||
LOG.trace('Meter Message: %s', meter_json)
|
||||
res = requests.post(self.target,
|
||||
data=meter_json,
|
||||
headers=self.headers,
|
||||
verify=self.verify_ssl,
|
||||
timeout=self.timeout)
|
||||
LOG.debug('Meter message posting finished with status code '
|
||||
'%d.', res.status_code)
|
||||
res.raise_for_status()
|
||||
|
||||
except requests.exceptions.HTTPError:
|
||||
LOG.exception(_LE('Status Code: %(code)s. '
|
||||
'Failed to dispatch meter: %(meter)s') %
|
||||
{'code': res.status_code, 'meter': meter_json})
|
||||
|
||||
def record_events(self, events):
|
||||
if self.event_target == '':
|
||||
@ -133,19 +150,26 @@ class HttpDispatcher(dispatcher.MeterDispatcherBase,
|
||||
if not isinstance(events, list):
|
||||
events = [events]
|
||||
|
||||
self.post_event_data(events)
|
||||
|
||||
def post_individual_events(self, events):
|
||||
for event in events:
|
||||
try:
|
||||
event_json = json.dumps(event)
|
||||
LOG.trace('Event Message: %s', event_json)
|
||||
res = requests.post(self.event_target,
|
||||
data=event_json,
|
||||
headers=self.headers,
|
||||
verify=self.verify_ssl,
|
||||
timeout=self.timeout)
|
||||
LOG.debug('Event Message posting finished with status code '
|
||||
'%d.', res.status_code)
|
||||
res.raise_for_status()
|
||||
except requests.exceptions.HTTPError:
|
||||
LOG.exception(_LE('Status Code: %(code)s. Failed to '
|
||||
'dispatch event: %(event)s'),
|
||||
{'code': res.status_code, 'event': event})
|
||||
self.post_event(event)
|
||||
|
||||
def post_event(self, event):
|
||||
res = None
|
||||
try:
|
||||
event_json = json.dumps(event)
|
||||
LOG.trace('Event Message: %s', event_json)
|
||||
res = requests.post(self.event_target,
|
||||
data=event_json,
|
||||
headers=self.headers,
|
||||
verify=self.verify_ssl,
|
||||
timeout=self.timeout)
|
||||
LOG.debug('Event Message posting to %s: status code %d.',
|
||||
self.event_target, res.status_code)
|
||||
res.raise_for_status()
|
||||
except requests.exceptions.HTTPError:
|
||||
LOG.exception(_LE('Status Code: %(code)s. '
|
||||
'Failed to dispatch event: %(event)s') %
|
||||
{'code': res.status_code, 'event': event_json})
|
||||
|
@ -119,6 +119,24 @@ class TestDispatcherHttp(base.BaseTestCase):
|
||||
|
||||
self.assertEqual('/path/to/cert.crt', post.call_args[1]['verify'])
|
||||
|
||||
def test_http_dispatcher_non_batch(self):
|
||||
self.CONF.dispatcher_http.target = 'fake'
|
||||
self.CONF.dispatcher_http.batch_mode = False
|
||||
dispatcher = http.HttpDispatcher(self.CONF)
|
||||
|
||||
with mock.patch('requests.post') as post:
|
||||
dispatcher.record_metering_data([self.msg, self.msg])
|
||||
self.assertEqual(2, post.call_count)
|
||||
|
||||
def test_http_dispatcher_batch(self):
|
||||
self.CONF.dispatcher_http.target = 'fake'
|
||||
self.CONF.dispatcher_http.batch_mode = True
|
||||
dispatcher = http.HttpDispatcher(self.CONF)
|
||||
|
||||
with mock.patch('requests.post') as post:
|
||||
dispatcher.record_metering_data([self.msg, self.msg, self.msg])
|
||||
self.assertEqual(1, post.call_count)
|
||||
|
||||
|
||||
class TestEventDispatcherHttp(base.BaseTestCase):
|
||||
"""Test sending events with the http dispatcher"""
|
||||
@ -191,3 +209,21 @@ class TestEventDispatcherHttp(base.BaseTestCase):
|
||||
dispatcher.record_events(self.event)
|
||||
|
||||
self.assertEqual('/path/to/cert.crt', post.call_args[1]['verify'])
|
||||
|
||||
def test_http_dispatcher_nonbatch_event(self):
|
||||
self.CONF.dispatcher_http.event_target = 'fake'
|
||||
self.CONF.dispatcher_http.batch_mode = False
|
||||
dispatcher = http.HttpDispatcher(self.CONF)
|
||||
|
||||
with mock.patch('requests.post') as post:
|
||||
dispatcher.record_events([self.event, self.event])
|
||||
self.assertEqual(2, post.call_count)
|
||||
|
||||
def test_http_dispatcher_batch_event(self):
|
||||
self.CONF.dispatcher_http.event_target = 'fake'
|
||||
self.CONF.dispatcher_http.batch_mode = True
|
||||
dispatcher = http.HttpDispatcher(self.CONF)
|
||||
|
||||
with mock.patch('requests.post') as post:
|
||||
dispatcher.record_events([self.event, self.event])
|
||||
self.assertEqual(1, post.call_count)
|
||||
|
@ -0,0 +1,7 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
In the [dispatcher_http] section of ceilometer.conf, batch_mode can be
|
||||
set to True to activate sending meters and events in batches, or
|
||||
False (default value) to send each meter and event with a fresh HTTP call.
|
||||
|
Loading…
x
Reference in New Issue
Block a user