disable some session features to not run our of open files in python test

This commit is contained in:
arvidn 2018-01-23 10:55:21 +01:00 committed by Arvid Norberg
parent 1f2a2294cc
commit a8cf790833
2 changed files with 36 additions and 41 deletions

View File

@ -77,6 +77,7 @@ before_install:
# disable leak checking for now. it reports some suspicious reports against some
# tests
- export ASAN_OPTIONS=detect_leaks=0;
- ulimit -a
install:

View File

@ -23,6 +23,11 @@ if os.name != 'nt':
HAVE_DEPRECATED_APIS = hasattr(lt, 'version')
settings = {
'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False, 'enable_lsd': False, 'enable_natpmp': False,
'enable_upnp': False, 'listen_interfaces': '0.0.0.0:0', 'file_pool_size': 1}
class test_create_torrent(unittest.TestCase):
def test_from_torrent_info(self):
@ -55,9 +60,7 @@ class test_session_stats(unittest.TestCase):
class test_torrent_handle(unittest.TestCase):
def setup(self):
self.ses = lt.session({
'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
self.ses = lt.session(settings)
self.ti = lt.torrent_info('url_seed_multi.torrent')
self.h = self.ses.add_torrent({
'ti': self.ti, 'save_path': os.getcwd()})
@ -202,7 +205,7 @@ class test_torrent_handle(unittest.TestCase):
self.assertEqual(tp.file_priorities, [0, 1, 1])
self.assertEqual(tp.peers, [('1.1.1.1', 1), ('2.2.2.2', 2)])
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories})
ses = lt.session(settings)
h = ses.add_torrent(tp)
for attr in dir(tp):
print('%s: %s' % (attr, getattr(tp, attr)))
@ -227,8 +230,7 @@ class test_torrent_handle(unittest.TestCase):
self.assertEqual(cs.pieces, [])
def test_unknown_torrent_parameter(self):
self.ses = lt.session({'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
self.ses = lt.session(settings)
try:
self.h = self.ses.add_torrent({'unexpected-key-name': ''})
self.assertFalse('should have thrown an exception')
@ -236,8 +238,7 @@ class test_torrent_handle(unittest.TestCase):
print(e)
def test_torrent_parameter(self):
self.ses = lt.session({'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
self.ses = lt.session(settings)
self.ti = lt.torrent_info('url_seed_multi.torrent');
self.h = self.ses.add_torrent({
'ti': self.ti,
@ -305,8 +306,7 @@ class test_torrent_info(unittest.TestCase):
if not HAVE_DEPRECATED_APIS:
return
lt.session({'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
lt.session(settings)
ti = lt.torrent_info('url_seed_multi.torrent')
files = ti.files()
@ -332,8 +332,7 @@ class test_alerts(unittest.TestCase):
def test_alert(self):
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
ses = lt.session(settings)
ti = lt.torrent_info('base.torrent')
h = ses.add_torrent({'ti': ti, 'save_path': os.getcwd()})
st = h.status()
@ -372,8 +371,7 @@ class test_alerts(unittest.TestCase):
self.assertEqual(st.save_path, os.getcwd())
def test_pop_alerts(self):
ses = lt.session({'alert_mask': lt.alert.category_t.all_categories,
'enable_dht': False})
ses = lt.session(settings)
ses.async_add_torrent(
{"ti": lt.torrent_info("base.torrent"), "save_path": "."})
@ -418,7 +416,7 @@ class test_sha1hash(unittest.TestCase):
class test_magnet_link(unittest.TestCase):
def test_parse_magnet_uri(self):
ses = lt.session({})
ses = lt.session(settings)
magnet = 'magnet:?xt=urn:btih:C6EIF4CCYDBTIJVG3APAGM7M4NDONCTI'
p = lt.parse_magnet_uri(magnet)
p['save_path'] = '.'
@ -428,7 +426,7 @@ class test_magnet_link(unittest.TestCase):
class test_peer_class(unittest.TestCase):
def test_peer_class_ids(self):
s = lt.session({'enable_dht': False})
s = lt.session(settings)
print('global_peer_class_id:', lt.session.global_peer_class_id)
print('tcp_peer_class_id:', lt.session.tcp_peer_class_id)
@ -439,7 +437,7 @@ class test_peer_class(unittest.TestCase):
print('local: ', s.get_peer_class(s.local_peer_class_id))
def test_peer_class(self):
s = lt.session({'enable_dht': False})
s = lt.session(settings)
c = s.create_peer_class('test class')
print('new class: ', s.get_peer_class(c))
@ -474,14 +472,14 @@ class test_peer_class(unittest.TestCase):
filt.allow(lt.peer_class_type_filter.utp_socket, lt.session.local_peer_class_id);
def test_peer_class_ip_filter(self):
s = lt.session({'enable_dht': False})
s = lt.session(settings)
s.set_peer_class_type_filter(lt.peer_class_type_filter())
s.set_peer_class_filter(lt.ip_filter())
class test_session(unittest.TestCase):
def test_add_torrent(self):
s = lt.session({'alert_mask': lt.alert.category_t.stats_notification, 'enable_dht': False})
s = lt.session(settings)
h = s.add_torrent({'ti': lt.torrent_info('base.torrent'),
'save_path': '.',
'dht_nodes': [('1.2.3.4', 6881), ('4.3.2.1', 6881)],
@ -492,33 +490,29 @@ class test_session(unittest.TestCase):
def test_apply_settings(self):
s = lt.session({'enable_dht': False})
s = lt.session(settings)
s.apply_settings({'num_want': 66, 'user_agent': 'test123'})
self.assertEqual(s.get_settings()['num_want'], 66)
self.assertEqual(s.get_settings()['user_agent'], 'test123')
def test_post_session_stats(self):
s = lt.session({'alert_mask': lt.alert.category_t.stats_notification,
'enable_dht': False})
def test_post_session_stats(self):
s = lt.session({'alert_mask': lt.alert.category_t.stats_notification, 'enable_dht': False})
s.post_session_stats()
alerts = []
# first the stats headers log line. but not if logging is disabled
if 'log_alert' in [i[0] for i in inspect.getmembers(lt)]:
s.wait_for_alert(1000)
alerts = s.pop_alerts()
a = alerts.pop(0)
self.assertTrue(isinstance(a, lt.log_alert))
# then the actual stats values
if len(alerts) == 0:
s.wait_for_alert(1000)
alerts = s.pop_alerts()
a = alerts.pop(0)
self.assertTrue(isinstance(a, lt.session_stats_alert))
self.assertTrue(isinstance(a.values, dict))
self.assertTrue(len(a.values) > 0)
s = lt.session(settings)
s.post_session_stats()
alerts = []
# first the stats headers log line. but not if logging is disabled
if 'log_alert' in [i[0] for i in inspect.getmembers(lt)]:
s.wait_for_alert(1000)
alerts = s.pop_alerts()
a = alerts.pop(0)
self.assertTrue(isinstance(a, lt.log_alert))
# then the actual stats values
if len(alerts) == 0:
s.wait_for_alert(1000)
alerts = s.pop_alerts()
a = alerts.pop(0)
self.assertTrue(isinstance(a, lt.session_stats_alert))
self.assertTrue(isinstance(a.values, dict))
self.assertTrue(len(a.values) > 0)
def test_unknown_settings(self):
try: