Per-Arne Andersen
5 years ago
5 changed files with 215 additions and 1 deletions
@ -0,0 +1,56 @@ |
|||||
|
import abc |
||||
|
from pathlib import Path |
||||
|
import subprocess |
||||
|
import shlex |
||||
|
|
||||
|
|
||||
|
class BaseObfuscation(abc.ABC): |
||||
|
|
||||
|
def __init__(self, binary_name=None, binary_path=None, algorithm=None): |
||||
|
|
||||
|
assert binary_name is not None or binary_path is not None |
||||
|
self.binary_name = binary_name if binary_name is not None else Path(self.binary_path).name |
||||
|
self.binary_path = binary_path if binary_path else "" |
||||
|
self.algorithm = algorithm |
||||
|
|
||||
|
def ensure_installed(self): |
||||
|
|
||||
|
# Attempt to find process by path |
||||
|
binary = Path(self.binary_path) |
||||
|
if not binary.is_file(): |
||||
|
# Did not find by path, attempt to find using which |
||||
|
proc_which = subprocess.Popen(["which", self.binary_name], stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
||||
|
data = [x.decode().strip() for x in proc_which.communicate() if x != b''][0] |
||||
|
|
||||
|
if proc_which.returncode != 0: |
||||
|
raise RuntimeError("Could not find binary '%s'" % data) |
||||
|
|
||||
|
self.binary_path = data |
||||
|
|
||||
|
def execute(self, *args, kill_first=False, override_command=None): |
||||
|
|
||||
|
if kill_first: |
||||
|
# TODO try to delete by full name as we dont want to kill other processes. |
||||
|
pattern = self.binary_name |
||||
|
self.execute(*[pattern], override_command="pkill") |
||||
|
#pattern = self.binary_path + " " + ' '.join(args) |
||||
|
#print(pattern) |
||||
|
#kill_output, kill_code = self.execute(*[pattern], override_command="pkill") |
||||
|
|
||||
|
command = override_command if override_command is not None else self.binary_path |
||||
|
print(shlex.join([command] + list(args))) |
||||
|
proc_which = subprocess.Popen([command] + list(args), stdout=subprocess.PIPE, stderr=subprocess.PIPE) |
||||
|
raw_data = proc_which.communicate() |
||||
|
|
||||
|
data = [x.decode().strip() for x in raw_data if x != b''] |
||||
|
if len(data) == 0: |
||||
|
data = "" |
||||
|
else: |
||||
|
data = data[0] |
||||
|
return data, proc_which.returncode |
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
|
||||
|
|
@ -0,0 +1,30 @@ |
|||||
|
from script.obfuscate import BaseObfuscation |
||||
|
import re |
||||
|
|
||||
|
|
||||
|
class ObfuscateOBFS4(BaseObfuscation): |
||||
|
|
||||
|
def __init__(self): |
||||
|
super().__init__( |
||||
|
binary_name="obfs4proxy", |
||||
|
binary_path="/usr/bin/obfs4proxy", |
||||
|
algorithm="obfs4" |
||||
|
) |
||||
|
|
||||
|
self.ensure_installed() |
||||
|
|
||||
|
def ensure_installed(self): |
||||
|
super().ensure_installed() |
||||
|
|
||||
|
output, code = self.execute("-version") |
||||
|
|
||||
|
if re.match(f'{self.binary_name}-[0-9]+.[0-9]+.[0-9]+', output) and code == 0: |
||||
|
return True |
||||
|
else: |
||||
|
raise RuntimeError(f"Could not verify that {self.binary_name} is installed correctly.") |
||||
|
|
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
|
||||
|
x = ObfuscateOBFS4() |
||||
|
x.ensure_installed() |
@ -0,0 +1,119 @@ |
|||||
|
from pathlib import Path |
||||
|
|
||||
|
import requests |
||||
|
|
||||
|
import const |
||||
|
from script.obfuscate import BaseObfuscation |
||||
|
import re |
||||
|
import os |
||||
|
import qrcode |
||||
|
import socket |
||||
|
|
||||
|
from script.obfuscate.obfs4 import ObfuscateOBFS4 |
||||
|
|
||||
|
|
||||
|
class ObfuscationViaTOR(BaseObfuscation): |
||||
|
|
||||
|
def __init__(self, algorithm: BaseObfuscation): |
||||
|
super().__init__( |
||||
|
binary_name="tor" |
||||
|
) |
||||
|
self.algorithm = algorithm |
||||
|
self.tor_data_dir = "/tmp/wg-manager-tor-proxy" |
||||
|
self.tor_config_file = "/tmp/wg-manager-tor-proxy/torrc" |
||||
|
self.tor_fingerprint_file = f"{self.tor_data_dir}/fingerprint" |
||||
|
self.tor_bridge_file = f"{self.tor_data_dir}/pt_state/obfs4_bridgeline.txt" |
||||
|
|
||||
|
Path(self.tor_config_file).touch() |
||||
|
os.makedirs(self.tor_data_dir, exist_ok=True) |
||||
|
|
||||
|
def __del__(self): |
||||
|
pass |
||||
|
|
||||
|
def ensure_installed(self): |
||||
|
super().ensure_installed() |
||||
|
output, code = self.execute("--version") |
||||
|
|
||||
|
if re.match(f'Tor version .*', output) and code == 0: |
||||
|
return True |
||||
|
else: |
||||
|
raise RuntimeError(f"Could not verify that {self.binary_name} is installed correctly.") |
||||
|
|
||||
|
def start(self): |
||||
|
|
||||
|
output, code = self.execute( |
||||
|
"-f", self.tor_config_file, |
||||
|
"--DataDirectory", self.tor_data_dir, |
||||
|
"--RunAsDaemon", "1", |
||||
|
"--ExitPolicy", "reject *:*", |
||||
|
"--ORPort", str(const.OBFUSCATE_SOCKS_TOR_PORT), |
||||
|
"--BridgeRelay", "1", |
||||
|
"--PublishServerDescriptor", "0", |
||||
|
"--ServerTransportPlugin", f"{self.algorithm.algorithm} exec {self.algorithm.binary_path}", |
||||
|
"--ServerTransportListenAddr", f"{self.algorithm.algorithm} 0.0.0.0:{const.OBFUSCATE_TOR_LISTEN_ADDR}", |
||||
|
"--ExtORPort", "auto", |
||||
|
"--ContactInfo", "wg-manager@github.com", |
||||
|
"--Nickname", "wgmanager", |
||||
|
kill_first=True |
||||
|
) |
||||
|
|
||||
|
print(output) |
||||
|
|
||||
|
def generate_bridge_line(self, local=False): |
||||
|
|
||||
|
if local: |
||||
|
ip_address = socket.gethostbyname(socket.gethostname()) |
||||
|
else: |
||||
|
ip_address = requests.get("https://api.ipify.org").text |
||||
|
|
||||
|
with open(self.tor_fingerprint_file, "r") as f: |
||||
|
fingerprint = f.read().split(" ") |
||||
|
assert len(fingerprint) == 2, "Could not load fingerprint correctly. " \ |
||||
|
"Should be a list of 2 items (name, fingerprint)" |
||||
|
fingerprint = fingerprint[1] |
||||
|
|
||||
|
with open(self.tor_bridge_file, "r") as f: |
||||
|
bridge_line_raw = f.read() |
||||
|
|
||||
|
bridge_line = re.search(r"^Bridge .*", bridge_line_raw, re.MULTILINE).group(0) |
||||
|
bridge_line = bridge_line\ |
||||
|
.replace("<IP ADDRESS>", ip_address)\ |
||||
|
.replace("<PORT>", str(const.OBFUSCATE_TOR_LISTEN_ADDR))\ |
||||
|
.replace("<FINGERPRINT>", fingerprint)\ |
||||
|
.replace("Bridge ", "bridge://")\ |
||||
|
.replace("\n", "") |
||||
|
#bridge_line = f"bridge://{self.algorithm.algorithm} {ip_address}:{const.OBFUSCATE_SOCKS_TOR_PORT} {fingerprint}" |
||||
|
print(bridge_line) |
||||
|
return bridge_line |
||||
|
|
||||
|
def output_qr(self, text, image=False): |
||||
|
|
||||
|
qr = qrcode.QRCode( |
||||
|
version=10, |
||||
|
error_correction=qrcode.constants.ERROR_CORRECT_L, |
||||
|
box_size=10, |
||||
|
border=4, |
||||
|
) |
||||
|
qr.add_data(text) |
||||
|
qr.make(fit=True) |
||||
|
|
||||
|
if image: |
||||
|
img = qr.make_image(fill_color="black", back_color="white") |
||||
|
img.show() |
||||
|
else: |
||||
|
try: |
||||
|
qr.print_tty() |
||||
|
except: |
||||
|
qr.print_ascii() |
||||
|
|
||||
|
|
||||
|
if __name__ == "__main__": |
||||
|
|
||||
|
x = ObfuscationViaTOR( |
||||
|
algorithm=ObfuscateOBFS4() |
||||
|
) |
||||
|
x.ensure_installed() |
||||
|
x.start() |
||||
|
bridge_line = x.generate_bridge_line(local=False) |
||||
|
x.output_qr(bridge_line, image=True) |
||||
|
#x.generate_bridge_line(local=False) |
Loading…
Reference in new issue