Spaces:
Runtime error
Runtime error
File size: 1,617 Bytes
614861a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
from datetime import datetime, timedelta
from jose import JWTError, jwt, ExpiredSignatureError
# from passlib.context import CryptContext
class AuthService:
    """Authenticate accounts and issue/validate signed JWT access tokens.

    Accounts are looked up with ``account_repo.find(query)``, whose result is
    unpacked as a ``(key, user)`` pair; ``user`` is treated as a mapping, and
    a falsy ``user`` means no account matched the query.
    """

    def __init__(self, account_repo, secret_key="123"):
        """Store the account repository and the token signing settings.

        Args:
            account_repo: Object exposing ``find(query)``; must not be None.
            secret_key: Key used to sign and verify tokens; must not be None.

        Raises:
            AssertionError: If either argument is None (when assertions are
                enabled).
        """
        # NOTE(review): the default key "123" is only a placeholder; real
        # deployments should always pass a strong secret explicitly.
        assert account_repo is not None
        assert secret_key is not None
        self.account_repo = account_repo
        self.secret_key = secret_key
        self.encode_alg = "HS256"

    async def authenticate_user(self, email, password):
        """Check an email/password pair against the stored account.

        Args:
            email: Email used to look the account up in the repository.
            password: Password supplied by the caller.

        Returns:
            True when an account with this email exists and its stored
            password equals ``password``; False otherwise.

        Raises:
            AssertionError: If the account record has no ``'password'`` key
                (when assertions are enabled).
        """
        _key, user = self.account_repo.find({"email": email})
        if not user:
            return False
        assert 'password' in user
        # A mismatch must be reported as a failure. Falling through and
        # returning the (truthy) user record here would let any password in.
        # NOTE(review): the stored password is compared as plain text --
        # consider storing and verifying password hashes instead.
        return user['password'] == password

    async def create_token(self, email):
        """Create a signed token for ``email`` that expires in 30 minutes.

        Args:
            email: Value stored in the token's ``sub`` claim.

        Returns:
            The encoded JWT produced by ``jwt.encode``.
        """
        # Imported locally so this block does not depend on the file-level
        # import list; ``datetime.utcnow()`` is deprecated since Python 3.12.
        from datetime import timezone

        expire = datetime.now(timezone.utc) + timedelta(minutes=30)
        return jwt.encode(
            {"sub": email, "exp": expire},
            self.secret_key,
            algorithm=self.encode_alg,
        )

    async def validate_token(self, encoded_token):
        """Validate a token and resolve it to the account's email.

        Args:
            encoded_token: Encoded JWT, as returned by ``create_token``.

        Returns:
            The ``'email'`` value of the account named by the token's ``sub``
            claim, or False when the token is expired or otherwise invalid,
            has no ``sub`` claim, or no matching account is found.
        """
        try:
            decoded_token = jwt.decode(
                encoded_token,
                self.secret_key,
                algorithms=[self.encode_alg],
            )
        except (ExpiredSignatureError, JWTError):
            # Expired and otherwise invalid tokens are both rejected.
            return False

        subject = decoded_token.get('sub')
        if subject is None:
            return False

        _key, user = self.account_repo.find({"email": subject})
        if not user:
            # The account may have been removed after the token was issued.
            return False
        return user['email']
|