diff --git a/nova/flags.py b/nova/flags.py index 239e45385a8e..9ab59bbe352b 100644 --- a/nova/flags.py +++ b/nova/flags.py @@ -406,7 +406,8 @@ global_opts = [ 'image_type', 'backup_type', 'min_ram', - 'min_disk'], + 'min_disk', + 'bittorrent'], help='These are image properties which a snapshot should not' ' inherit from an instance'), cfg.BoolOpt('defer_iptables_apply', diff --git a/nova/virt/xenapi/vm_utils.py b/nova/virt/xenapi/vm_utils.py index 7a37bcdacc0e..bd8fad4749ca 100644 --- a/nova/virt/xenapi/vm_utils.py +++ b/nova/virt/xenapi/vm_utils.py @@ -80,6 +80,39 @@ xenapi_vm_utils_opts = [ cfg.IntOpt('xenapi_num_vbd_unplug_retries', default=10, help='Maximum number of retries to unplug VBD'), + cfg.StrOpt('xenapi_torrent_images', + default='none', + help='Whether or not to download images via Bit Torrent ' + '(all|some|none).'), + cfg.StrOpt('xenapi_torrent_base_url', + default=None, + help='Base URL for torrent files.'), + cfg.FloatOpt('xenapi_torrent_seed_chance', + default=1.0, + help='Probability that peer will become a seeder.' + ' (1.0 = 100%)'), + cfg.IntOpt('xenapi_torrent_seed_duration', + default=3600, + help='Number of seconds after downloading an image via' + ' BitTorrent that it should be seeded for other peers.'), + cfg.IntOpt('xenapi_torrent_max_last_accessed', + default=86400, + help='Cached torrent files not accessed within this number of' + ' seconds can be reaped'), + cfg.IntOpt('xenapi_torrent_listen_port_start', + default=6881, + help='Beginning of port range to listen on'), + cfg.IntOpt('xenapi_torrent_listen_port_end', + default=6891, + help='End of port range to listen on'), + cfg.IntOpt('xenapi_torrent_download_stall_cutoff', + default=600, + help='Number of seconds a download can remain at the same' + ' progress percentage w/o being considered a stall'), + cfg.IntOpt('xenapi_torrent_max_seeder_processes_per_host', + default=1, + help='Maximum number of seeder processes to run concurrently' + ' within a given dom0. (-1 = no limit)') ] FLAGS = flags.FLAGS @@ -975,6 +1008,29 @@ def _make_uuid_stack(): return [str(uuid.uuid4()) for i in xrange(MAX_VDI_CHAIN_SIZE)] +def _image_uses_bittorrent(context, instance): + bittorrent = False + xenapi_torrent_images = FLAGS.xenapi_torrent_images.lower() + + if xenapi_torrent_images == 'all': + bittorrent = True + elif xenapi_torrent_images == 'some': + # FIXME(sirp): This should be eager loaded like instance metadata + sys_meta = db.instance_system_metadata_get(context, + instance['uuid']) + try: + bittorrent = utils.bool_from_str(sys_meta['image_bittorrent']) + except KeyError: + pass + elif xenapi_torrent_images == 'none': + pass + else: + LOG.warning(_("Invalid value '%s' for xenapi_torrent_images"), + xenapi_torrent_images) + + return bittorrent + + def _fetch_vhd_image(context, session, instance, image_id): """Tell glance to download an image and put the VHDs into the SR @@ -985,21 +1041,40 @@ def _fetch_vhd_image(context, session, instance, image_id): params = {'image_id': image_id, 'uuid_stack': _make_uuid_stack(), - 'sr_path': get_sr_path(session), - 'auth_token': getattr(context, 'auth_token', None)} + 'sr_path': get_sr_path(session)} - glance_api_servers = glance.get_api_servers() + if _image_uses_bittorrent(context, instance): + plugin_name = 'bittorrent' + callback = None + params['torrent_base_url'] = FLAGS.xenapi_torrent_base_url + params['torrent_seed_duration'] = FLAGS.xenapi_torrent_seed_duration + params['torrent_seed_chance'] = FLAGS.xenapi_torrent_seed_chance + params['torrent_max_last_accessed'] =\ + FLAGS.xenapi_torrent_max_last_accessed + params['torrent_listen_port_start'] =\ + FLAGS.xenapi_torrent_listen_port_start + params['torrent_listen_port_end'] =\ + FLAGS.xenapi_torrent_listen_port_end + params['torrent_download_stall_cutoff'] =\ + FLAGS.xenapi_torrent_download_stall_cutoff + params['torrent_max_seeder_processes_per_host'] =\ + FLAGS.xenapi_torrent_max_seeder_processes_per_host + else: + plugin_name = 'glance' + glance_api_servers = glance.get_api_servers() - def pick_glance(params): - glance_host, glance_port, glance_use_ssl = glance_api_servers.next() - params['glance_host'] = glance_host - params['glance_port'] = glance_port - params['glance_use_ssl'] = glance_use_ssl + def pick_glance(params): + g_host, g_port, g_use_ssl = glance_api_servers.next() + params['glance_host'] = g_host + params['glance_port'] = g_port + params['glance_use_ssl'] = g_use_ssl + params['auth_token'] = getattr(context, 'auth_token', None) + + callback = pick_glance - plugin_name = 'glance' vdis = _fetch_using_dom0_plugin_with_retry( context, session, image_id, plugin_name, params, - callback=pick_glance) + callback=callback) sr_ref = safe_find_sr(session) _scan_sr(session, sr_ref) diff --git a/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec b/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec index 2260392b1558..19b508d2e1a4 100644 --- a/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec +++ b/plugins/xenserver/xenapi/contrib/rpmbuild/SPECS/openstack-xen-plugins.spec @@ -28,8 +28,10 @@ rm -rf $RPM_BUILD_ROOT %files %defattr(-,root,root,-) +/etc/xapi.d/plugins/_bittorrent_seeder /etc/xapi.d/plugins/agent /etc/xapi.d/plugins/bandwidth +/etc/xapi.d/plugins/bittorrent /etc/xapi.d/plugins/glance /etc/xapi.d/plugins/kernel /etc/xapi.d/plugins/migration diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder b/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder new file mode 100755 index 000000000000..88262139e4e1 --- /dev/null +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder @@ -0,0 +1,121 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 Openstack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Seed a bittorent image. This file should not be executed directly, rather it +should be kicked off by the `bittorent` dom0 plugin.""" + +import os +import sys +import time + +import libtorrent + +#FIXME(sirp): should this use pluginlib from 5.6? +from pluginlib_nova import * +configure_logging('_bittorrent_seeder') + + +def _daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): + """ + do the UNIX double-fork magic, see Stevens' "Advanced + Programming in the UNIX Environment" for details (ISBN 0201563177) + + Source: http://www.jejik.com/articles/2007/02/ + a_simple_unix_linux_daemon_in_python/ + """ + # 1st fork + try: + pid = os.fork() + if pid > 0: + # first parent returns + return False + except OSError, e: + logging.error("fork #1 failed: %d (%s)" % ( + e.errno, e.strerror)) + return + + # decouple from parent environment + os.chdir("/") + os.setsid() + os.umask(0) + + # 2nd fork + try: + pid = os.fork() + if pid > 0: + # second parent exits + sys.exit(0) + except OSError, e: + logging.error("fork #2 failed: %d (%s)" % ( + e.errno, e.strerror)) + return + + # redirect standard file descriptors + sys.stdout.flush() + sys.stderr.flush() + si = open(stdin, 'r') + so = open(stdout, 'a+') + se = open(stderr, 'a+', 0) + os.dup2(si.fileno(), sys.stdin.fileno()) + os.dup2(so.fileno(), sys.stdout.fileno()) + os.dup2(se.fileno(), sys.stderr.fileno()) + return True + + +def main(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end): + seed_time = time.time() + torrent_seed_duration + logging.debug("Seeding '%s' for %d secs" % ( + torrent_path, torrent_seed_duration)) + + child = _daemonize() + if not child: + return + + # At this point we're the daemonized child... + session = libtorrent.session() + session.listen_on(torrent_listen_port_start, torrent_listen_port_end) + + torrent_file = open(torrent_path, 'rb') + try: + torrent_data = torrent_file.read() + finally: + torrent_file.close() + + decoded_data = libtorrent.bdecode(torrent_data) + + info = libtorrent.torrent_info(decoded_data) + torrent = session.add_torrent( + info, seed_cache_path, + storage_mode=libtorrent.storage_mode_t.storage_mode_sparse) + try: + while time.time() < seed_time: + time.sleep(5) + finally: + session.remove_torrent(torrent) + + logging.debug("Seeding of '%s' finished" % torrent_path) + + +if __name__ == "__main__": + (torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) = sys.argv[1:] + torrent_seed_duration = int(torrent_seed_duration) + torrent_listen_port_start = int(torrent_listen_port_start) + torrent_listen_port_end = int(torrent_listen_port_end) + + main(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent b/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent new file mode 100755 index 000000000000..fef1862b1e40 --- /dev/null +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent @@ -0,0 +1,299 @@ +#!/usr/bin/env python + +# Copyright (c) 2012 Openstack, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Download images via BitTorrent.""" + +import errno +import inspect +import os +import random +import shutil +import tempfile +import time + +import libtorrent +import urllib2 +import XenAPIPlugin + +import utils + +#FIXME(sirp): should this use pluginlib from 5.6? +from pluginlib_nova import * +configure_logging('bittorrent') + +DEFAULT_TORRENT_CACHE = '/images/torrents' +DEFAULT_SEED_CACHE = '/images/seeds' +SEEDER_PROCESS = '_bittorrent_seeder' + + +def _make_torrent_cache(): + torrent_cache_path = os.environ.get( + 'TORRENT_CACHE', DEFAULT_TORRENT_CACHE) + + if not os.path.exists(torrent_cache_path): + os.mkdir(torrent_cache_path) + + return torrent_cache_path + + +def _fetch_torrent_file(torrent_cache_path, image_id, torrent_base_url): + torrent_path = os.path.join( + torrent_cache_path, image_id + '.torrent') + + if not os.path.exists(torrent_path): + torrent_url = torrent_base_url + "/%s.torrent" % image_id + logging.info("Downloading %s" % torrent_url) + + # Write contents to temporary path to ensure we don't have partially + # completed files in the cache. + temp_directory = tempfile.mkdtemp(dir=torrent_cache_path) + try: + temp_path = os.path.join( + temp_directory, os.path.basename(torrent_path)) + temp_file = open(temp_path, 'wb') + try: + remote_torrent_file = urllib2.urlopen(torrent_url) + shutil.copyfileobj(remote_torrent_file, temp_file) + finally: + temp_file.close() + + os.rename(temp_path, torrent_path) + finally: + shutil.rmtree(temp_directory) + + return torrent_path + + +def _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed): + """Delete any torrent files that haven't been accessed recently.""" + if not torrent_max_last_accessed: + logging.debug("Reaping old torrent files disabled, skipping...") + return + + logging.debug("Preparing to reap old torrent files," + " torrent_max_last_accessed=%d" % torrent_max_last_accessed) + + for fname in os.listdir(torrent_cache_path): + torrent_path = os.path.join(torrent_cache_path, fname) + last_accessed = time.time() - os.path.getatime(torrent_path) + if last_accessed > torrent_max_last_accessed: + logging.debug("Reaping '%s', last_accessed=%d" % ( + torrent_path, last_accessed)) + utils.delete_if_exists(torrent_path) + + +def _download(torrent_path, save_as_path, torrent_listen_port_start, + torrent_listen_port_end, torrent_download_stall_cutoff): + session = libtorrent.session() + session.listen_on(torrent_listen_port_start, torrent_listen_port_end) + info = libtorrent.torrent_info( + libtorrent.bdecode(open(torrent_path, 'rb').read())) + + torrent = session.add_torrent( + info, save_as_path, + storage_mode=libtorrent.storage_mode_t.storage_mode_sparse) + + try: + last_progress = 0 + last_progress_updated = time.time() + + while not torrent.is_seed(): + s = torrent.status() + + progress = s.progress * 100 + + if progress != last_progress: + last_progress = progress + last_progress_updated = time.time() + + stall_duration = time.time() - last_progress_updated + if stall_duration > torrent_download_stall_cutoff: + logging.error( + "Download stalled: stall_duration=%d," + " torrent_download_stall_cutoff=%d" % ( + stall_duration, torrent_download_stall_cutoff)) + raise Exception("Bittorrent download stall detected, bailing!") + + logging.debug( + '%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d)' + ' %s %s' % (progress, s.download_rate / 1000, + s.upload_rate / 1000, s.num_peers, s.state, + torrent_path)) + time.sleep(1) + finally: + session.remove_torrent(torrent) + + logging.debug("Download of '%s' finished" % torrent_path) + + +def _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance, + torrent_max_seeder_processes_per_host): + if not torrent_seed_duration: + logging.debug("Seeding disabled, skipping...") + return False + + if os.path.exists(seed_path): + logging.debug("Seed is already present, skipping....") + return False + + rand = random.random() + if rand > torrent_seed_chance: + logging.debug("%.2f > %.2f, seeding randomly skipping..." % ( + rand, torrent_seed_chance)) + return False + + num_active_seeders = len(list(_active_seeder_processes())) + if (torrent_max_seeder_processes_per_host >= 0 and + num_active_seeders >= torrent_max_seeder_processes_per_host): + logging.debug("max number of seeder processes for this host reached" + " (%d), skipping..." % + torrent_max_seeder_processes_per_host) + return False + + return True + + +def _seed(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end): + plugin_path = os.path.dirname(inspect.getabsfile(inspect.currentframe())) + seeder_path = os.path.join(plugin_path, SEEDER_PROCESS) + seed_cmd = "%s %s %s %d %d %d" % ( + seeder_path, torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) + + seed_proc = utils.make_subprocess(seed_cmd) + utils.finish_subprocess(seed_proc, seed_cmd) + + +def _seed_if_needed(seed_cache_path, tarball_path, torrent_path, + torrent_seed_duration, torrent_seed_chance, + torrent_listen_port_start, torrent_listen_port_end, + torrent_max_seeder_processes_per_host): + seed_filename = os.path.basename(tarball_path) + seed_path = os.path.join(seed_cache_path, seed_filename) + + if _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance, + torrent_max_seeder_processes_per_host): + logging.debug("Preparing to seed '%s' for %d secs" % ( + seed_path, torrent_seed_duration)) + utils._rename(tarball_path, seed_path) + + # Daemonize and seed the image + _seed(torrent_path, seed_cache_path, torrent_seed_duration, + torrent_listen_port_start, torrent_listen_port_end) + else: + utils.delete_if_exists(tarball_path) + + +def _extract_tarball(tarball_path, staging_path): + """Extract the tarball into the staging directory.""" + tarball_fileobj = open(tarball_path, 'rb') + try: + utils.extract_tarball(tarball_fileobj, staging_path) + finally: + tarball_fileobj.close() + + +def _active_seeder_processes(): + """Yields command-line of active seeder processes. + + Roughly equivalent to performing ps | grep _bittorrent_seeder + """ + pids = [pid for pid in os.listdir('/proc') if pid.isdigit()] + for pid in pids: + try: + cmdline = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read() + except IOError, e: + if e.errno != errno.ENOENT: + raise + + if SEEDER_PROCESS in cmdline: + yield cmdline + + +def _reap_finished_seeds(seed_cache_path): + """Delete any cached seeds where the seeder process has died.""" + logging.debug("Preparing to reap finished seeds") + missing = {} + for fname in os.listdir(seed_cache_path): + seed_path = os.path.join(seed_cache_path, fname) + missing[seed_path] = None + + for cmdline in _active_seeder_processes(): + for seed_path in missing.keys(): + seed_filename = os.path.basename(seed_path) + if seed_filename in cmdline: + del missing[seed_path] + + for seed_path in missing: + logging.debug("Reaping cached seed '%s'" % seed_path) + utils.delete_if_exists(seed_path) + + +def _make_seed_cache(): + seed_cache_path = os.environ.get('SEED_CACHE', DEFAULT_SEED_CACHE) + if not os.path.exists(seed_cache_path): + os.mkdir(seed_cache_path) + return seed_cache_path + + +def download_vhd(session, image_id, torrent_base_url, torrent_seed_duration, + torrent_seed_chance, torrent_max_last_accessed, + torrent_listen_port_start, torrent_listen_port_end, + torrent_download_stall_cutoff, uuid_stack, sr_path, + torrent_max_seeder_processes_per_host): + """Download an image from BitTorrent, unbundle it, and then deposit the + VHDs into the storage repository + """ + seed_cache_path = _make_seed_cache() + torrent_cache_path = _make_torrent_cache() + + # Housekeeping + _reap_finished_seeds(seed_cache_path) + _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed) + + torrent_path = _fetch_torrent_file( + torrent_cache_path, image_id, torrent_base_url) + + staging_path = utils.make_staging_area(sr_path) + try: + tarball_filename = os.path.basename(torrent_path).replace( + '.torrent', '') + tarball_path = os.path.join(staging_path, tarball_filename) + + # Download tarball into staging area + _download(torrent_path, staging_path, torrent_listen_port_start, + torrent_listen_port_end, torrent_download_stall_cutoff) + + # Extract the tarball into the staging area + _extract_tarball(tarball_path, staging_path) + + # Move the VHDs from the staging area into the storage repository + vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack) + + # Seed image for others in the swarm + _seed_if_needed(seed_cache_path, tarball_path, torrent_path, + torrent_seed_duration, torrent_seed_chance, + torrent_listen_port_start, torrent_listen_port_end, + torrent_max_seeder_processes_per_host) + finally: + utils.cleanup_staging_area(staging_path) + + return vdi_list + + +if __name__ == '__main__': + utils.register_plugin_calls(download_vhd) diff --git a/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py index 66f46a8390ec..510687d7b4b4 100644 --- a/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py +++ b/plugins/xenserver/xenapi/etc/xapi.d/plugins/utils.py @@ -27,6 +27,16 @@ import XenAPIPlugin CHUNK_SIZE = 8192 +def delete_if_exists(path): + try: + os.unlink(path) + except OSError, e: + if e.errno == errno.ENOENT: + logging.warning("'%s' was already deleted, skipping delete" % path) + else: + raise + + def _link(src, dst): logging.info("Hard-linking file '%s' -> '%s'" % (src, dst)) os.link(src, dst)