Fixup python code in bindings to pass PEP8 codestyle

* Passes Flake8 default settings.
 * Tabs replace spaces
 * Ensure print_function available for python2
This commit is contained in:
Calum Lind 2017-01-20 15:07:09 +00:00 committed by Arvid Norberg
parent c4c80b59d5
commit d32ed4b9de
6 changed files with 456 additions and 379 deletions

View File

@ -3,13 +3,14 @@
# Copyright Daniel Wallin 2006. Use, modification and distribution is
# subject to the Boost Software License, Version 1.0. (See accompanying
# file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
from __future__ import print_function
import sys
import atexit
import libtorrent as lt
import time
import os.path
import sys
class WindowsConsole:
def __init__(self):
@ -27,6 +28,7 @@ class WindowsConsole:
return msvcrt.getch()
return None
class UnixConsole:
def __init__(self):
self.fd = sys.stdin
@ -51,11 +53,13 @@ class UnixConsole:
sys.stdout.flush()
def sleep_and_input(self, seconds):
read,_,_ = select.select([self.fd.fileno()], [], [], seconds)
read, __, __ = select.select(
[self.fd.fileno()], [], [], seconds)
if len(read) > 0:
return self.fd.read(1)
return None
if os.name == 'nt':
import Console
import msvcrt
@ -63,9 +67,11 @@ else:
import termios
import select
def write_line(console, line):
console.write(line)
def add_suffix(val):
prefix = ['B', 'kB', 'MB', 'GB', 'TB']
for i in range(len(prefix)):
@ -78,14 +84,17 @@ def add_suffix(val):
return '%6.3gPB' % val
def progress_bar(progress, width):
assert(progress <= 1)
progress_chars = int(progress * width + 0.5)
return progress_chars * '#' + (width - progress_chars) * '-'
def print_peer_info(console, peers):
out = ' down (total ) up (total ) q r flags block progress client\n'
out = (' down (total ) up (total )'
' q r flags block progress client\n')
for p in peers:
@ -96,23 +105,36 @@ def print_peer_info(console, peers):
out += '%2d ' % p.download_queue_length
out += '%2d ' % p.upload_queue_length
if p.flags & lt.peer_info.interesting: out += 'I'
else: out += '.'
if p.flags & lt.peer_info.choked: out += 'C'
else: out += '.'
if p.flags & lt.peer_info.remote_interested: out += 'i'
else: out += '.'
if p.flags & lt.peer_info.remote_choked: out += 'c'
else: out += '.'
if p.flags & lt.peer_info.supports_extensions: out += 'e'
else: out += '.'
if p.flags & lt.peer_info.local_connection: out += 'l'
else: out += 'r'
if p.flags & lt.peer_info.interesting:
out += 'I'
else:
out += '.'
if p.flags & lt.peer_info.choked:
out += 'C'
else:
out += '.'
if p.flags & lt.peer_info.remote_interested:
out += 'i'
else:
out += '.'
if p.flags & lt.peer_info.remote_choked:
out += 'c'
else:
out += '.'
if p.flags & lt.peer_info.supports_extensions:
out += 'e'
else:
out += '.'
if p.flags & lt.peer_info.local_connection:
out += 'l'
else:
out += 'r'
out += ' '
if p.downloading_piece_index >= 0:
assert(p.downloading_progress <= p.downloading_total)
out += progress_bar(float(p.downloading_progress) / p.downloading_total, 15)
out += progress_bar(float(p.downloading_progress) /
p.downloading_total, 15)
else:
out += progress_bar(0, 15)
out += ' '
@ -120,9 +142,9 @@ def print_peer_info(console, peers):
if p.flags & lt.peer_info.handshake:
id = 'waiting for handshake'
elif p.flags & lt.peer_info.connecting:
id = 'connecting to peer'
id = 'connecting to peer'
elif p.flags & lt.peer_info.queued:
id = 'queued'
id = 'queued'
else:
id = p.client
@ -136,7 +158,7 @@ def print_download_queue(console, download_queue):
out = ""
for e in download_queue:
out += '%4d: [' % e['piece_index'];
out += '%4d: [' % e['piece_index']
for b in e['blocks']:
s = b['state']
if s == 3:
@ -151,32 +173,36 @@ def print_download_queue(console, download_queue):
write_line(console, out)
def main():
from optparse import OptionParser
parser = OptionParser()
parser.add_option('-p', '--port',
type='int', help='set listening port')
parser.add_option('-p', '--port', type='int', help='set listening port')
parser.add_option('-d', '--max-download-rate',
type='float', help='the maximum download rate given in kB/s. 0 means infinite.')
parser.add_option(
'-d', '--max-download-rate', type='float',
help='the maximum download rate given in kB/s. 0 means infinite.')
parser.add_option('-u', '--max-upload-rate',
type='float', help='the maximum upload rate given in kB/s. 0 means infinite.')
parser.add_option(
'-u', '--max-upload-rate', type='float',
help='the maximum upload rate given in kB/s. 0 means infinite.')
parser.add_option('-s', '--save-path',
type='string', help='the path where the downloaded file/folder should be placed.')
parser.add_option(
'-s', '--save-path', type='string',
help='the path where the downloaded file/folder should be placed.')
parser.add_option('-r', '--proxy-host',
type='string', help='sets HTTP proxy host and port (separated by \':\')')
parser.add_option(
'-r', '--proxy-host', type='string',
help='sets HTTP proxy host and port (separated by \':\')')
parser.set_defaults(
port=6881
, max_download_rate=0
, max_upload_rate=0
, save_path='.'
, proxy_host=''
port=6881,
max_download_rate=0,
max_upload_rate=0,
save_path='.',
proxy_host=''
)
(options, args) = parser.parse_args()
@ -220,14 +246,17 @@ def main():
atp["paused"] = False
atp["auto_managed"] = True
atp["duplicate_is_error"] = True
if f.startswith('magnet:') or f.startswith('http://') or f.startswith('https://'):
if f.startswith('magnet:') or f.startswith(
'http://') or f.startswith('https://'):
atp["url"] = f
else:
info = lt.torrent_info(f)
print('Adding \'%s\'...' % info.name())
try:
atp["resume_data"] = open(os.path.join(options.save_path, info.name() + '.fastresume'), 'rb').read()
atp["resume_data"] = open(os.path.join(
options.save_path,
info.name() + '.fastresume'), 'rb').read()
except:
pass
@ -261,8 +290,8 @@ def main():
s = h.status()
if s.state != lt.torrent_status.seeding:
state_str = ['queued', 'checking', 'downloading metadata', \
'downloading', 'finished', 'seeding', \
state_str = ['queued', 'checking', 'downloading metadata',
'downloading', 'finished', 'seeding',
'allocating', 'checking fastresume']
out += state_str[s.state] + ' '
@ -295,7 +324,7 @@ def main():
out = '\n'
fp = h.file_progress()
ti = h.get_torrent_info()
for f,p in zip(ti.files(), fp):
for f, p in zip(ti.files(), fp):
out += progress_bar(p / float(f.size), 20)
out += ' ' + f.path + '\n'
write_line(console, out)
@ -308,7 +337,8 @@ def main():
while 1:
a = ses.pop_alert()
if not a: break
if not a:
break
alerts.append(a)
if len(alerts) > 8:
@ -326,20 +356,24 @@ def main():
continue
if c == 'r':
for h in handles: h.force_reannounce()
for h in handles:
h.force_reannounce()
elif c == 'q':
alive = False
elif c == 'p':
for h in handles: h.pause()
for h in handles:
h.pause()
elif c == 'u':
for h in handles: h.resume()
for h in handles:
h.resume()
ses.pause()
for h in handles:
if not h.is_valid() or not h.has_metadata():
continue
data = lt.bencode(h.write_resume_data())
open(os.path.join(options.save_path, h.get_torrent_info().name() + '.fastresume'), 'wb').write(data)
open(os.path.join(options.save_path, h.get_torrent_info().name() +
'.fastresume'), 'wb').write(data)
main()

View File

@ -1,43 +1,47 @@
#!/usr/bin/env python
from __future__ import print_function
import sys
import os
import libtorrent
if len(sys.argv) < 3:
print('usage make_torrent.py file tracker-url')
sys.exit(1)
print('usage make_torrent.py file tracker-url')
sys.exit(1)
input = os.path.abspath(sys.argv[1])
fs = libtorrent.file_storage()
#def predicate(f):
# print f
# return True
#libtorrent.add_files(fs, input, predicate)
# def predicate(f):
# print f
# return True
# libtorrent.add_files(fs, input, predicate)
parent_input = os.path.split(input)[0]
for root, dirs, files in os.walk(input):
# skip directories starting with .
if os.path.split(root)[1][0] == '.': continue
# skip directories starting with .
if os.path.split(root)[1][0] == '.':
continue
for f in files:
# skip files starting with .
if f[0] == '.': continue
for f in files:
# skip files starting with .
if f[0] == '.':
continue
# skip thumbs.db on windows
if f == 'Thumbs.db': continue
# skip thumbs.db on windows
if f == 'Thumbs.db':
continue
fname = os.path.join(root[len(parent_input)+1:], f)
size = os.path.getsize(os.path.join(parent_input, fname))
print('%10d kiB %s' % (size / 1024, fname))
fs.add_file(fname, size);
fname = os.path.join(root[len(parent_input)+1:], f)
size = os.path.getsize(os.path.join(parent_input, fname))
print('%10d kiB %s' % (size / 1024, fname))
fs.add_file(fname, size)
if fs.num_files() == 0:
print('no files added')
sys.exit(1)
print('no files added')
sys.exit(1)
t = libtorrent.create_torrent(fs, 0, 4 * 1024 * 1024)
@ -50,4 +54,3 @@ sys.stderr.write('\n')
f = open('out.torrent', 'wb+')
f.write(libtorrent.bencode(t.generate()))
f.close()

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
from __future__ import print_function
from distutils.core import setup, Extension
from distutils.sysconfig import get_config_vars
@ -7,129 +8,144 @@ import platform
import sys
import shutil
import multiprocessing
import subprocess
class flags_parser:
def __init__(self):
self.include_dirs = []
self.library_dirs = []
self.libraries = []
def __init__(self):
self.include_dirs = []
self.library_dirs = []
self.libraries = []
def parse(self, args):
"""Parse out the -I -L -l directives
Returns:
list: All other arguments
"""
ret = []
for token in args.split():
prefix = token[:2]
if prefix == '-I':
self.include_dirs.append(token[2:])
elif prefix == '-L':
self.library_dirs.append(token[2:])
elif prefix == '-l':
self.libraries.append(token[2:])
else:
ret.append(token)
return ret
def parse(self, args):
"""Parse out the -I -L -l directives and return a list of all other arguments"""
ret = []
for token in args.split():
prefix = token[:2]
if prefix == '-I':
self.include_dirs.append(token[2:])
elif prefix == '-L':
self.library_dirs.append(token[2:])
elif prefix == '-l':
self.libraries.append(token[2:])
else:
ret.append(token)
return ret
def arch():
if platform.system() != 'Darwin': return []
a = os.uname()[4]
if a == 'Power Macintosh': a = 'ppc'
return ['-arch', a]
if platform.system() == 'Darwin':
__, __, machine = platform.mac_ver()
if machine.startswith('ppc'):
return ['-arch', machine]
return []
def target_specific():
if platform.system() == 'Darwin':
# On mavericks, clang will fail when unknown arguments are passed in.
# python distutils will pass in arguments it doesn't know about.
return ['-Wno-error=unused-command-line-argument-hard-error-in-future']
return []
if platform.system() != 'Darwin': return []
# on mavericks, clang will fail when unknown arguments are
# passed in. python distutils will pass in arguments it doesn't
# know about
return ['-Wno-error=unused-command-line-argument-hard-error-in-future']
try:
with open('compile_flags') as _file:
extra_cmd = _file.read()
with open('compile_flags') as _file:
extra_cmd = _file.read()
except:
extra_cmd = None
extra_cmd = None
try:
with open('link_flags') as _file:
ldflags = _file.read()
with open('link_flags') as _file:
ldflags = _file.read()
except:
ldflags = None
ldflags = None
ext = None
packages = None
if '--bjam' in sys.argv:
del sys.argv[sys.argv.index('--bjam')]
if '--bjam' in sys.argv:
del sys.argv[sys.argv.index('--bjam')]
if '--help' not in sys.argv and '--help-commands' not in sys.argv:
if platform.system() == 'Windows':
file_ext = '.pyd'
else:
file_ext = '.so'
if not '--help' in sys.argv \
and not '--help-commands' in sys.argv:
parallel_builds = ' -j%d' % multiprocessing.cpu_count()
file_ext = '.so'
# build libtorrent using bjam and build the installer with distutils
cmdline = ('b2 libtorrent-link=static boost-link=static release '
'optimization=space stage_module --abbreviate-paths' +
parallel_builds)
print(cmdline)
if os.system(cmdline) != 0:
print('build failed')
sys.exit(1)
if platform.system() == 'Windows':
file_ext = '.pyd'
try:
os.mkdir('build')
except:
pass
try:
shutil.rmtree('build/lib')
except:
pass
try:
os.mkdir('build/lib')
except:
pass
try:
os.mkdir('libtorrent')
except:
pass
shutil.copyfile('libtorrent' + file_ext,
'build/lib/libtorrent' + file_ext)
parallel_builds = ' -j%d' % multiprocessing.cpu_count()
# build libtorrent using bjam and build the installer with distutils
cmdline = 'b2 libtorrent-link=static boost-link=static release optimization=space stage_module --abbreviate-paths' + parallel_builds
print(cmdline)
if os.system(cmdline) != 0:
print('build failed')
sys.exit(1)
try: os.mkdir('build')
except: pass
try: shutil.rmtree('build/lib')
except: pass
try: os.mkdir('build/lib')
except: pass
try: os.mkdir('libtorrent')
except: pass
shutil.copyfile('libtorrent' + file_ext, 'build/lib/libtorrent' + file_ext)
packages = ['libtorrent']
packages = ['libtorrent']
else:
# Remove the '-Wstrict-prototypes' compiler option, which isn't valid for C++.
cfg_vars = get_config_vars()
for key, value in cfg_vars.items():
if isinstance(value, str):
cfg_vars[key] = value.replace('-Wstrict-prototypes', '')
# Remove '-Wstrict-prototypes' compiler option, which isn't valid for C++.
cfg_vars = get_config_vars()
for key, value in cfg_vars.items():
if isinstance(value, str):
cfg_vars[key] = value.replace('-Wstrict-prototypes', '')
source_list = os.listdir(os.path.join(os.path.dirname(__file__), "src"))
source_list = [os.path.abspath(os.path.join(os.path.dirname(__file__), "src", s)) for s in source_list if s.endswith(".cpp")]
source_list = os.listdir(os.path.join(os.path.dirname(__file__), "src"))
source_list = [os.path.abspath(os.path.join(os.path.dirname(__file__),
"src", s)) for s in source_list if s.endswith(".cpp")]
if extra_cmd:
flags = flags_parser()
# ldflags must be parsed first to ensure the correct library search path order
extra_link = flags.parse(ldflags)
extra_compile = flags.parse(extra_cmd)
if extra_cmd:
flags = flags_parser()
# ldflags parsed first to ensure the correct library search path order
extra_link = flags.parse(ldflags)
extra_compile = flags.parse(extra_cmd)
ext = [Extension('libtorrent',
sources = source_list,
language='c++',
include_dirs = flags.include_dirs,
library_dirs = flags.library_dirs,
extra_link_args = extra_link + arch(),
extra_compile_args = extra_compile + arch() + target_specific(),
libraries = ['torrent-rasterbar'] + flags.libraries)]
ext = [Extension(
'libtorrent',
sources=source_list,
language='c++',
include_dirs=flags.include_dirs,
library_dirs=flags.library_dirs,
extra_link_args=extra_link + arch(),
extra_compile_args=extra_compile + arch() + target_specific(),
libraries=['torrent-rasterbar'] + flags.libraries)
]
setup(name = 'python-libtorrent',
version = '1.2.0',
author = 'Arvid Norberg',
author_email = 'arvid@libtorrent.org',
description = 'Python bindings for libtorrent-rasterbar',
long_description = 'Python bindings for libtorrent-rasterbar',
url = 'http://libtorrent.org',
platforms = [platform.system() + '-' + platform.machine()],
license = 'BSD',
packages = packages,
ext_modules = ext
setup(
name='python-libtorrent',
version='1.2.0',
author='Arvid Norberg',
author_email='arvid@libtorrent.org',
description='Python bindings for libtorrent-rasterbar',
long_description='Python bindings for libtorrent-rasterbar',
url='http://libtorrent.org',
platforms=[platform.system() + '-' + platform.machine()],
license='BSD',
packages=packages,
ext_modules=ext
)

View File

@ -16,21 +16,23 @@ h = ses.add_torrent({'ti': info, 'save_path': '.'})
print('starting', h.name())
while (not h.is_seed()):
s = h.status()
s = h.status()
state_str = ['queued', 'checking', 'downloading metadata', \
'downloading', 'finished', 'seeding', 'allocating', 'checking fastresume']
print('\r%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s' % \
(s.progress * 100, s.download_rate / 1000, s.upload_rate / 1000, \
s.num_peers, state_str[s.state]), end=' ')
state_str = [
'queued', 'checking', 'downloading metadata',
'downloading', 'finished', 'seeding', 'allocating',
'checking fastresume']
print('\r%.2f%% complete (down: %.1f kB/s up: %.1f kB/s peers: %d) %s' % (
s.progress * 100, s.download_rate / 1000, s.upload_rate / 1000,
s.num_peers, state_str[s.state]), end=' ')
alerts = ses.pop_alerts()
for a in alerts:
if a.category() & lt.alert.category_t.error_notification:
print(a)
alerts = ses.pop_alerts()
for a in alerts:
if a.category() & lt.alert.category_t.error_notification:
print(a)
sys.stdout.flush()
sys.stdout.flush()
time.sleep(1)
time.sleep(1)
print(h.name(), 'complete')

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
from __future__ import print_function
import libtorrent as lt
@ -8,204 +9,219 @@ import os
import shutil
import binascii
class test_create_torrent(unittest.TestCase):
def test_from_torrent_info(self):
ti = lt.torrent_info('unordered.torrent')
ct = lt.create_torrent(ti)
entry = ct.generate()
content = lt.bencode(entry).strip()
with open('unordered.torrent', 'rb') as f:
file_content = bytearray(f.read().strip())
print(content)
print(file_content)
print(entry)
self.assertEqual(content, file_content)
def test_from_torrent_info(self):
ti = lt.torrent_info('unordered.torrent')
ct = lt.create_torrent(ti)
entry = ct.generate()
content = lt.bencode(entry).strip()
with open('unordered.torrent', 'rb') as f:
file_content = bytearray(f.read().strip())
print(content)
print(file_content)
print(entry)
self.assertEqual(content, file_content)
class test_session_stats(unittest.TestCase):
def test_unique(self):
l = lt.session_stats_metrics()
self.assertTrue(len(l) > 40);
idx = set()
for m in l:
self.assertTrue(m.value_index not in idx)
idx.add(m.value_index)
def test_unique(self):
l = lt.session_stats_metrics()
self.assertTrue(len(l) > 40)
idx = set()
for m in l:
self.assertTrue(m.value_index not in idx)
idx.add(m.value_index)
def test_find_idx(self):
self.assertEqual(lt.find_metric_idx("peer.error_peers"), 0)
def test_find_idx(self):
self.assertEqual(lt.find_metric_idx("peer.error_peers"), 0)
class test_torrent_handle(unittest.TestCase):
def setup(self):
self.ses = lt.session({'alert_mask': lt.alert.category_t.all_categories, 'enable_dht': False})
self.ti = lt.torrent_info('url_seed_multi.torrent');
self.h = self.ses.add_torrent({'ti': self.ti, 'save_path': os.getcwd()})
def setup(self):
self.ses = lt.session({
'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
self.ti = lt.torrent_info('url_seed_multi.torrent')
self.h = self.ses.add_torrent({
'ti': self.ti, 'save_path': os.getcwd()})
def test_torrent_handle(self):
self.setup()
self.assertEqual(self.h.file_priorities(), [4,4])
self.assertEqual(self.h.piece_priorities(), [4])
def test_torrent_handle(self):
self.setup()
self.assertEqual(self.h.file_priorities(), [4, 4])
self.assertEqual(self.h.piece_priorities(), [4])
self.h.prioritize_files([0,1])
self.assertEqual(self.h.file_priorities(), [0,1])
self.h.prioritize_files([0, 1])
self.assertEqual(self.h.file_priorities(), [0, 1])
self.h.prioritize_pieces([0])
self.assertEqual(self.h.piece_priorities(), [0])
self.h.prioritize_pieces([0])
self.assertEqual(self.h.piece_priorities(), [0])
# also test the overload that takes a list of piece->priority mappings
self.h.prioritize_pieces([(0, 1)])
self.assertEqual(self.h.piece_priorities(), [1])
# also test the overload that takes a list of piece->priority mappings
self.h.prioritize_pieces([(0, 1)])
self.assertEqual(self.h.piece_priorities(), [1])
def test_file_status(self):
self.setup()
l = self.h.file_status()
print(l)
def test_file_status(self):
self.setup()
l = self.h.file_status()
print(l)
def test_piece_deadlines(self):
self.setup()
self.h.clear_piece_deadlines()
def test_piece_deadlines(self):
self.setup()
self.h.clear_piece_deadlines()
def test_torrent_status(self):
self.setup()
st = self.h.status()
ti = st.handle;
self.assertEqual(ti.info_hash(), self.ti.info_hash())
# make sure we can compare torrent_status objects
st2 = self.h.status()
self.assertEqual(st2, st)
def test_torrent_status(self):
self.setup()
st = self.h.status()
ti = st.handle
self.assertEqual(ti.info_hash(), self.ti.info_hash())
# make sure we can compare torrent_status objects
st2 = self.h.status()
self.assertEqual(st2, st)
def test_read_resume_data(self):
def test_read_resume_data(self):
resume_data = lt.bencode({'file-format': 'libtorrent resume file',
'info-hash': 'abababababababababab',
'name': 'test',
'save_path': '.',
'peers': '\x01\x01\x01\x01\x00\x01\x02\x02\x02\x02\x00\x02',
'file_priority': [0, 1, 1]})
tp = lt.read_resume_data(resume_data)
resume_data = lt.bencode({
'file-format': 'libtorrent resume file',
'info-hash': 'abababababababababab',
'name': 'test',
'save_path': '.',
'peers': '\x01\x01\x01\x01\x00\x01\x02\x02\x02\x02\x00\x02',
'file_priority': [0, 1, 1]})
tp = lt.read_resume_data(resume_data)
self.assertEqual(tp.name, 'test')
self.assertEqual(tp.info_hash, lt.sha1_hash('abababababababababab'))
self.assertEqual(tp.file_priorities, [0, 1, 1])
self.assertEqual(tp.peers, [('1.1.1.1', 1), ('2.2.2.2', 2)])
self.assertEqual(tp.name, 'test')
self.assertEqual(tp.info_hash, lt.sha1_hash('abababababababababab'))
self.assertEqual(tp.file_priorities, [0, 1, 1])
self.assertEqual(tp.peers, [('1.1.1.1', 1), ('2.2.2.2', 2)])
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories})
h = ses.add_torrent(tp)
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories})
h = ses.add_torrent(tp)
h.connect_peer(('3.3.3.3', 3))
h.connect_peer(('3.3.3.3', 3))
for i in range(0, 10):
alerts = ses.pop_alerts()
for a in alerts:
print(a.message())
time.sleep(0.1)
for i in range(0, 10):
alerts = ses.pop_alerts()
for a in alerts:
print(a.message())
time.sleep(0.1)
def test_scrape(self):
self.setup()
# this is just to make sure this function can be called like this
# from python
self.h.scrape_tracker()
def test_scrape(self):
self.setup()
# this is just to make sure this function can be called like this
# from python
self.h.scrape_tracker()
def test_cache_info(self):
self.setup()
cs = self.ses.get_cache_info(self.h)
self.assertEqual(cs.pieces, [])
def test_cache_info(self):
self.setup()
cs = self.ses.get_cache_info(self.h)
self.assertEqual(cs.pieces, [])
class test_torrent_info(unittest.TestCase):
def test_bencoded_constructor(self):
info = lt.torrent_info({ 'info': {'name': 'test_torrent', 'length': 1234,
'piece length': 16 * 1024,
'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})
def test_bencoded_constructor(self):
info = lt.torrent_info({'info': {
'name': 'test_torrent', 'length': 1234,
'piece length': 16 * 1024,
'pieces': 'aaaaaaaaaaaaaaaaaaaa'}})
self.assertEqual(info.num_files(), 1)
self.assertEqual(info.num_files(), 1)
f = info.files()
self.assertEqual(f.file_path(0), 'test_torrent')
self.assertEqual(f.file_size(0), 1234)
self.assertEqual(info.total_size(), 1234)
f = info.files()
self.assertEqual(f.file_path(0), 'test_torrent')
self.assertEqual(f.file_size(0), 1234)
self.assertEqual(info.total_size(), 1234)
def test_metadata(self):
ti = lt.torrent_info('base.torrent');
def test_metadata(self):
ti = lt.torrent_info('base.torrent')
self.assertTrue(len(ti.metadata()) != 0)
self.assertTrue(len(ti.hash_for_piece(0)) != 0)
self.assertTrue(len(ti.metadata()) != 0)
self.assertTrue(len(ti.hash_for_piece(0)) != 0)
def test_web_seeds(self):
ti = lt.torrent_info('base.torrent');
def test_web_seeds(self):
ti = lt.torrent_info('base.torrent')
ws = [{'url': 'http://foo/test', 'auth': '', 'type': 0},
{'url': 'http://bar/test', 'auth': '', 'type': 1} ]
ti.set_web_seeds(ws)
web_seeds = ti.web_seeds()
self.assertEqual(len(ws), len(web_seeds))
for i in range(len(web_seeds)):
self.assertEqual(web_seeds[i]["url"], ws[i]["url"])
self.assertEqual(web_seeds[i]["auth"], ws[i]["auth"])
self.assertEqual(web_seeds[i]["type"], ws[i]["type"])
ws = [{'url': 'http://foo/test', 'auth': '', 'type': 0},
{'url': 'http://bar/test', 'auth': '', 'type': 1}]
ti.set_web_seeds(ws)
web_seeds = ti.web_seeds()
self.assertEqual(len(ws), len(web_seeds))
for i in range(len(web_seeds)):
self.assertEqual(web_seeds[i]["url"], ws[i]["url"])
self.assertEqual(web_seeds[i]["auth"], ws[i]["auth"])
self.assertEqual(web_seeds[i]["type"], ws[i]["type"])
def test_iterable_files(self):
def test_iterable_files(self):
# this detects whether libtorrent was built with deprecated APIs
# the file_strage object is only iterable for backwards compatibility
if not hasattr(lt, 'version'):
return
# this detects whether libtorrent was built with deprecated APIs
# the file_strage object is only iterable for backwards compatibility
if not hasattr(lt, 'version'): return
lt.session({'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
ti = lt.torrent_info('url_seed_multi.torrent')
files = ti.files()
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories, 'enable_dht': False})
ti = lt.torrent_info('url_seed_multi.torrent');
files = ti.files()
idx = 0
expected = ['bar.txt', 'var.txt']
for f in files:
print(f.path)
idx = 0
expected = ['bar.txt', 'var.txt']
for f in files:
print(f.path)
self.assertEqual(os.path.split(f.path)[1], expected[idx])
self.assertEqual(os.path.split(f.path)[0],
os.path.join('temp', 'foo'))
idx += 1
self.assertEqual(os.path.split(f.path)[1], expected[idx])
self.assertEqual(os.path.split(f.path)[0], os.path.join('temp', 'foo'))
idx += 1
class test_alerts(unittest.TestCase):
def test_alert(self):
def test_alert(self):
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories, 'enable_dht': False})
ti = lt.torrent_info('base.torrent');
h = ses.add_torrent({'ti': ti, 'save_path': os.getcwd()})
st = h.status()
time.sleep(1)
ses.remove_torrent(h)
ses.wait_for_alert(1000) # milliseconds
alerts = ses.pop_alerts()
for a in alerts:
print(a.message())
for field_name in dir(a):
if field_name.startswith('__'): continue
field = getattr(a, field_name)
if callable(field):
print(' ', field_name, ' = ', field())
else:
print(' ', field_name, ' = ', field)
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
ti = lt.torrent_info('base.torrent')
h = ses.add_torrent({'ti': ti, 'save_path': os.getcwd()})
st = h.status()
time.sleep(1)
ses.remove_torrent(h)
ses.wait_for_alert(1000) # milliseconds
alerts = ses.pop_alerts()
for a in alerts:
print(a.message())
for field_name in dir(a):
if field_name.startswith('__'):
continue
field = getattr(a, field_name)
if callable(field):
print(' ', field_name, ' = ', field())
else:
print(' ', field_name, ' = ', field)
print(st.next_announce)
self.assertEqual(st.name, 'temp')
print(st.errc.message())
print(st.pieces)
print(st.last_seen_complete)
print(st.completed_time)
print(st.progress)
print(st.num_pieces)
print(st.distributed_copies)
print(st.paused)
print(st.info_hash)
print(st.seeding_duration)
print(st.last_upload)
print(st.last_download)
self.assertEqual(st.save_path, os.getcwd())
print(st.next_announce)
self.assertEqual(st.name, 'temp')
print(st.errc.message())
print(st.pieces)
print(st.last_seen_complete)
print(st.completed_time)
print(st.progress)
print(st.num_pieces)
print(st.distributed_copies)
print(st.paused)
print(st.info_hash)
print(st.seeding_duration)
print(st.last_upload)
print(st.last_download)
self.assertEqual(st.save_path, os.getcwd())
def test_pop_alerts(self):
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories, 'enable_dht': False})
ses.async_add_torrent({"ti": lt.torrent_info("base.torrent"), "save_path": "."})
def test_pop_alerts(self):
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
ses.async_add_torrent(
{"ti": lt.torrent_info("base.torrent"), "save_path": "."})
# this will cause an error (because of duplicate torrents) and the
# torrent_info object created here will be deleted once the alert goes out
# of scope. When that happens, it will decrement the python object, to allow
@ -213,74 +229,81 @@ class test_alerts(unittest.TestCase):
# we're trying to catch the error described in this post, with regards to
# torrent_info.
# https://mail.python.org/pipermail/cplusplus-sig/2007-June/012130.html
ses.async_add_torrent({"ti": lt.torrent_info("base.torrent"), "save_path": "."})
time.sleep(1)
for i in range(0, 10):
alerts = ses.pop_alerts()
for a in alerts:
print(a.message())
time.sleep(0.1)
ses.async_add_torrent(
{"ti": lt.torrent_info("base.torrent"), "save_path": "."})
time.sleep(1)
for i in range(0, 10):
alerts = ses.pop_alerts()
for a in alerts:
print(a.message())
time.sleep(0.1)
class test_bencoder(unittest.TestCase):
def test_bencode(self):
def test_bencode(self):
encoded = lt.bencode({'a': 1, 'b': [1,2,3], 'c': 'foo'})
self.assertEqual(encoded, b'd1:ai1e1:bli1ei2ei3ee1:c3:fooe')
encoded = lt.bencode({'a': 1, 'b': [1, 2, 3], 'c': 'foo'})
self.assertEqual(encoded, b'd1:ai1e1:bli1ei2ei3ee1:c3:fooe')
def test_bdecode(self):
def test_bdecode(self):
encoded = b'd1:ai1e1:bli1ei2ei3ee1:c3:fooe'
decoded = lt.bdecode(encoded)
self.assertEqual(decoded, {b'a': 1, b'b': [1, 2, 3], b'c': b'foo'})
encoded = b'd1:ai1e1:bli1ei2ei3ee1:c3:fooe'
decoded = lt.bdecode(encoded)
self.assertEqual(decoded, {b'a': 1, b'b': [1,2,3], b'c': b'foo'})
class test_sha1hash(unittest.TestCase):
def test_sha1hash(self):
h = 'a0'*20
s = lt.sha1_hash(binascii.unhexlify(h))
self.assertEqual(h, str(s))
def test_sha1hash(self):
h = 'a0'*20
s = lt.sha1_hash(binascii.unhexlify(h))
self.assertEqual(h, str(s))
class test_session(unittest.TestCase):
def test_post_session_stats(self):
s = lt.session({'alert_mask': lt.alert.category_t.stats_notification, 'enable_dht': False})
s.post_session_stats()
a = s.wait_for_alert(1000)
self.assertTrue(isinstance(a, lt.session_stats_alert))
self.assertTrue(isinstance(a.values, dict))
self.assertTrue(len(a.values) > 0)
def test_post_session_stats(self):
s = lt.session({'alert_mask': lt.alert.category_t.stats_notification,
'enable_dht': False})
s.post_session_stats()
a = s.wait_for_alert(1000)
self.assertTrue(isinstance(a, lt.session_stats_alert))
self.assertTrue(isinstance(a.values, dict))
self.assertTrue(len(a.values) > 0)
def test_add_torrent(self):
s = lt.session({'alert_mask': lt.alert.category_t.stats_notification, 'enable_dht': False})
h = s.add_torrent({'ti': lt.torrent_info('base.torrent'),
'save_path': '.',
'dht_nodes': [('1.2.3.4', 6881), ('4.3.2.1', 6881)],
'http_seeds': ['http://test.com/seed'],
'peers': [('5.6.7.8', 6881)],
'banned_peers': [('8.7.6.5', 6881)],
'file_priorities': [1,1,1,2,0]})
def test_add_torrent(self):
s = lt.session({'alert_mask': lt.alert.category_t.stats_notification,
'enable_dht': False})
s.add_torrent({
'ti': lt.torrent_info('base.torrent'),
'save_path': '.',
'dht_nodes': [('1.2.3.4', 6881), ('4.3.2.1', 6881)],
'http_seeds': ['http://test.com/seed'],
'peers': [('5.6.7.8', 6881)],
'banned_peers': [('8.7.6.5', 6881)],
'file_priorities': [1, 1, 1, 2, 0]})
def test_unknown_settings(self):
try:
s = lt.session({'unexpected-key-name': 42})
self.assertFalse('should have thrown an exception')
except KeyError as e:
print(e)
def test_unknown_settings(self):
try:
lt.session({'unexpected-key-name': 42})
self.assertFalse('should have thrown an exception')
except KeyError as e:
print(e)
def test_apply_settings(self):
s = lt.session({'enable_dht': False})
s.apply_settings({'num_want': 66, 'user_agent': 'test123'})
self.assertEqual(s.get_settings()['num_want'], 66)
self.assertEqual(s.get_settings()['user_agent'], 'test123')
def test_apply_settings(self):
s = lt.session({'enable_dht': False})
s.apply_settings({'num_want': 66, 'user_agent': 'test123'})
self.assertEqual(s.get_settings()['num_want'], 66)
self.assertEqual(s.get_settings()['user_agent'], 'test123')
if __name__ == '__main__':
print(lt.__version__)
shutil.copy(os.path.join('..', '..', 'test', 'test_torrents', 'url_seed_multi.torrent'), '.')
shutil.copy(os.path.join('..', '..', 'test', 'test_torrents', 'base.torrent'), '.')
shutil.copy(os.path.join('..', '..', 'test', 'test_torrents', 'unordered.torrent'), '.')
unittest.main()
print(lt.__version__)
shutil.copy(os.path.join('..', '..', 'test', 'test_torrents',
'url_seed_multi.torrent'), '.')
shutil.copy(os.path.join('..', '..', 'test', 'test_torrents',
'base.torrent'), '.')
shutil.copy(os.path.join('..', '..', 'test', 'test_torrents',
'unordered.torrent'), '.')
unittest.main()

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python
import os
import sys
os.chdir('bindings/python')
with open('setup.py') as filename: