#!/usr/bin/python
"""Functional test for the GNUnet DHT command-line tools.

The script copies ``test_dht_api_peer1.conf`` to a temporary file and then,
using that copy as the configuration (``-c``), runs in order:

1. ``gnunet-peerinfo -sq``
2. ``gnunet-arm -s``
3. ``gnunet-dht-put -k testkey -d testdata -t 8``
4. ``gnunet-dht-get -k testkey -T 5 -t 8`` and compares its output with the
   expected two lines
5. ``gnunet-arm -e -d``

On any failure a message is printed, the temporary configuration is removed
and the script exits with status 1.

If the ``DEBUG`` environment variable is set, its whitespace-separated words
are appended to every ``gnunet-arm`` invocation.
"""
from __future__ import print_function

import errno
import os
import sys
import shutil
import re
import subprocess
import time
import tempfile

if os.name == "nt":
    tmp = os.getenv("TEMP")
else:
    tmp = "/tmp"

# Executable names differ per platform; put/get are taken from the current
# directory on non-Windows systems.
if os.name == 'nt':
    pif = 'gnunet-peerinfo.exe'
    get = 'gnunet-dht-get.exe'
    put = 'gnunet-dht-put.exe'
    arm = 'gnunet-arm.exe'
else:
    pif = 'gnunet-peerinfo'
    get = './gnunet-dht-get'
    put = './gnunet-dht-put'
    arm = 'gnunet-arm'

# Private copy of the configuration; only the path is needed, so the
# descriptor returned by mkstemp is closed right away.
tf, tempcfg = tempfile.mkstemp(prefix='test_dht_api_peer1.')
os.close(tf)

# Base command lines; the r_* helpers append per-call arguments to these.
run_pif = [pif, '-c', tempcfg, '-sq']
run_get = [get, '-c', tempcfg]
run_put = [put, '-c', tempcfg]
run_arm = [arm, '-c', tempcfg]

debug = os.getenv('DEBUG')
if debug:
    # Every word must be its own argv entry: appending the list returned by
    # split() as a single element would hand Popen a nested list.
    run_arm += debug.split()


def cleanup(exitcode):
    """Remove the temporary configuration and exit with *exitcode*.

    A configuration file that is already gone is not treated as an error, so
    that the requested exit status is always the one reported.
    """
    try:
        os.remove(tempcfg)
    except OSError as err:
        if err.errno != errno.ENOENT:
            raise
    sys.exit(exitcode)


def sub_run(args, want_stdo=True, want_stde=False, nofail=False):
    """Run *args* as a child process and wait for it to finish.

    :param args: argv list passed to ``subprocess.Popen``.
    :param want_stdo: capture the child's stdout when true.
    :param want_stde: capture the child's stderr when true.
    :param nofail: when false, a non-zero exit status of the child terminates
        this script with the same status.
    :returns: ``(returncode, stdout, stderr)``; a stream that was not
        captured is ``None``, a captured one is whatever ``communicate()``
        returned (bytes on Python 3).
    """
    stdo = subprocess.PIPE if want_stdo else None
    stde = subprocess.PIPE if want_stde else None
    p = subprocess.Popen(args, stdout=stdo, stderr=stde)
    stdo, stde = p.communicate()
    if not nofail and p.returncode != 0:
        sys.exit(p.returncode)
    return (p.returncode, stdo, stde)


def output_lines(raw):
    """Return captured process output as a list of text lines.

    Accepts bytes (decoded as UTF-8, undecodable bytes replaced), text, or
    ``None`` (treated as no output). Carriage returns are dropped so that
    CRLF and LF output compare equal.
    """
    if raw is None:
        return []
    if isinstance(raw, bytes):
        raw = raw.decode('utf-8', 'replace')
    return raw.replace('\r', '').splitlines()


def announce(message):
    """Print *message* without a newline and flush it immediately.

    Flushing makes the progress text appear before the (blocking) child
    process that follows starts producing its own output.
    """
    print(message, end='')
    sys.stdout.flush()


def fail(result):
    """Print *result*, run ``gnunet-arm -e`` and exit with status 1."""
    print(result)
    r_arm(['-e'], want_stdo=False)
    cleanup(1)


def r_something(to_run, extra_args, failer=None, normal=True, **kw):
    """Run ``to_run + extra_args`` and hand the outcome to *failer*.

    :param to_run: base command line (one of the ``run_*`` lists).
    :param extra_args: arguments appended for this call.
    :param failer: optional callable invoked as
        ``failer(command, rc, stdout, stderr, normal)``.
    :param normal: passed through to *failer*; the failers in this file treat
        true as "success expected" and false as "failure expected".
    :param kw: forwarded to :func:`sub_run` (``want_stdo`` / ``want_stde``).
    :returns: ``(returncode, stdout, stderr)`` from :func:`sub_run`.
    """
    command = to_run + extra_args
    rc, stdo, stde = sub_run(command, nofail=True, **kw)
    if failer is not None:
        failer(command, rc, stdo, stde, normal)
    return (rc, stdo, stde)


def r_arm(extra_args, **kw):
    """Run gnunet-arm with *extra_args*; see :func:`r_something`."""
    return r_something(run_arm, extra_args, **kw)


def r_pif(extra_args, **kw):
    """Run gnunet-peerinfo with *extra_args*; see :func:`r_something`."""
    return r_something(run_pif, extra_args, **kw)


def r_get(extra_args, **kw):
    """Run gnunet-dht-get with *extra_args*; see :func:`r_something`."""
    return r_something(run_get, extra_args, **kw)


def r_put(extra_args, **kw):
    """Run gnunet-dht-put with *extra_args*; see :func:`r_something`."""
    return r_something(run_put, extra_args, **kw)


def end_arm_failer(command, rc, stdo, stde, normal):
    """Failer that reports through :func:`fail`, which also runs ``arm -e``.

    With *normal* true a non-zero *rc* is a failure; with *normal* false a
    zero *rc* is a failure.
    """
    if normal:
        if rc != 0:
            fail("FAIL: error running {}\nCommand output was:\n{}\n{}".format(command, stdo, stde))
    else:
        if rc == 0:
            fail("FAIL: expected error while running {}\nCommand output was:\n{}\n{}".format(command, stdo, stde))


def print_only_failer(command, rc, stdo, stde, normal):
    """Failer that only prints the message and exits via :func:`cleanup`.

    Same pass/fail rule as :func:`end_arm_failer`, but gnunet-arm is not
    invoked.
    """
    if normal:
        if rc != 0:
            print("FAIL: error running {}\nCommand output was:\n{}\n{}".format(command, stdo, stde))
            cleanup(1)
    else:
        if rc == 0:
            print("FAIL: expected error while running {}\nCommand output was:\n{}\n{}".format(command, stdo, stde))
            cleanup(1)


def main():
    """Run the test sequence described in the module docstring."""
    shutil.copyfile('test_dht_api_peer1.conf', tempcfg)

    announce("TEST: Generating hostkey...")
    r_pif([], failer=print_only_failer)
    print("PASS")

    announce("TEST: Starting ARM...")
    r_arm(['-s'], failer=end_arm_failer, want_stdo=False, want_stde=False)
    print("PASS")
    time.sleep(1)

    announce("TEST: Testing put...")
    r_put(['-k', 'testkey', '-d', 'testdata', '-t', '8'], failer=end_arm_failer)
    print("PASS")
    time.sleep(1)

    announce("TEST: Testing get...")
    rc, stdo, stde = r_get(['-k', 'testkey', '-T', '5', '-t', '8'],
                           want_stdo=True, failer=end_arm_failer)
    # communicate() yields bytes on Python 3, so decode before comparing.
    stdo = output_lines(stdo)
    expect = "Result 0, type 8:\ntestdata".splitlines()
    if len(stdo) != 2 or len(expect) != 2 or stdo[0] != expect[0] or stdo[1] != expect[1]:
        fail("output `{}' differs from expected `{}'".format(stdo, expect))
    print("PASS")

    # NOTE(review): no explicit removal of tempcfg here - presumably the
    # `-d` flag makes gnunet-arm delete the configuration itself; confirm
    # against the gnunet-arm documentation.
    r_arm(['-e', '-d'], failer=print_only_failer)


if __name__ == "__main__":
    main()