File size: 4,957 Bytes
ba19a97
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
from fastapi import FastAPI
from fastapi.responses import JSONResponse
from fastapi.requests import Request
from fastapi.security.utils import get_authorization_scheme
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import sessionmaker, relationship
from pydantic import BaseModel
from typing import List

app = FastAPI()

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String, unique=True)
    password = Column(String)
    profile = Column(String)
    team_id = Column(Integer, ForeignKey("teams.id"))
    team = relationship("Team", backref="users")

class Team(Base):
    __tablename__ = "teams"
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)

engine = create_async_engine("sqlite:///database.sqlite")
Base.metadata.create_all(engine)

async_session = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

@app.on_event("startup")
async def startup():
    async with async_session() as session:
        await session.execute("PRAGMA foreign_keys=ON")

@app.on_event("shutdown")
async def shutdown():
    await engine.dispose()

class UserSchema(BaseModel):
    username: str
    password: str
    profile: str
    team_id: int

class TeamSchema(BaseModel):
    name: str

@app.post("/register")
async def register_user(user: UserSchema):
    async with async_session() as session:
        existing_user = await session.execute(User.__table__.select().where(User.username == user.username))
        if existing_user.scalar():
            return JSONResponse(status_code=400, content={"error": "Username already exists"})
        new_user = User(username=user.username, password=user.password, profile=user.profile, team_id=user.team_id)
        session.add(new_user)
        await session.commit()
        return JSONResponse(status_code=201, content={"message": "User created successfully"})

@app.post("/login")
async def login_user(username: str, password: str):
    async with async_session() as session:
        user = await session.execute(User.__table__.select().where(User.username == username))
        user = user.scalar()
        if not user or user.password != password:
            return JSONResponse(status_code=401, content={"error": "Invalid username or password"})
        return JSONResponse(status_code=200, content={"message": "Logged in successfully"})

@app.get("/teams/")
async def get_teams():
    async with async_session() as session:
        teams = await session.execute(Team.__table__.select())
        teams = teams.scalars().all()
        return JSONResponse(status_code=200, content=[{"id": team.id, "name": team.name} for team in teams])

@app.post("/teams/")
async def create_team(team: TeamSchema):
    async with async_session() as session:
        new_team = Team(name=team.name)
        session.add(new_team)
        await session.commit()
        return JSONResponse(status_code=201, content={"message": "Team created successfully"})

@app.get("/users/")
async def get_users():
    async with async_session() as session:
        users = await session.execute(User.__table__.select())
        users = users.scalars().all()
        return JSONResponse(status_code=200, content=[{"id": user.id, "username": user.username, "profile": user.profile} for user in users])

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with async_session() as session:
        user = await session.execute(User.__table__.select().where(User.id == user_id))
        user = user.scalar()
        if not user:
            return JSONResponse(status_code=404, content={"error": "User not found"})
        return JSONResponse(status_code=200, content={"username": user.username, "profile": user.profile, "team_id": user.team_id})

@app.put("/users/{user_id}")
async def update_user(user_id: int, user: UserSchema):
    async with async_session() as session:
        user_db = await session.execute(User.__table__.select().where(User.id == user_id))
        user_db = user_db.scalar()
        if not user_db:
            return JSONResponse(status_code=404, content={"error": "User not found"})
        user_db.username = user.username
        user_db.profile = user.profile
        user_db.team_id = user.team_id
        await session.commit()
        return JSONResponse(status_code=200, content={"message": "User updated successfully"})

@app.get("/search")
async def search_users(q: str):
    async with async_session() as session:
        users = await session.execute(User.__table__.select().where(User.profile.like(f"%{q}%")))
        users = users.scalars().all()
        return JSONResponse(status_code=200, content=[{"id": user.id, "username": user.username, "profile": user.profile} for user in users])