From 5a7da39e7d20e8a9cb062c116abece1564577fee Mon Sep 17 00:00:00 2001 From: Ilya Shakhat Date: Mon, 19 Aug 2013 16:57:18 +0400 Subject: [PATCH] Users are kept in runtime storage during update cycle * Users are persisted by user_id and accessed from web font directly * Web app initializing section is refactored * 404 error handler is made more looking like error Closes bug 1213841 Change-Id: Ie6ff57505dcc14291cd3267a7448cddb2b478eb2 --- dashboard/memory_storage.py | 25 ++-- dashboard/templates/404.html | 14 +-- dashboard/web.py | 109 ++++++++++-------- .../processor/default_data_processor.py | 16 +-- stackalytics/processor/record_processor.py | 4 + stackalytics/processor/runtime_storage.py | 4 +- tests/unit/test_record_processor.py | 31 +++-- 7 files changed, 117 insertions(+), 86 deletions(-) diff --git a/dashboard/memory_storage.py b/dashboard/memory_storage.py index 7e8d44690..71105f35a 100644 --- a/dashboard/memory_storage.py +++ b/dashboard/memory_storage.py @@ -17,13 +17,13 @@ MEMORY_STORAGE_CACHED = 0 class MemoryStorage(object): - def __init__(self, records): + def __init__(self): pass class CachedMemoryStorage(MemoryStorage): - def __init__(self, records): - super(CachedMemoryStorage, self).__init__(records) + def __init__(self): + super(CachedMemoryStorage, self).__init__() # common indexes self.records = {} @@ -43,25 +43,28 @@ class CachedMemoryStorage(MemoryStorage): 'release': self.release_index, } - for record in records: - self._save_record(record) - - self.company_name_mapping = dict((c.lower(), c) - for c in self.company_index.keys()) - def _save_record(self, record): self.records[record['record_id']] = record for key, index in self.indexes.iteritems(): self._add_to_index(index, record, key) def update(self, records): + have_updates = False + for record in records: + have_updates = True record_id = record['record_id'] if record_id in self.records: # remove existing record from indexes self._remove_record_from_index(self.records[record_id]) self._save_record(record) + if have_updates: + self.company_name_mapping = dict( + (c.lower(), c) for c in self.company_index.keys()) + + return have_updates + def _remove_record_from_index(self, record): for key, index in self.indexes.iteritems(): index[record[key]].remove(record['record_id']) @@ -134,8 +137,8 @@ class CachedMemoryStorage(MemoryStorage): return self.user_id_index.keys() -def get_memory_storage(memory_storage_type, records): +def get_memory_storage(memory_storage_type): if memory_storage_type == MEMORY_STORAGE_CACHED: - return CachedMemoryStorage(records) + return CachedMemoryStorage() else: raise Exception('Unknown memory storage type %s' % memory_storage_type) diff --git a/dashboard/templates/404.html b/dashboard/templates/404.html index 4e6138d18..f90525863 100644 --- a/dashboard/templates/404.html +++ b/dashboard/templates/404.html @@ -1,19 +1,15 @@ -{% extends "layout.html" %} +{% extends "base.html" %} -{% block title %} - 404 +{% block head %} + {% endblock %} -{% block left_frame %} +{% block body %}

404 Not Found

-
The requested page is not found. Return to Overview +
The requested page is not found. The page will be automatically redirected to Overview
{% endblock %} - -{% block right_frame %} - -{% endblock %} diff --git a/dashboard/web.py b/dashboard/web.py index e2a0f1453..5493b0117 100644 --- a/dashboard/web.py +++ b/dashboard/web.py @@ -75,35 +75,32 @@ else: def get_vault(): vault = getattr(app, 'stackalytics_vault', None) if not vault: - vault = {} - runtime_storage_inst = runtime_storage.get_runtime_storage( - cfg.CONF.runtime_storage_uri) - vault['runtime_storage'] = runtime_storage_inst - vault['memory_storage'] = memory_storage.get_memory_storage( - memory_storage.MEMORY_STORAGE_CACHED, + try: + vault = {} + runtime_storage_inst = runtime_storage.get_runtime_storage( + cfg.CONF.runtime_storage_uri) + vault['runtime_storage'] = runtime_storage_inst + vault['memory_storage'] = memory_storage.get_memory_storage( + memory_storage.MEMORY_STORAGE_CACHED) + + init_project_types(vault) + init_releases(vault) + + app.stackalytics_vault = vault + except Exception as e: + LOG.critical('Failed to initialize application: %s', e) + LOG.exception(e) + flask.abort(500) + + if not getattr(flask.request, 'stackalytics_updated', None): + flask.request.stackalytics_updated = True + memory_storage_inst = vault['memory_storage'] + have_updates = memory_storage_inst.update( vault['runtime_storage'].get_update(os.getpid())) - releases = list(runtime_storage_inst.get_by_key('releases')) - vault['start_date'] = releases[0]['end_date'] - vault['end_date'] = releases[-1]['end_date'] - start_date = releases[0]['end_date'] - for r in releases[1:]: - r['start_date'] = start_date - start_date = r['end_date'] - vault['releases'] = dict((r['release_name'].lower(), r) - for r in releases[1:]) - modules = runtime_storage_inst.get_by_key('repos') - vault['modules'] = dict((r['module'].lower(), - r['project_type'].lower()) for r in modules) - app.stackalytics_vault = vault - - init_project_types(vault) - else: - if not getattr(flask.request, 'stackalytics_updated', None): - flask.request.stackalytics_updated = True - memory_storage_inst = vault['memory_storage'] - memory_storage_inst.update( - vault['runtime_storage'].get_update(os.getpid())) + if have_updates: + init_project_types(vault) + init_releases(vault) return vault @@ -112,12 +109,27 @@ def get_memory_storage(): return get_vault()['memory_storage'] +def init_releases(vault): + runtime_storage_inst = vault['runtime_storage'] + releases = runtime_storage_inst.get_by_key('releases') + if not releases: + raise Exception('Releases are missing in runtime storage') + vault['start_date'] = releases[0]['end_date'] + vault['end_date'] = releases[-1]['end_date'] + start_date = releases[0]['end_date'] + for r in releases[1:]: + r['start_date'] = start_date + start_date = r['end_date'] + vault['releases'] = dict((r['release_name'].lower(), r) + for r in releases[1:]) + + def init_project_types(vault): runtime_storage_inst = vault['runtime_storage'] project_type_options = {} project_type_group_index = {'all': set()} - for repo in runtime_storage_inst.get_by_key('repos'): + for repo in runtime_storage_inst.get_by_key('repos') or []: project_type = repo['project_type'].lower() project_group = None if ('project_group' in repo) and (repo['project_group']): @@ -176,6 +188,11 @@ def is_project_type_valid(project_type): return False +def get_user_from_runtime_storage(user_id): + runtime_storage_inst = get_vault()['runtime_storage'] + return runtime_storage_inst.get_by_key('user:%s' % user_id) + + # Utils --------- def get_default(param_name): @@ -209,12 +226,6 @@ def get_single_parameter(kwargs, singular_name, use_default=True): return '' -def validate_user_id(user_id): - runtime_storage_inst = get_vault()['runtime_storage'] - users_index = runtime_storage_inst.get_by_key('users') - return user_id in users_index - - # Decorators --------- def record_filter(ignore=None, use_default=True): @@ -250,7 +261,7 @@ def record_filter(ignore=None, use_default=True): if 'user_id' not in ignore: param = get_parameter(kwargs, 'user_id', 'user_ids') - param = [u for u in param if validate_user_id(u)] + param = [u for u in param if get_user_from_runtime_storage(u)] if param: record_ids &= ( memory_storage.get_record_ids_by_user_ids(param)) @@ -674,21 +685,19 @@ def get_users_json(records): @app.route('/data/users/.json') def get_user(user_id): - runtime_storage_inst = get_vault()['runtime_storage'] - users_index = runtime_storage_inst.get_by_key('users') - if user_id in users_index: - res = users_index[user_id].copy() - res['id'] = res['user_id'] - res['text'] = res['user_name'] - if res['companies']: - company_name = res['companies'][-1]['company_name'] - res['company_link'] = make_link( - company_name, '/', {'company': company_name}) - else: - res['company_link'] = '' - res['gravatar'] = gravatar(res['emails'][0]) - return json.dumps({'user': res}) - return json.dumps({}) + user = get_user_from_runtime_storage(user_id) + if not user: + flask.abort(404) + user['id'] = user['user_id'] + user['text'] = user['user_name'] + if user['companies']: + company_name = user['companies'][-1]['company_name'] + user['company_link'] = make_link( + company_name, '/', {'company': company_name}) + else: + user['company_link'] = '' + user['gravatar'] = gravatar(user['emails'][0]) + return json.dumps({'user': user}) @app.route('/data/timeline') diff --git a/stackalytics/processor/default_data_processor.py b/stackalytics/processor/default_data_processor.py index aa06e2c26..94ad7d8b2 100644 --- a/stackalytics/processor/default_data_processor.py +++ b/stackalytics/processor/default_data_processor.py @@ -76,24 +76,25 @@ def _retrieve_project_list(runtime_storage_inst, project_sources): runtime_storage_inst.set_by_key('repos', stored_repos) -def _process_users(users): +def _process_users(runtime_storage_inst, users): users_index = {} for user in users: + runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) if 'user_id' in user: users_index[user['user_id']] = user if 'launchpad_id' in user: users_index[user['launchpad_id']] = user for email in user['emails']: users_index[email] = user - return users_index + runtime_storage_inst.set_by_key('users', users_index) -def _process_companies(companies): +def _process_companies(runtime_storage_inst, companies): domains_index = {} for company in companies: for domain in company['domains']: domains_index[domain] = company['company_name'] - return domains_index + runtime_storage_inst.set_by_key('companies', domains_index) KEYS = { @@ -105,15 +106,16 @@ KEYS = { def _update_default_data(runtime_storage_inst, default_data): + LOG.debug('Update runtime storage with default data') for key, processor in KEYS.iteritems(): if processor: - value = processor(default_data[key]) + processor(runtime_storage_inst, default_data[key]) else: - value = default_data[key] - runtime_storage_inst.set_by_key(key, value) + runtime_storage_inst.set_by_key(key, default_data[key]) def process(runtime_storage_inst, default_data, sources_root): + LOG.debug('Process default data') normalizer.normalize_default_data(default_data) diff --git a/stackalytics/processor/record_processor.py b/stackalytics/processor/record_processor.py index 0480098e1..96b8bd170 100644 --- a/stackalytics/processor/record_processor.py +++ b/stackalytics/processor/record_processor.py @@ -71,6 +71,9 @@ class RecordProcessor(object): LOG.debug('Create new user: %s', user) return user + def _store_user(self, user): + self.runtime_storage_inst.set_by_key('user:%s' % user['user_id'], user) + def _get_lp_info(self, email): lp_profile = None if not re.match(r'[\w\d_\.-]+@([\w\d_\.-]+\.)+[\w]+', email): @@ -115,6 +118,7 @@ class RecordProcessor(object): user_name = record['author_name'] user = self._create_user(launchpad_id, email, user_name) + self._store_user(user) self.users_index[email] = user if user['launchpad_id']: self.users_index[user['launchpad_id']] = user diff --git a/stackalytics/processor/runtime_storage.py b/stackalytics/processor/runtime_storage.py index 89d084124..7a044cbf9 100644 --- a/stackalytics/processor/runtime_storage.py +++ b/stackalytics/processor/runtime_storage.py @@ -115,10 +115,10 @@ class MemcachedStorage(RuntimeStorage): self._commit_update(record_id) def get_by_key(self, key): - return self.memcached.get(key) + return self.memcached.get(key.encode('utf8')) def set_by_key(self, key, value): - self.memcached.set(key, value) + self.memcached.set(key.encode('utf8'), value) def get_update(self, pid): last_update = self.memcached.get('pid:%s' % pid) diff --git a/tests/unit/test_record_processor.py b/tests/unit/test_record_processor.py index 052e91e2a..618ecddbf 100644 --- a/tests/unit/test_record_processor.py +++ b/tests/unit/test_record_processor.py @@ -16,7 +16,6 @@ import mock import testtools -from stackalytics.processor import default_data_processor from stackalytics.processor import record_processor from stackalytics.processor import runtime_storage from stackalytics.processor import utils @@ -25,6 +24,26 @@ from stackalytics.processor import utils LP_URI = 'https://api.launchpad.net/1.0/people/?ws.op=getByEmail&email=%s' +def _make_users(users): + users_index = {} + for user in users: + if 'user_id' in user: + users_index[user['user_id']] = user + if 'launchpad_id' in user: + users_index[user['launchpad_id']] = user + for email in user['emails']: + users_index[email] = user + return users_index + + +def _make_companies(companies): + domains_index = {} + for company in companies: + for domain in company['domains']: + domains_index[domain] = company['company_name'] + return domains_index + + class TestRecordProcessor(testtools.TestCase): def setUp(self): super(TestRecordProcessor, self).setUp() @@ -77,9 +96,9 @@ class TestRecordProcessor(testtools.TestCase): def get_by_key(table): if table == 'companies': - return default_data_processor._process_companies(companies) + return _make_companies(companies) elif table == 'users': - return default_data_processor._process_users(self.get_users()) + return _make_users(self.get_users()) elif table == 'releases': return releases else: @@ -160,8 +179,7 @@ class TestRecordProcessor(testtools.TestCase): commit = list(self.commit_processor.process(commit_generator))[0] - self.runtime_storage.set_by_key.assert_called_once_with('users', - mock.ANY) + self.runtime_storage.set_by_key.assert_called_with('users', mock.ANY) self.read_json.assert_called_once_with(LP_URI % email) self.assertIn(email, user['emails']) self.assertEquals('NEC', commit['company_name']) @@ -183,8 +201,7 @@ class TestRecordProcessor(testtools.TestCase): commit = list(self.commit_processor.process(commit_generator))[0] - self.runtime_storage.set_by_key.assert_called_once_with('users', - mock.ANY) + self.runtime_storage.set_by_key.assert_called_with('users', mock.ANY) self.read_json.assert_called_once_with(LP_URI % email) self.assertIn(email, user['emails']) self.assertEquals('SuperCompany', commit['company_name'])