Add ability to download images via BitTorrent.
This patch adds a new dom0 plugin which supports downloading images via BitTorrent. Torrent metadata files are assumed to be served from a webserver which is specified by the `torrent_base_url` config. Under the hood, the dom0 plugins calls out to rasterbar's libtorrent via Python bindings in order perform the initial download as well as the seeding thereafter. Implements BP xenserver-bittorrent-images Change-Id: I824720a6e3a37317080a22cd7405d2a88172c3ef
This commit is contained in:
parent
1d4506c16a
commit
216a83da47
@ -406,7 +406,8 @@ global_opts = [
|
|||||||
'image_type',
|
'image_type',
|
||||||
'backup_type',
|
'backup_type',
|
||||||
'min_ram',
|
'min_ram',
|
||||||
'min_disk'],
|
'min_disk',
|
||||||
|
'bittorrent'],
|
||||||
help='These are image properties which a snapshot should not'
|
help='These are image properties which a snapshot should not'
|
||||||
' inherit from an instance'),
|
' inherit from an instance'),
|
||||||
cfg.BoolOpt('defer_iptables_apply',
|
cfg.BoolOpt('defer_iptables_apply',
|
||||||
|
@ -80,6 +80,39 @@ xenapi_vm_utils_opts = [
|
|||||||
cfg.IntOpt('xenapi_num_vbd_unplug_retries',
|
cfg.IntOpt('xenapi_num_vbd_unplug_retries',
|
||||||
default=10,
|
default=10,
|
||||||
help='Maximum number of retries to unplug VBD'),
|
help='Maximum number of retries to unplug VBD'),
|
||||||
|
cfg.StrOpt('xenapi_torrent_images',
|
||||||
|
default='none',
|
||||||
|
help='Whether or not to download images via Bit Torrent '
|
||||||
|
'(all|some|none).'),
|
||||||
|
cfg.StrOpt('xenapi_torrent_base_url',
|
||||||
|
default=None,
|
||||||
|
help='Base URL for torrent files.'),
|
||||||
|
cfg.FloatOpt('xenapi_torrent_seed_chance',
|
||||||
|
default=1.0,
|
||||||
|
help='Probability that peer will become a seeder.'
|
||||||
|
' (1.0 = 100%)'),
|
||||||
|
cfg.IntOpt('xenapi_torrent_seed_duration',
|
||||||
|
default=3600,
|
||||||
|
help='Number of seconds after downloading an image via'
|
||||||
|
' BitTorrent that it should be seeded for other peers.'),
|
||||||
|
cfg.IntOpt('xenapi_torrent_max_last_accessed',
|
||||||
|
default=86400,
|
||||||
|
help='Cached torrent files not accessed within this number of'
|
||||||
|
' seconds can be reaped'),
|
||||||
|
cfg.IntOpt('xenapi_torrent_listen_port_start',
|
||||||
|
default=6881,
|
||||||
|
help='Beginning of port range to listen on'),
|
||||||
|
cfg.IntOpt('xenapi_torrent_listen_port_end',
|
||||||
|
default=6891,
|
||||||
|
help='End of port range to listen on'),
|
||||||
|
cfg.IntOpt('xenapi_torrent_download_stall_cutoff',
|
||||||
|
default=600,
|
||||||
|
help='Number of seconds a download can remain at the same'
|
||||||
|
' progress percentage w/o being considered a stall'),
|
||||||
|
cfg.IntOpt('xenapi_torrent_max_seeder_processes_per_host',
|
||||||
|
default=1,
|
||||||
|
help='Maximum number of seeder processes to run concurrently'
|
||||||
|
' within a given dom0. (-1 = no limit)')
|
||||||
]
|
]
|
||||||
|
|
||||||
FLAGS = flags.FLAGS
|
FLAGS = flags.FLAGS
|
||||||
@ -975,6 +1008,29 @@ def _make_uuid_stack():
|
|||||||
return [str(uuid.uuid4()) for i in xrange(MAX_VDI_CHAIN_SIZE)]
|
return [str(uuid.uuid4()) for i in xrange(MAX_VDI_CHAIN_SIZE)]
|
||||||
|
|
||||||
|
|
||||||
|
def _image_uses_bittorrent(context, instance):
|
||||||
|
bittorrent = False
|
||||||
|
xenapi_torrent_images = FLAGS.xenapi_torrent_images.lower()
|
||||||
|
|
||||||
|
if xenapi_torrent_images == 'all':
|
||||||
|
bittorrent = True
|
||||||
|
elif xenapi_torrent_images == 'some':
|
||||||
|
# FIXME(sirp): This should be eager loaded like instance metadata
|
||||||
|
sys_meta = db.instance_system_metadata_get(context,
|
||||||
|
instance['uuid'])
|
||||||
|
try:
|
||||||
|
bittorrent = utils.bool_from_str(sys_meta['image_bittorrent'])
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
elif xenapi_torrent_images == 'none':
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
LOG.warning(_("Invalid value '%s' for xenapi_torrent_images"),
|
||||||
|
xenapi_torrent_images)
|
||||||
|
|
||||||
|
return bittorrent
|
||||||
|
|
||||||
|
|
||||||
def _fetch_vhd_image(context, session, instance, image_id):
|
def _fetch_vhd_image(context, session, instance, image_id):
|
||||||
"""Tell glance to download an image and put the VHDs into the SR
|
"""Tell glance to download an image and put the VHDs into the SR
|
||||||
|
|
||||||
@ -985,21 +1041,40 @@ def _fetch_vhd_image(context, session, instance, image_id):
|
|||||||
|
|
||||||
params = {'image_id': image_id,
|
params = {'image_id': image_id,
|
||||||
'uuid_stack': _make_uuid_stack(),
|
'uuid_stack': _make_uuid_stack(),
|
||||||
'sr_path': get_sr_path(session),
|
'sr_path': get_sr_path(session)}
|
||||||
'auth_token': getattr(context, 'auth_token', None)}
|
|
||||||
|
|
||||||
glance_api_servers = glance.get_api_servers()
|
if _image_uses_bittorrent(context, instance):
|
||||||
|
plugin_name = 'bittorrent'
|
||||||
|
callback = None
|
||||||
|
params['torrent_base_url'] = FLAGS.xenapi_torrent_base_url
|
||||||
|
params['torrent_seed_duration'] = FLAGS.xenapi_torrent_seed_duration
|
||||||
|
params['torrent_seed_chance'] = FLAGS.xenapi_torrent_seed_chance
|
||||||
|
params['torrent_max_last_accessed'] =\
|
||||||
|
FLAGS.xenapi_torrent_max_last_accessed
|
||||||
|
params['torrent_listen_port_start'] =\
|
||||||
|
FLAGS.xenapi_torrent_listen_port_start
|
||||||
|
params['torrent_listen_port_end'] =\
|
||||||
|
FLAGS.xenapi_torrent_listen_port_end
|
||||||
|
params['torrent_download_stall_cutoff'] =\
|
||||||
|
FLAGS.xenapi_torrent_download_stall_cutoff
|
||||||
|
params['torrent_max_seeder_processes_per_host'] =\
|
||||||
|
FLAGS.xenapi_torrent_max_seeder_processes_per_host
|
||||||
|
else:
|
||||||
|
plugin_name = 'glance'
|
||||||
|
glance_api_servers = glance.get_api_servers()
|
||||||
|
|
||||||
def pick_glance(params):
|
def pick_glance(params):
|
||||||
glance_host, glance_port, glance_use_ssl = glance_api_servers.next()
|
g_host, g_port, g_use_ssl = glance_api_servers.next()
|
||||||
params['glance_host'] = glance_host
|
params['glance_host'] = g_host
|
||||||
params['glance_port'] = glance_port
|
params['glance_port'] = g_port
|
||||||
params['glance_use_ssl'] = glance_use_ssl
|
params['glance_use_ssl'] = g_use_ssl
|
||||||
|
params['auth_token'] = getattr(context, 'auth_token', None)
|
||||||
|
|
||||||
|
callback = pick_glance
|
||||||
|
|
||||||
plugin_name = 'glance'
|
|
||||||
vdis = _fetch_using_dom0_plugin_with_retry(
|
vdis = _fetch_using_dom0_plugin_with_retry(
|
||||||
context, session, image_id, plugin_name, params,
|
context, session, image_id, plugin_name, params,
|
||||||
callback=pick_glance)
|
callback=callback)
|
||||||
|
|
||||||
sr_ref = safe_find_sr(session)
|
sr_ref = safe_find_sr(session)
|
||||||
_scan_sr(session, sr_ref)
|
_scan_sr(session, sr_ref)
|
||||||
|
@ -28,8 +28,10 @@ rm -rf $RPM_BUILD_ROOT
|
|||||||
|
|
||||||
%files
|
%files
|
||||||
%defattr(-,root,root,-)
|
%defattr(-,root,root,-)
|
||||||
|
/etc/xapi.d/plugins/_bittorrent_seeder
|
||||||
/etc/xapi.d/plugins/agent
|
/etc/xapi.d/plugins/agent
|
||||||
/etc/xapi.d/plugins/bandwidth
|
/etc/xapi.d/plugins/bandwidth
|
||||||
|
/etc/xapi.d/plugins/bittorrent
|
||||||
/etc/xapi.d/plugins/glance
|
/etc/xapi.d/plugins/glance
|
||||||
/etc/xapi.d/plugins/kernel
|
/etc/xapi.d/plugins/kernel
|
||||||
/etc/xapi.d/plugins/migration
|
/etc/xapi.d/plugins/migration
|
||||||
|
121
plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder
Executable file
121
plugins/xenserver/xenapi/etc/xapi.d/plugins/_bittorrent_seeder
Executable file
@ -0,0 +1,121 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
# Copyright (c) 2012 Openstack, LLC
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Seed a bittorent image. This file should not be executed directly, rather it
|
||||||
|
should be kicked off by the `bittorent` dom0 plugin."""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
import libtorrent
|
||||||
|
|
||||||
|
#FIXME(sirp): should this use pluginlib from 5.6?
|
||||||
|
from pluginlib_nova import *
|
||||||
|
configure_logging('_bittorrent_seeder')
|
||||||
|
|
||||||
|
|
||||||
|
def _daemonize(stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
|
||||||
|
"""
|
||||||
|
do the UNIX double-fork magic, see Stevens' "Advanced
|
||||||
|
Programming in the UNIX Environment" for details (ISBN 0201563177)
|
||||||
|
|
||||||
|
Source: http://www.jejik.com/articles/2007/02/
|
||||||
|
a_simple_unix_linux_daemon_in_python/
|
||||||
|
"""
|
||||||
|
# 1st fork
|
||||||
|
try:
|
||||||
|
pid = os.fork()
|
||||||
|
if pid > 0:
|
||||||
|
# first parent returns
|
||||||
|
return False
|
||||||
|
except OSError, e:
|
||||||
|
logging.error("fork #1 failed: %d (%s)" % (
|
||||||
|
e.errno, e.strerror))
|
||||||
|
return
|
||||||
|
|
||||||
|
# decouple from parent environment
|
||||||
|
os.chdir("/")
|
||||||
|
os.setsid()
|
||||||
|
os.umask(0)
|
||||||
|
|
||||||
|
# 2nd fork
|
||||||
|
try:
|
||||||
|
pid = os.fork()
|
||||||
|
if pid > 0:
|
||||||
|
# second parent exits
|
||||||
|
sys.exit(0)
|
||||||
|
except OSError, e:
|
||||||
|
logging.error("fork #2 failed: %d (%s)" % (
|
||||||
|
e.errno, e.strerror))
|
||||||
|
return
|
||||||
|
|
||||||
|
# redirect standard file descriptors
|
||||||
|
sys.stdout.flush()
|
||||||
|
sys.stderr.flush()
|
||||||
|
si = open(stdin, 'r')
|
||||||
|
so = open(stdout, 'a+')
|
||||||
|
se = open(stderr, 'a+', 0)
|
||||||
|
os.dup2(si.fileno(), sys.stdin.fileno())
|
||||||
|
os.dup2(so.fileno(), sys.stdout.fileno())
|
||||||
|
os.dup2(se.fileno(), sys.stderr.fileno())
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def main(torrent_path, seed_cache_path, torrent_seed_duration,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end):
|
||||||
|
seed_time = time.time() + torrent_seed_duration
|
||||||
|
logging.debug("Seeding '%s' for %d secs" % (
|
||||||
|
torrent_path, torrent_seed_duration))
|
||||||
|
|
||||||
|
child = _daemonize()
|
||||||
|
if not child:
|
||||||
|
return
|
||||||
|
|
||||||
|
# At this point we're the daemonized child...
|
||||||
|
session = libtorrent.session()
|
||||||
|
session.listen_on(torrent_listen_port_start, torrent_listen_port_end)
|
||||||
|
|
||||||
|
torrent_file = open(torrent_path, 'rb')
|
||||||
|
try:
|
||||||
|
torrent_data = torrent_file.read()
|
||||||
|
finally:
|
||||||
|
torrent_file.close()
|
||||||
|
|
||||||
|
decoded_data = libtorrent.bdecode(torrent_data)
|
||||||
|
|
||||||
|
info = libtorrent.torrent_info(decoded_data)
|
||||||
|
torrent = session.add_torrent(
|
||||||
|
info, seed_cache_path,
|
||||||
|
storage_mode=libtorrent.storage_mode_t.storage_mode_sparse)
|
||||||
|
try:
|
||||||
|
while time.time() < seed_time:
|
||||||
|
time.sleep(5)
|
||||||
|
finally:
|
||||||
|
session.remove_torrent(torrent)
|
||||||
|
|
||||||
|
logging.debug("Seeding of '%s' finished" % torrent_path)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
(torrent_path, seed_cache_path, torrent_seed_duration,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end) = sys.argv[1:]
|
||||||
|
torrent_seed_duration = int(torrent_seed_duration)
|
||||||
|
torrent_listen_port_start = int(torrent_listen_port_start)
|
||||||
|
torrent_listen_port_end = int(torrent_listen_port_end)
|
||||||
|
|
||||||
|
main(torrent_path, seed_cache_path, torrent_seed_duration,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end)
|
299
plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent
Executable file
299
plugins/xenserver/xenapi/etc/xapi.d/plugins/bittorrent
Executable file
@ -0,0 +1,299 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
# Copyright (c) 2012 Openstack, LLC
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Download images via BitTorrent."""
|
||||||
|
|
||||||
|
import errno
|
||||||
|
import inspect
|
||||||
|
import os
|
||||||
|
import random
|
||||||
|
import shutil
|
||||||
|
import tempfile
|
||||||
|
import time
|
||||||
|
|
||||||
|
import libtorrent
|
||||||
|
import urllib2
|
||||||
|
import XenAPIPlugin
|
||||||
|
|
||||||
|
import utils
|
||||||
|
|
||||||
|
#FIXME(sirp): should this use pluginlib from 5.6?
|
||||||
|
from pluginlib_nova import *
|
||||||
|
configure_logging('bittorrent')
|
||||||
|
|
||||||
|
DEFAULT_TORRENT_CACHE = '/images/torrents'
|
||||||
|
DEFAULT_SEED_CACHE = '/images/seeds'
|
||||||
|
SEEDER_PROCESS = '_bittorrent_seeder'
|
||||||
|
|
||||||
|
|
||||||
|
def _make_torrent_cache():
|
||||||
|
torrent_cache_path = os.environ.get(
|
||||||
|
'TORRENT_CACHE', DEFAULT_TORRENT_CACHE)
|
||||||
|
|
||||||
|
if not os.path.exists(torrent_cache_path):
|
||||||
|
os.mkdir(torrent_cache_path)
|
||||||
|
|
||||||
|
return torrent_cache_path
|
||||||
|
|
||||||
|
|
||||||
|
def _fetch_torrent_file(torrent_cache_path, image_id, torrent_base_url):
|
||||||
|
torrent_path = os.path.join(
|
||||||
|
torrent_cache_path, image_id + '.torrent')
|
||||||
|
|
||||||
|
if not os.path.exists(torrent_path):
|
||||||
|
torrent_url = torrent_base_url + "/%s.torrent" % image_id
|
||||||
|
logging.info("Downloading %s" % torrent_url)
|
||||||
|
|
||||||
|
# Write contents to temporary path to ensure we don't have partially
|
||||||
|
# completed files in the cache.
|
||||||
|
temp_directory = tempfile.mkdtemp(dir=torrent_cache_path)
|
||||||
|
try:
|
||||||
|
temp_path = os.path.join(
|
||||||
|
temp_directory, os.path.basename(torrent_path))
|
||||||
|
temp_file = open(temp_path, 'wb')
|
||||||
|
try:
|
||||||
|
remote_torrent_file = urllib2.urlopen(torrent_url)
|
||||||
|
shutil.copyfileobj(remote_torrent_file, temp_file)
|
||||||
|
finally:
|
||||||
|
temp_file.close()
|
||||||
|
|
||||||
|
os.rename(temp_path, torrent_path)
|
||||||
|
finally:
|
||||||
|
shutil.rmtree(temp_directory)
|
||||||
|
|
||||||
|
return torrent_path
|
||||||
|
|
||||||
|
|
||||||
|
def _reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed):
|
||||||
|
"""Delete any torrent files that haven't been accessed recently."""
|
||||||
|
if not torrent_max_last_accessed:
|
||||||
|
logging.debug("Reaping old torrent files disabled, skipping...")
|
||||||
|
return
|
||||||
|
|
||||||
|
logging.debug("Preparing to reap old torrent files,"
|
||||||
|
" torrent_max_last_accessed=%d" % torrent_max_last_accessed)
|
||||||
|
|
||||||
|
for fname in os.listdir(torrent_cache_path):
|
||||||
|
torrent_path = os.path.join(torrent_cache_path, fname)
|
||||||
|
last_accessed = time.time() - os.path.getatime(torrent_path)
|
||||||
|
if last_accessed > torrent_max_last_accessed:
|
||||||
|
logging.debug("Reaping '%s', last_accessed=%d" % (
|
||||||
|
torrent_path, last_accessed))
|
||||||
|
utils.delete_if_exists(torrent_path)
|
||||||
|
|
||||||
|
|
||||||
|
def _download(torrent_path, save_as_path, torrent_listen_port_start,
|
||||||
|
torrent_listen_port_end, torrent_download_stall_cutoff):
|
||||||
|
session = libtorrent.session()
|
||||||
|
session.listen_on(torrent_listen_port_start, torrent_listen_port_end)
|
||||||
|
info = libtorrent.torrent_info(
|
||||||
|
libtorrent.bdecode(open(torrent_path, 'rb').read()))
|
||||||
|
|
||||||
|
torrent = session.add_torrent(
|
||||||
|
info, save_as_path,
|
||||||
|
storage_mode=libtorrent.storage_mode_t.storage_mode_sparse)
|
||||||
|
|
||||||
|
try:
|
||||||
|
last_progress = 0
|
||||||
|
last_progress_updated = time.time()
|
||||||
|
|
||||||
|
while not torrent.is_seed():
|
||||||
|
s = torrent.status()
|
||||||
|
|
||||||
|
progress = s.progress * 100
|
||||||
|
|
||||||
|
if progress != last_progress:
|
||||||
|
last_progress = progress
|
||||||
|
last_progress_updated = time.time()
|
||||||
|
|
||||||
|
stall_duration = time.time() - last_progress_updated
|
||||||
|
if stall_duration > torrent_download_stall_cutoff:
|
||||||
|
logging.error(
|
||||||
|
"Download stalled: stall_duration=%d,"
|
||||||
|
" torrent_download_stall_cutoff=%d" % (
|
||||||
|
stall_duration, torrent_download_stall_cutoff))
|
||||||
|
raise Exception("Bittorrent download stall detected, bailing!")
|
||||||
|
|
||||||
|
logging.debug(
|
||||||
|
'%.2f%% complete (down: %.1f kb/s up: %.1f kB/s peers: %d)'
|
||||||
|
' %s %s' % (progress, s.download_rate / 1000,
|
||||||
|
s.upload_rate / 1000, s.num_peers, s.state,
|
||||||
|
torrent_path))
|
||||||
|
time.sleep(1)
|
||||||
|
finally:
|
||||||
|
session.remove_torrent(torrent)
|
||||||
|
|
||||||
|
logging.debug("Download of '%s' finished" % torrent_path)
|
||||||
|
|
||||||
|
|
||||||
|
def _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
|
||||||
|
torrent_max_seeder_processes_per_host):
|
||||||
|
if not torrent_seed_duration:
|
||||||
|
logging.debug("Seeding disabled, skipping...")
|
||||||
|
return False
|
||||||
|
|
||||||
|
if os.path.exists(seed_path):
|
||||||
|
logging.debug("Seed is already present, skipping....")
|
||||||
|
return False
|
||||||
|
|
||||||
|
rand = random.random()
|
||||||
|
if rand > torrent_seed_chance:
|
||||||
|
logging.debug("%.2f > %.2f, seeding randomly skipping..." % (
|
||||||
|
rand, torrent_seed_chance))
|
||||||
|
return False
|
||||||
|
|
||||||
|
num_active_seeders = len(list(_active_seeder_processes()))
|
||||||
|
if (torrent_max_seeder_processes_per_host >= 0 and
|
||||||
|
num_active_seeders >= torrent_max_seeder_processes_per_host):
|
||||||
|
logging.debug("max number of seeder processes for this host reached"
|
||||||
|
" (%d), skipping..." %
|
||||||
|
torrent_max_seeder_processes_per_host)
|
||||||
|
return False
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def _seed(torrent_path, seed_cache_path, torrent_seed_duration,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end):
|
||||||
|
plugin_path = os.path.dirname(inspect.getabsfile(inspect.currentframe()))
|
||||||
|
seeder_path = os.path.join(plugin_path, SEEDER_PROCESS)
|
||||||
|
seed_cmd = "%s %s %s %d %d %d" % (
|
||||||
|
seeder_path, torrent_path, seed_cache_path, torrent_seed_duration,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end)
|
||||||
|
|
||||||
|
seed_proc = utils.make_subprocess(seed_cmd)
|
||||||
|
utils.finish_subprocess(seed_proc, seed_cmd)
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_if_needed(seed_cache_path, tarball_path, torrent_path,
|
||||||
|
torrent_seed_duration, torrent_seed_chance,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end,
|
||||||
|
torrent_max_seeder_processes_per_host):
|
||||||
|
seed_filename = os.path.basename(tarball_path)
|
||||||
|
seed_path = os.path.join(seed_cache_path, seed_filename)
|
||||||
|
|
||||||
|
if _should_seed(seed_path, torrent_seed_duration, torrent_seed_chance,
|
||||||
|
torrent_max_seeder_processes_per_host):
|
||||||
|
logging.debug("Preparing to seed '%s' for %d secs" % (
|
||||||
|
seed_path, torrent_seed_duration))
|
||||||
|
utils._rename(tarball_path, seed_path)
|
||||||
|
|
||||||
|
# Daemonize and seed the image
|
||||||
|
_seed(torrent_path, seed_cache_path, torrent_seed_duration,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end)
|
||||||
|
else:
|
||||||
|
utils.delete_if_exists(tarball_path)
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_tarball(tarball_path, staging_path):
|
||||||
|
"""Extract the tarball into the staging directory."""
|
||||||
|
tarball_fileobj = open(tarball_path, 'rb')
|
||||||
|
try:
|
||||||
|
utils.extract_tarball(tarball_fileobj, staging_path)
|
||||||
|
finally:
|
||||||
|
tarball_fileobj.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _active_seeder_processes():
|
||||||
|
"""Yields command-line of active seeder processes.
|
||||||
|
|
||||||
|
Roughly equivalent to performing ps | grep _bittorrent_seeder
|
||||||
|
"""
|
||||||
|
pids = [pid for pid in os.listdir('/proc') if pid.isdigit()]
|
||||||
|
for pid in pids:
|
||||||
|
try:
|
||||||
|
cmdline = open(os.path.join('/proc', pid, 'cmdline'), 'rb').read()
|
||||||
|
except IOError, e:
|
||||||
|
if e.errno != errno.ENOENT:
|
||||||
|
raise
|
||||||
|
|
||||||
|
if SEEDER_PROCESS in cmdline:
|
||||||
|
yield cmdline
|
||||||
|
|
||||||
|
|
||||||
|
def _reap_finished_seeds(seed_cache_path):
|
||||||
|
"""Delete any cached seeds where the seeder process has died."""
|
||||||
|
logging.debug("Preparing to reap finished seeds")
|
||||||
|
missing = {}
|
||||||
|
for fname in os.listdir(seed_cache_path):
|
||||||
|
seed_path = os.path.join(seed_cache_path, fname)
|
||||||
|
missing[seed_path] = None
|
||||||
|
|
||||||
|
for cmdline in _active_seeder_processes():
|
||||||
|
for seed_path in missing.keys():
|
||||||
|
seed_filename = os.path.basename(seed_path)
|
||||||
|
if seed_filename in cmdline:
|
||||||
|
del missing[seed_path]
|
||||||
|
|
||||||
|
for seed_path in missing:
|
||||||
|
logging.debug("Reaping cached seed '%s'" % seed_path)
|
||||||
|
utils.delete_if_exists(seed_path)
|
||||||
|
|
||||||
|
|
||||||
|
def _make_seed_cache():
|
||||||
|
seed_cache_path = os.environ.get('SEED_CACHE', DEFAULT_SEED_CACHE)
|
||||||
|
if not os.path.exists(seed_cache_path):
|
||||||
|
os.mkdir(seed_cache_path)
|
||||||
|
return seed_cache_path
|
||||||
|
|
||||||
|
|
||||||
|
def download_vhd(session, image_id, torrent_base_url, torrent_seed_duration,
|
||||||
|
torrent_seed_chance, torrent_max_last_accessed,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end,
|
||||||
|
torrent_download_stall_cutoff, uuid_stack, sr_path,
|
||||||
|
torrent_max_seeder_processes_per_host):
|
||||||
|
"""Download an image from BitTorrent, unbundle it, and then deposit the
|
||||||
|
VHDs into the storage repository
|
||||||
|
"""
|
||||||
|
seed_cache_path = _make_seed_cache()
|
||||||
|
torrent_cache_path = _make_torrent_cache()
|
||||||
|
|
||||||
|
# Housekeeping
|
||||||
|
_reap_finished_seeds(seed_cache_path)
|
||||||
|
_reap_old_torrent_files(torrent_cache_path, torrent_max_last_accessed)
|
||||||
|
|
||||||
|
torrent_path = _fetch_torrent_file(
|
||||||
|
torrent_cache_path, image_id, torrent_base_url)
|
||||||
|
|
||||||
|
staging_path = utils.make_staging_area(sr_path)
|
||||||
|
try:
|
||||||
|
tarball_filename = os.path.basename(torrent_path).replace(
|
||||||
|
'.torrent', '')
|
||||||
|
tarball_path = os.path.join(staging_path, tarball_filename)
|
||||||
|
|
||||||
|
# Download tarball into staging area
|
||||||
|
_download(torrent_path, staging_path, torrent_listen_port_start,
|
||||||
|
torrent_listen_port_end, torrent_download_stall_cutoff)
|
||||||
|
|
||||||
|
# Extract the tarball into the staging area
|
||||||
|
_extract_tarball(tarball_path, staging_path)
|
||||||
|
|
||||||
|
# Move the VHDs from the staging area into the storage repository
|
||||||
|
vdi_list = utils.import_vhds(sr_path, staging_path, uuid_stack)
|
||||||
|
|
||||||
|
# Seed image for others in the swarm
|
||||||
|
_seed_if_needed(seed_cache_path, tarball_path, torrent_path,
|
||||||
|
torrent_seed_duration, torrent_seed_chance,
|
||||||
|
torrent_listen_port_start, torrent_listen_port_end,
|
||||||
|
torrent_max_seeder_processes_per_host)
|
||||||
|
finally:
|
||||||
|
utils.cleanup_staging_area(staging_path)
|
||||||
|
|
||||||
|
return vdi_list
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
utils.register_plugin_calls(download_vhd)
|
@ -27,6 +27,16 @@ import XenAPIPlugin
|
|||||||
CHUNK_SIZE = 8192
|
CHUNK_SIZE = 8192
|
||||||
|
|
||||||
|
|
||||||
|
def delete_if_exists(path):
|
||||||
|
try:
|
||||||
|
os.unlink(path)
|
||||||
|
except OSError, e:
|
||||||
|
if e.errno == errno.ENOENT:
|
||||||
|
logging.warning("'%s' was already deleted, skipping delete" % path)
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
def _link(src, dst):
|
def _link(src, dst):
|
||||||
logging.info("Hard-linking file '%s' -> '%s'" % (src, dst))
|
logging.info("Hard-linking file '%s' -> '%s'" % (src, dst))
|
||||||
os.link(src, dst)
|
os.link(src, dst)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user