You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

203 lines
4.9 KiB

from datetime import datetime
5 years ago
import pydantic
5 years ago
from pydantic import BaseModel, typing
5 years ago
from sqlalchemy.orm import Session, Query
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound
import logging
from database import models
5 years ago
5 years ago
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
class GenericModel(BaseModel):
    """Base pydantic schema that can read and write itself through a SQLAlchemy model.

    Subclasses bind the schema to a database model by overriding ``Meta``:

    * ``Meta.model``    -- the SQLAlchemy model class to query and instantiate.
    * ``Meta.key``      -- name of the schema field used to find the matching row.
    * ``Meta.excludes`` -- field names left out when writing to the database.
    """

    class Meta:
        # SQLAlchemy model class this schema maps to; ``None`` means "not bound".
        model = None
        # Name of the schema field whose value identifies the database row.
        key = None
        # Field names that are not written to the database.
        excludes = {"id"}

    class Config:
        orm_mode = True

    def _ensure_orm(self):
        """Verify that the schema is bound to a database model.

        Raises:
            NotImplementedError: if ``Config.orm_mode`` is disabled, or if
                ``Meta.model`` or ``Meta.key`` is not set.
        """
        config = getattr(self, "Config", None)
        meta = getattr(self, "Meta", None)
        # Every requirement must hold; a single missing piece makes the
        # persistence helpers unusable, so any failure raises.
        is_bound = (
            config is not None
            and getattr(config, "orm_mode", False)
            and meta is not None
            and getattr(meta, "model", None) is not None
            and getattr(meta, "key", None)
        )
        if not is_bound:
            raise NotImplementedError("Incorrect configuration Config.orm_mode must be enabled and Meta.model must be "
                                      "set to a sqlalchemy model. Additional Meta.key must be set to bind model and schema")

    def filter_query(self, sess: Session) -> Query:
        """Return a query on ``Meta.model`` filtered by this schema's key value.

        Args:
            sess: SQLAlchemy session used to build the query.
        """
        lookup = {self.Meta.key: getattr(self, self.Meta.key)}
        return sess.query(self.Meta.model).filter_by(**lookup)

    def update(self, sess: Session, new):
        """Write the values of ``new`` to the matching row(s) and onto this schema.

        The rows are located with this schema's *current* key value, then the
        session is committed and the values of ``new`` are copied onto ``self``.

        Args:
            sess: SQLAlchemy session to update and commit through.
            new: schema object providing the new values via ``.dict()``.

        Returns:
            This schema instance.
        """
        self._ensure_orm()
        self.filter_query(sess).update(new.dict(include=self.columns()))
        sess.commit()

        for name, value in new.dict().items():
            try:
                setattr(self, name, value)
            except ValueError:
                # Best effort: values this schema refuses to accept are skipped.
                pass
        return self

    def columns(self, no_exclude=False):
        """Return the public attribute names of ``Meta.model`` as a set.

        Names are taken from ``dir(Meta.model)`` and filtered to those not
        starting with an underscore.
        NOTE(review): this may include more than the table's columns -- confirm
        whether ``Meta.model.__table__.columns`` would be the better source.

        Args:
            no_exclude: when true, ``Meta.excludes`` is not subtracted.
        """
        cols = {name for name in dir(self.Meta.model) if not name.startswith("_")}
        if no_exclude:
            return cols
        # Coerce to a set so a subclass declaring ``excludes = {}`` (an empty
        # dict) does not break the set difference.
        return cols - set(self.Meta.excludes)

    def sync(self, sess: Session):
        """Insert this schema as a new row, or update the existing row(s).

        A row is inserted when no row matches the schema's key; otherwise the
        matching row(s) are updated. The session is committed in both cases.

        Args:
            sess: SQLAlchemy session to write and commit through.
        """
        self._ensure_orm()

        query = self.filter_query(sess)
        if query.count() == 0:
            # Nothing stored yet: convert the schema into a model instance.
            # TODO: confirm that applying Meta.excludes here has no unwanted effect.
            sess.add(self.Meta.model(**self.dict(exclude=self.Meta.excludes)))
        else:
            query.update(self.dict(include=self.columns()))

        sess.commit()

    def from_db(self, sess: Session):
        """Load the matching row and copy its attributes onto this schema.

        Args:
            sess: SQLAlchemy session to query with.

        Returns:
            This schema instance, or ``None`` when zero or several rows match
            (the error is logged in both cases).
        """
        self._ensure_orm()
        try:
            db_item = self.filter_query(sess).one()
        except MultipleResultsFound as e:
            _LOGGER.exception(e)
            return None
        except NoResultFound as e:
            _LOGGER.exception(e)
            _LOGGER.warning("We did not find any records in the database that corresponds to the model. This means you "
                            "are trying to fetch a unsaved schema!")
            return None

        for column in self.columns(no_exclude=True):
            try:
                setattr(self, column, getattr(db_item, column))
            except ValueError:
                # Best effort: attributes this schema refuses to accept are skipped.
                pass
        return self
class User(GenericModel):
    """User schema bound to ``models.User`` and looked up by ``username``."""

    # Only ``username`` is required; every other field defaults to ``None``.
    id: int = None
    username: str
    email: str = None
    full_name: str = None
    role: str = None

    class Meta:
        # Database model backing this schema.
        model = models.User
        # Rows are matched on the ``username`` field.
        key = "username"
        # ``id`` is not written to the database.
        excludes = {"id"}
5 years ago
class UserAPIKey(GenericModel):
    """API-key schema carrying the key's ``id`` and ``created_date`` (both required)."""

    id: int
    created_date: datetime
class UserAPIKeyFull(UserAPIKey):
    """``UserAPIKey`` extended with the required ``key`` string."""

    key: str
5 years ago
class UserInDB(User):
    """``User`` extended with a required ``password`` field."""

    password: str
5 years ago
class Token(GenericModel):
    """Schema pairing an access token and its type with the owning ``User``."""

    access_token: str
    token_type: str
    user: User
5 years ago
class WGPeer(GenericModel):
    """Peer schema bound to ``models.WGPeer`` and looked up by ``id``.

    Only ``server_id`` is required; every other field defaults to ``None``.
    """

    id: int = None

    name: str = None
    address: str = None
    v6_address: str = None

    private_key: str = None
    public_key: str = None
    shared_key: str = None

    server_id: str

    dns: str = None
    allowed_ips: str = None
    keep_alive: int = None

    configuration: str = None

    class Meta:
        # Database model backing this schema.
        model = models.WGPeer
        # Rows are matched on the ``id`` field.
        key = "id"
        # Nothing is excluded. This must be an empty *set*: ``{}`` is an empty
        # dict, and subtracting a dict from a set of column names raises
        # TypeError.
        excludes = set()
5 years ago
5 years ago
class WGPeerConfig(GenericModel):
    """Schema holding a single required ``config`` string."""

    config: str
5 years ago
class KeyPair(GenericModel):
    """Schema holding a ``public_key`` / ``private_key`` pair (both required)."""

    public_key: str
    private_key: str
5 years ago
class PSK(GenericModel):
    """Schema holding a single required ``psk`` string."""

    psk: str
5 years ago
class WGServer(GenericModel):
    """Server schema bound to ``models.WGServer`` and looked up by ``interface``.

    Only ``interface`` is required; the remaining fields default to ``None``
    and ``peers`` defaults to an empty list.
    """

    id: int = None

    address: str = None
    v6_address: str = None
    subnet: int = None
    v6_subnet: int = None

    interface: str
    listen_port: int = None
    endpoint: str = None
    private_key: str = None
    public_key: str = None
    is_running: bool = None

    configuration: str = None

    post_up: str = None
    post_down: str = None
    dns: str = None
    allowed_ips: str = None
    keep_alive: int = None
    read_only: int = None

    peers: pydantic.typing.List['WGPeer'] = []

    class Meta:
        # Database model backing this schema.
        model = models.WGServer
        # Rows are matched on the ``interface`` field.
        key = "interface"
        # Field names that are not written to the database.
        excludes = {"id", "peers", "v6_support"}

    def convert(self):
        """Return a ``models.WGServer`` built from every field except ``is_running``.

        An unset or empty ``peers`` value is replaced by an empty list first.
        """
        self.peers = self.peers or []
        payload = self.dict(exclude={"is_running"})
        return models.WGServer(**payload)
5 years ago
class WGServerAdd(WGServer):
    """``WGServer`` variant with additional required fields.

    ``address`` and ``listen_port`` become required (``interface`` already
    is), and a required ``v6_support`` flag is added.
    """

    address: str
    interface: str
    listen_port: int
    v6_support: bool
5 years ago
class WGPeerConfigAdd(GenericModel):
    """Schema with a required ``server_interface`` and an optional ``name``."""

    server_interface: str
    name: str = None
5 years ago
class WGPeerConfigGetByName(GenericModel):
    """Schema with a required ``server_interface`` and an optional ``name``."""

    server_interface: str
    name: str = None