kenken999's picture
test
ba19a97
raw
history blame
4.96 kB
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi.requests import Request
from fastapi.security.utils import get_authorization_scheme
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship
from pydantic import BaseModel
from typing import List
app = FastAPI()
Base = declarative_base()
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True)
password = Column(String)
profile = Column(String)
team_id = Column(Integer, ForeignKey("teams.id"))
team = relationship("Team", backref="users")
class Team(Base):
__tablename__ = "teams"
id = Column(Integer, primary_key=True)
name = Column(String, unique=True)
engine = create_async_engine("sqlite:///database.sqlite")
Base.metadata.create_all(engine)
async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
@app.on_event("startup")
async def startup():
async with async_session() as session:
await session.execute("PRAGMA foreign_keys=ON")
@app.on_event("shutdown")
async def shutdown():
await engine.dispose()
class UserSchema(BaseModel):
username: str
password: str
profile: str
team_id: int
class TeamSchema(BaseModel):
name: str
@app.post("/register")
async def register_user(user: UserSchema):
async with async_session() as session:
existing_user = await session.execute(User.__table__.select().where(User.username == user.username))
if existing_user.scalar():
return JSONResponse(status_code=400, content={"error": "Username already exists"})
new_user = User(username=user.username, password=user.password, profile=user.profile, team_id=user.team_id)
session.add(new_user)
await session.commit()
return JSONResponse(status_code=201, content={"message": "User created successfully"})
@app.post("/login")
async def login_user(username: str, password: str):
async with async_session() as session:
user = await session.execute(User.__table__.select().where(User.username == username))
user = user.scalar()
if not user or user.password != password:
return JSONResponse(status_code=401, content={"error": "Invalid username or password"})
return JSONResponse(status_code=200, content={"message": "Logged in successfully"})
@app.get("/teams/")
async def get_teams():
async with async_session() as session:
teams = await session.execute(Team.__table__.select())
teams = teams.scalars().all()
return JSONResponse(status_code=200, content=[{"id": team.id, "name": team.name} for team in teams])
@app.post("/teams/")
async def create_team(team: TeamSchema):
async with async_session() as session:
new_team = Team(name=team.name)
session.add(new_team)
await session.commit()
return JSONResponse(status_code=201, content={"message": "Team created successfully"})
@app.get("/users/")
async def get_users():
async with async_session() as session:
users = await session.execute(User.__table__.select())
users = users.scalars().all()
return JSONResponse(status_code=200, content=[{"id": user.id, "username": user.username, "profile": user.profile} for user in users])
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with async_session() as session:
user = await session.execute(User.__table__.select().where(User.id == user_id))
user = user.scalar()
if not user:
return JSONResponse(status_code=404, content={"error": "User not found"})
return JSONResponse(status_code=200, content={"username": user.username, "profile": user.profile, "team_id": user.team_id})
@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserSchema):
async with async_session() as session:
user_db = await session.execute(User.__table__.select().where(User.id == user_id))
user_db = user_db.scalar()
if not user_db:
return JSONResponse(status_code=404, content={"error": "User not found"})
user_db.username = user.username
user_db.profile = user.profile
user_db.team_id = user.team_id
await session.commit()
return JSONResponse(status_code=200, content={"message": "User updated successfully"})
@app.get("/search")
async def search_users(q: str):
async with async_session() as session:
users = await session.execute(User.__table__.select().where(User.profile.like(f"%{q}%")))
users = users.scalars().all()
return JSONResponse(status_code=200, content=[{"id": user.id, "username": user.username, "profile": user.profile} for user in users])