Per-Arne Andersen
5 years ago
5 changed files with 214 additions and 0 deletions
@ -0,0 +1,56 @@ |
|||
import abc |
|||
from pathlib import Path |
|||
import subprocess |
|||
import shlex |
|||
|
|||
|
|||
class BaseObfuscation(abc.ABC): |
|||
|
|||
def __init__(self, binary_name=None, binary_path=None, algorithm=None): |
|||
|
|||
assert binary_name is not None or binary_path is not None |
|||
self.binary_name = binary_name if binary_name is not None else Path(self.binary_path).name |
|||
self.binary_path = binary_path if binary_path else "" |
|||
self.algorithm = algorithm |
|||
|
|||
def ensure_installed(self): |
|||
|
|||
# Attempt to find process by path |
|||
binary = Path(self.binary_path) |
|||
if not binary.is_file(): |
|||
# Did not find by path, attempt to find using which |
|||
proc_which = subprocess.Popen(["which", self.binary_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|||
data = [x.decode().strip() for x in proc_which.communicate() if x != b''][0] |
|||
|
|||
if proc_which.returncode != 0: |
|||
raise RuntimeError("Could not find binary '%s'" % data) |
|||
|
|||
self.binary_path = data |
|||
|
|||
def execute(self, *args, kill_first=False, override_command=None): |
|||
|
|||
if kill_first: |
|||
# TODO try to delete by full name as we dont want to kill other processes. |
|||
pattern = self.binary_name |
|||
self.execute(*[pattern], override_command="pkill") |
|||
#pattern = self.binary_path + " " + ' '.join(args) |
|||
#print(pattern) |
|||
#kill_output, kill_code = self.execute(*[pattern], override_command="pkill") |
|||
|
|||
command = override_command if override_command is not None else self.binary_path |
|||
print(shlex.join([command] + list(args))) |
|||
proc_which = subprocess.Popen([command] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
|||
raw_data = proc_which.communicate() |
|||
|
|||
data = [x.decode().strip() for x in raw_data if x != b''] |
|||
if len(data) == 0: |
|||
data = "" |
|||
else: |
|||
data = data[0] |
|||
return data, proc_which.returncode |
|||
|
|||
|
|||
|
|||
|
|||
|
|||
|
@ -0,0 +1,30 @@ |
|||
from script.obfuscate import BaseObfuscation |
|||
import re |
|||
|
|||
|
|||
class ObfuscateOBFS4(BaseObfuscation): |
|||
|
|||
def __init__(self): |
|||
super().__init__( |
|||
binary_name="obfs4proxy", |
|||
binary_path="/usr/bin/obfs4proxy", |
|||
algorithm="obfs4" |
|||
) |
|||
|
|||
self.ensure_installed() |
|||
|
|||
def ensure_installed(self): |
|||
super().ensure_installed() |
|||
|
|||
output, code = self.execute("-version") |
|||
|
|||
if re.match(f'{self.binary_name}-[0-9]+.[0-9]+.[0-9]+', output) and code == 0: |
|||
return True |
|||
else: |
|||
raise RuntimeError(f"Could not verify that {self.binary_name} is installed correctly.") |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
|
|||
x = ObfuscateOBFS4() |
|||
x.ensure_installed() |
@ -0,0 +1,119 @@ |
|||
from pathlib import Path |
|||
|
|||
import requests |
|||
|
|||
import const |
|||
from script.obfuscate import BaseObfuscation |
|||
import re |
|||
import os |
|||
import qrcode |
|||
import socket |
|||
|
|||
from script.obfuscate.obfs4 import ObfuscateOBFS4 |
|||
|
|||
|
|||
class ObfuscationViaTOR(BaseObfuscation): |
|||
|
|||
def __init__(self, algorithm: BaseObfuscation): |
|||
super().__init__( |
|||
binary_name="tor" |
|||
) |
|||
self.algorithm = algorithm |
|||
self.tor_data_dir = "/tmp/wg-manager-tor-proxy" |
|||
self.tor_config_file = "/tmp/wg-manager-tor-proxy/torrc" |
|||
self.tor_fingerprint_file = f"{self.tor_data_dir}/fingerprint" |
|||
self.tor_bridge_file = f"{self.tor_data_dir}/pt_state/obfs4_bridgeline.txt" |
|||
|
|||
Path(self.tor_config_file).touch() |
|||
os.makedirs(self.tor_data_dir, exist_ok=True) |
|||
|
|||
def __del__(self): |
|||
pass |
|||
|
|||
def ensure_installed(self): |
|||
super().ensure_installed() |
|||
output, code = self.execute("--version") |
|||
|
|||
if re.match(f'Tor version .*', output) and code == 0: |
|||
return True |
|||
else: |
|||
raise RuntimeError(f"Could not verify that {self.binary_name} is installed correctly.") |
|||
|
|||
def start(self): |
|||
|
|||
output, code = self.execute( |
|||
"-f", self.tor_config_file, |
|||
"--DataDirectory", self.tor_data_dir, |
|||
"--RunAsDaemon", "1", |
|||
"--ExitPolicy", "reject *:*", |
|||
"--ORPort", str(const.OBFUSCATE_SOCKS_TOR_PORT), |
|||
"--BridgeRelay", "1", |
|||
"--PublishServerDescriptor", "0", |
|||
"--ServerTransportPlugin", f"{self.algorithm.algorithm} exec {self.algorithm.binary_path}", |
|||
"--ServerTransportListenAddr", f"{self.algorithm.algorithm} 0.0.0.0:{const.OBFUSCATE_TOR_LISTEN_ADDR}", |
|||
"--ExtORPort", "auto", |
|||
"--ContactInfo", "wg-manager@github.com", |
|||
"--Nickname", "wgmanager", |
|||
kill_first=True |
|||
) |
|||
|
|||
print(output) |
|||
|
|||
def generate_bridge_line(self, local=False): |
|||
|
|||
if local: |
|||
ip_address = socket.gethostbyname(socket.gethostname()) |
|||
else: |
|||
ip_address = requests.get("https://api.ipify.org").text |
|||
|
|||
with open(self.tor_fingerprint_file, "r") as f: |
|||
fingerprint = f.read().split(" ") |
|||
assert len(fingerprint) == 2, "Could not load fingerprint correctly. " \ |
|||
"Should be a list of 2 items (name, fingerprint)" |
|||
fingerprint = fingerprint[1] |
|||
|
|||
with open(self.tor_bridge_file, "r") as f: |
|||
bridge_line_raw = f.read() |
|||
|
|||
bridge_line = re.search(r"^Bridge .*", bridge_line_raw, re.MULTILINE).group(0) |
|||
bridge_line = bridge_line\ |
|||
.replace("<IP ADDRESS>", ip_address)\ |
|||
.replace("<PORT>", str(const.OBFUSCATE_TOR_LISTEN_ADDR))\ |
|||
.replace("<FINGERPRINT>", fingerprint)\ |
|||
.replace("Bridge ", "bridge://")\ |
|||
.replace("\n", "") |
|||
#bridge_line = f"bridge://{self.algorithm.algorithm} {ip_address}:{const.OBFUSCATE_SOCKS_TOR_PORT} {fingerprint}" |
|||
print(bridge_line) |
|||
return bridge_line |
|||
|
|||
def output_qr(self, text, image=False): |
|||
|
|||
qr = qrcode.QRCode( |
|||
version=10, |
|||
error_correction=qrcode.constants.ERROR_CORRECT_L, |
|||
box_size=10, |
|||
border=4, |
|||
) |
|||
qr.add_data(text) |
|||
qr.make(fit=True) |
|||
|
|||
if image: |
|||
img = qr.make_image(fill_color="black", back_color="white") |
|||
img.show() |
|||
else: |
|||
try: |
|||
qr.print_tty() |
|||
except: |
|||
qr.print_ascii() |
|||
|
|||
|
|||
if __name__ == "__main__": |
|||
|
|||
x = ObfuscationViaTOR( |
|||
algorithm=ObfuscateOBFS4() |
|||
) |
|||
x.ensure_installed() |
|||
x.start() |
|||
bridge_line = x.generate_bridge_line(local=False) |
|||
x.output_qr(bridge_line, image=True) |
|||
#x.generate_bridge_line(local=False) |
Loading…
Reference in new issue