Browse Source

Merge 6e92a15ad1 into 27d164b835

pull/111/merge
Hugo Rodrigues 3 years ago
committed by GitHub
parent
commit
25137c602a
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      wg-manager-backend/requirements.txt
  2. 94
      wg-manager-backend/script/wireguard.py

1
wg-manager-backend/requirements.txt

@ -20,3 +20,4 @@ qrcode[pil]
alembic alembic
loguru loguru
ldap3==2.9 ldap3==2.9
pywireguard==0.1.3

94
wg-manager-backend/script/wireguard.py

@ -5,6 +5,8 @@ import tempfile
import requests import requests
import typing import typing
import configparser import configparser
import warnings
import base64
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@ -14,6 +16,7 @@ import os
import re import re
import ipaddress import ipaddress
import util import util
import pywireguard
from database import models from database import models
from database.database import SessionLocal from database.database import SessionLocal
@ -52,6 +55,7 @@ class TempServerFile():
def _run_wg(server: schemas.WGServer, command): def _run_wg(server: schemas.WGServer, command):
warnings.DeprecationWarning("_run_wg will be depretated in favor of pywireguard")
try: try:
output = subprocess.check_output(const.CMD_WG_COMMAND + command, stderr=subprocess.STDOUT) output = subprocess.check_output(const.CMD_WG_COMMAND + command, stderr=subprocess.STDOUT)
return output return output
@ -65,49 +69,34 @@ def is_installed():
return output == b'' or b'interface' in output return output == b'' or b'interface' in output
def generate_keys() -> typing.Dict[str, str]: def generate_keys() -> typing.Dict[str, str, str]:
private_key = subprocess.check_output(const.CMD_WG_COMMAND + ["genkey"]) return pywireguard.generate_keys()
public_key = subprocess.check_output(
const.CMD_WG_COMMAND + ["pubkey"],
input=private_key
)
private_key = private_key.decode("utf-8").strip()
public_key = public_key.decode("utf-8").strip()
return dict(
private_key=private_key,
public_key=public_key
)
def generate_psk(): def generate_psk():
return subprocess.check_output(const.CMD_WG_COMMAND + ["genpsk"]).decode("utf-8").strip() return generate_keys()["preshared_key"]
def start_interface(server: typing.Union[schemas.WGServer, schemas.WGPeer]): def start_interface(server: typing.Union[schemas.WGServer, schemas.WGPeer]):
with TempServerFile(server) as server_file: device = pywireguard.Device(server.interface)
try: if not device.has_private_key:
# print(*const.CMD_WG_QUICK, "up", server_file) device.private_key = base64.b64encode(server.private_key)
output = subprocess.check_output(const.CMD_WG_QUICK + ["up", server_file], stderr=subprocess.STDOUT) if not device.has_listen_port:
return output device.listen_port = server.listen_port
except Exception as e:
print(e.output)
if b'already exists' in e.output:
raise WGAlreadyStartedError("The wireguard device %s is already started." % server.interface)
elif b'Address already in use' in e.output:
raise WGPortAlreadyInUse("The port %s is already used by another application." % server.listen_port)
# Remove existing peers
device.clear_peers()
for peer in server.peers:
add_peer(server, peer)
def stop_interface(server: schemas.WGServer): #TODO Post up and post down
with TempServerFile(server) as server_file:
try: device.update()
output = subprocess.check_output(const.CMD_WG_QUICK + ["down", server_file], stderr=subprocess.STDOUT)
return output
except Exception as e:
if b'is not a WireGuard interface' in e.output:
raise WGAlreadyStoppedError("The wireguard device %s is already stopped." % server.interface)
def stop_interface(server: schemas.WGServer):
device = pywireguard.Device(server.interface)
device.delete()
def restart_interface(server: schemas.WGServer): def restart_interface(server: schemas.WGServer):
try: try:
stop_interface(server) stop_interface(server)
@ -129,21 +118,33 @@ def is_running(server: schemas.WGServer):
def add_peer(server: schemas.WGServer, peer: schemas.WGPeer): def add_peer(server: schemas.WGServer, peer: schemas.WGPeer):
try: device = pywireguard.Device(server.interface)
output = _run_wg(server, ["set", server.interface, "peer", peer.public_key, "allowed-ips", peer.address]) wgpeer = pywireguard.Peer(public_key=peer.public_key)
return output == b'' if peer.shared_key:
except Exception as e: wgpeer.preshared_key = peer.shared_key
_LOGGER.exception(e) # TODO: Set peer endpoint. Need to wait for pywireguard implementation
return False for allowedip in peer.allowed_ips:
splited = allowedip.split("/")
ipaddr = splited[0]
if len(splited) == 2:
cidr = splited[1]
else:
cidr = 32
wgallowed = pywireguard.AllowedIP(ip=ipaddr, cidr=cidr)
peer.add_allowed_ip(wgallowed)
device.add_peer(wgpeer)
def remove_peer(server: schemas.WGServer, peer: schemas.WGPeer): def remove_peer(server: schemas.WGServer, peer: schemas.WGPeer):
try: removed = False
output = _run_wg(server, ["set", server.interface, "peer", peer.public_key, "remove"]) device = pywireguard.Device(server.interface)
return output == b'' for wgpeer in device.get_peers():
except Exception as e: if wgpeer.public_key == peer.public_key.encode():
_LOGGER.exception(e) wgpeer.remove_me()
return False removed = True
break
device.update()
return removed
def get_stats(server: schemas.WGServer): def get_stats(server: schemas.WGServer):
@ -207,6 +208,7 @@ def move_server_dir(interface, interface1):
def generate_config(obj: typing.Union[typing.Dict[schemas.WGPeer, schemas.WGServer], schemas.WGServer]): def generate_config(obj: typing.Union[typing.Dict[schemas.WGPeer, schemas.WGServer], schemas.WGServer]):
warnings.DeprecationWarning("generate_config will be depretated in favor of pywireguard")
if isinstance(obj, dict) and "server" in obj and "peer" in obj: if isinstance(obj, dict) and "server" in obj and "peer" in obj:
template = "peer.j2" template = "peer.j2"
is_ipv6 = obj["server"].v6_address is not None is_ipv6 = obj["server"].v6_address is not None

Loading…
Cancel
Save