1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
#!@PYTHON@
# This file is part of GNUnet.
# (C) 2010, 2018 Christian Grothoff (and other contributing authors)
#
# GNUnet is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# GNUnet is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Testcase for gnunet-peerinfo
from __future__ import print_function
import os
import re
import subprocess
import sys
import shutil
import time
class pexpect (object):
def __init__(self):
super(pexpect, self).__init__()
def spawn(self, stdin, arglist, *pargs, **kwargs):
env = kwargs.pop('env', None)
if env is None:
env = os.environ.copy()
# This messes up some testcases, disable log redirection
env.pop('GNUNET_FORCE_LOGFILE', None)
self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
if self.proc is None:
print("Failed to spawn a process {0}".format(arglist))
sys.exit(1)
if stdin is not None:
self.stdo, self.stde = self.proc.communicate(stdin)
else:
self.stdo, self.stde = self.proc.communicate()
return self.proc
def expect(self, s, r, flags=0):
stream = self.stdo if s == 'stdout' else self.stde
if isinstance(r, str):
if r == "EOF":
if len(stream) == 0:
return True
else:
print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
sys.exit(2)
raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
m = r.search(stream.decode(), flags)
if not m:
print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
sys.exit(2)
stream = stream[m.end():]
if s == 'stdout':
self.stdo = stream
else:
self.stde = stream
return m
def read(self, s, size=-1):
stream = self.stdo if s == 'stdout' else self.stde
result = ""
if size < 0:
result = stream
new_stream = ""
else:
result = stream[0:size]
new_stream = stream[size:]
if s == 'stdout':
self.stdo = new_stream
else:
self.stde = new_stream
return result
|