aboutsummaryrefslogtreecommitdiff
path: root/contrib/scripts/gnunet_pyexpect.py.in
blob: 8908ca6a8810fcebfc0b3fa090dfd0c742d053e5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#!@PYTHON@
#    This file is part of GNUnet.
#    (C) 2010, 2018 Christian Grothoff (and other contributing authors)
#
#    GNUnet is free software: you can redistribute it and/or modify it
#    under the terms of the GNU Affero General Public License as published
#    by the Free Software Foundation, either version 3 of the License, or
#    (at your option) any later version.
#
#    GNUnet is distributed in the hope that it will be useful, but
#    WITHOUT ANY WARRANTY; without even the implied warranty of
#    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
#    Affero General Public License for more details.
#
# Testcase for gnunet-peerinfo
from __future__ import print_function
import os
import re
import subprocess
import sys
import shutil
import time


class pexpect(object):
    """Small expect-style helper built on top of ``subprocess``.

    ``spawn`` runs a command to completion and stores whatever
    ``communicate()`` returned for stdout and stderr.  ``expect`` and
    ``read`` then consume that stored output from the front, so that
    successive calls see only what earlier calls left behind.
    """

    def __init__(self):
        super(pexpect, self).__init__()

    def spawn(self, stdin, arglist, *pargs, **kwargs):
        """Run ``arglist`` to completion and remember its output.

        :param stdin: data handed to ``Popen.communicate`` as input, or
            ``None`` to send nothing.
        :param arglist: command passed to ``subprocess.Popen``.
        :param pargs: extra positional arguments for ``subprocess.Popen``.
        :param kwargs: extra keyword arguments for ``subprocess.Popen``;
            ``env`` (optional) is the environment for the child and
            defaults to a copy of ``os.environ``.
        :return: the finished ``subprocess.Popen`` object.

        Errors raised by ``subprocess.Popen`` (for example ``OSError``
        when the executable cannot be started) propagate to the caller.
        """
        env = kwargs.pop('env', None)
        # Always work on a private copy so that neither os.environ nor a
        # caller-supplied mapping is modified by the pop() below.
        env = dict(os.environ if env is None else env)
        # This messes up some testcases, disable log redirection
        env.pop('GNUNET_FORCE_LOGFILE', None)
        # Popen() either returns a process object or raises; it never
        # returns None, so no extra check is needed here.
        self.proc = subprocess.Popen(arglist, *pargs, env=env, **kwargs)
        # communicate(None) behaves exactly like communicate().
        self.stdo, self.stde = self.proc.communicate(stdin)
        return self.proc

    def expect(self, s, r, flags=0):
        """Match ``r`` against a captured stream and consume the match.

        :param s: ``'stdout'`` selects the captured standard output; any
            other value selects the captured standard error.
        :param r: a compiled regular expression, or the special string
            ``"EOF"`` which asserts that the stream is empty.
        :param flags: additional ``re`` flags to combine with those of
            ``r``.
        :return: ``True`` for a successful ``"EOF"`` check, otherwise the
            match object; everything up to the end of the match is
            removed from the stored stream.
        :raises ValueError: if ``r`` is a string other than ``"EOF"``.

        On a failed expectation a diagnostic is printed and the process
        exits with status 2.
        """
        stream = self.stdo if s == 'stdout' else self.stde
        if isinstance(r, str):
            if r != "EOF":
                raise ValueError("Argument `r' should be an instance of re.RegexObject or a special string, but is `{0}'".format(r))
            if len(stream) == 0:
                return True
            print("Failed to find `{1}' in {0}, which is `{2}' ({3})".format(s, r, stream, len(stream)))
            sys.exit(2)
        if flags:
            # The second positional argument of a compiled pattern's
            # search() is a start position, not a flag set; extra flags
            # can only be applied by recompiling the pattern.
            r = re.compile(r.pattern, r.flags | flags)
        # The stream is bytes unless the process was spawned in text mode
        # (or the attribute was replaced by text).
        is_bytes = isinstance(stream, bytes)
        text = stream.decode() if is_bytes else stream
        m = r.search(text)
        if not m:
            print("Failed to find `{1}' in {0}, which is is `{2}'".format(s, r.pattern, stream))
            sys.exit(2)
        consumed = m.end()
        if is_bytes:
            # m.end() counts characters of the decoded text; convert it
            # into a byte offset so multi-byte characters are consumed
            # completely.
            consumed = len(text[:consumed].encode())
        remaining = stream[consumed:]
        if s == 'stdout':
            self.stdo = remaining
        else:
            self.stde = remaining
        return m

    def read(self, s, size=-1):
        """Remove and return data from the front of a captured stream.

        :param s: ``'stdout'`` selects the captured standard output; any
            other value selects the captured standard error.
        :param size: number of items to take; a negative value takes
            everything that is left.
        :return: the data removed, of the same type as the stored stream.
        """
        stream = self.stdo if s == 'stdout' else self.stde
        if size < 0:
            # stream[:0] is an empty value of the stream's own type, so
            # the stored remainder stays bytes (or text) for later calls.
            result, remaining = stream, stream[:0]
        else:
            result, remaining = stream[:size], stream[size:]
        if s == 'stdout':
            self.stdo = remaining
        else:
            self.stde = remaining
        return result