1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
|
#!@PYTHON@
from __future__ import print_function
import os
import sys
import shutil
import re
import subprocess
import time
import tempfile
if os.name == "nt":
tmp = os.getenv ("TEMP")
else:
tmp = "/tmp"
if os.name == 'nt':
pif = 'gnunet-peerinfo.exe'
get = 'gnunet-dht-get.exe'
put = 'gnunet-dht-put.exe'
arm = 'gnunet-arm.exe'
else:
pif = 'gnunet-peerinfo'
get = './gnunet-dht-get'
put = './gnunet-dht-put'
arm = 'gnunet-arm'
tf, tempcfg = tempfile.mkstemp (prefix='test_dht_api_peer1.')
os.close (tf)
run_pif = [pif, '-c', tempcfg, '-sq']
run_get = [get, '-c', tempcfg]
run_put = [put, '-c', tempcfg]
run_arm = [arm, '-c', tempcfg]
debug = os.getenv ('DEBUG')
if debug:
run_arm += [debug.split (' ')]
def cleanup (exitcode):
os.remove (tempcfg)
sys.exit (exitcode)
def sub_run (args, want_stdo = True, want_stde = False, nofail = False):
if want_stdo:
stdo = subprocess.PIPE
else:
stdo = None
if want_stde:
stde = subprocess.PIPE
else:
stde = None
p = subprocess.Popen (args, stdout = stdo, stderr = stde)
stdo, stde = p.communicate ()
if not nofail:
if p.returncode != 0:
sys.exit (p.returncode)
return (p.returncode, stdo, stde)
def fail (result):
print (result)
r_arm (['-e'], want_stdo = False)
cleanup (1)
def r_something (to_run, extra_args, failer = None, normal = True, **kw):
rc, stdo, stde = sub_run (to_run + extra_args, nofail = True, **kw)
if failer is not None:
failer (to_run + extra_args, rc, stdo, stde, normal)
return (rc, stdo, stde)
def r_arm (extra_args, **kw):
return r_something (run_arm, extra_args, **kw)
def r_pif (extra_args, **kw):
return r_something (run_pif, extra_args, **kw)
def r_get (extra_args, **kw):
return r_something (run_get, extra_args, **kw)
def r_put (extra_args, **kw):
return r_something (run_put, extra_args, **kw)
def end_arm_failer (command, rc, stdo, stde, normal):
if normal:
if rc != 0:
fail ("FAIL: error running {}\nCommand output was:\n{}\n{}".format (command, stdo, stde))
else:
if rc == 0:
fail ("FAIL: expected error while running {}\nCommand output was:\n{}\n{}".format (command, stdo, stde))
def print_only_failer (command, rc, stdo, stde, normal):
if normal:
if rc != 0:
print ("FAIL: error running {}\nCommand output was:\n{}\n{}".format (command, stdo, stde))
cleanup (1)
else:
if rc == 0:
print ("FAIL: expected error while running {}\nCommand output was:\n{}\n{}".format (command, stdo, stde))
cleanup (1)
shutil.copyfile ('test_dht_api_peer1.conf', tempcfg)
print ("TEST: Generating hostkey...", end='')
r_pif ([], failer = print_only_failer)
print ("PASS")
print ("TEST: Starting ARM...", end='')
r_arm (['-s'], failer = end_arm_failer, want_stdo = False, want_stde = False)
print ("PASS")
time.sleep (1)
print ("TEST: Testing put...", end='')
r_put (['-k', 'testkey', '-d', 'testdata', '-t', '8'], failer = end_arm_failer)
print ("PASS")
time.sleep (1)
print ("TEST: Testing get...", end='')
rc, stdo, stde = r_get (['-k', 'testkey', '-T', '5', '-t', '8'], want_stdo = True, failer = end_arm_failer)
stdo = stdo.replace ('\r', '').splitlines ()
expect = "Result 0, type 8:\ntestdata".splitlines()
if len (stdo) != 2 or len (expect) != 2 or stdo[0] != expect[0] or stdo[1] != expect[1]:
fail ("output `{}' differs from expected `{}'".format (stdo, expect))
print ("PASS")
r_arm (['-e', '-d'], failer = print_only_failer)
|