File size: 3,380 Bytes
0953f7c 565441d 0953f7c 565441d 0953f7c 565441d 0953f7c 565441d 0953f7c 565441d 0953f7c 565441d 0953f7c 565441d 0953f7c 565441d 0953f7c 565441d 0953f7c 565441d 0953f7c 565441d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
import os
import secrets
import shutil
from pathlib import Path

import psutil
from fastapi import FastAPI, UploadFile, File, HTTPException, Depends
from fastapi.responses import FileResponse
from fastapi.security import OAuth2PasswordBearer
# Application instance; the route decorators further down register their
# handlers on this object.
app = FastAPI()
# Directory where files will be uploaded. The path is relative, so it is
# resolved against the process's current working directory.
UPLOAD_DIRECTORY = Path("uploads")
# Created at import time; exist_ok=True makes repeated imports/restarts a no-op.
UPLOAD_DIRECTORY.mkdir(parents=True, exist_ok=True)
# OAuth2 scheme for token-based authorization. Used as a dependency to obtain
# the caller's bearer token.
# NOTE(review): tokenUrl points at "token", but no such route is defined in the
# visible code -- confirm whether it exists elsewhere or is only a placeholder.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Hugging Face secret token from environment variables.
# os.environ.get returns None when SECRET_TOKEN is not set.
VALID_TOKEN = os.environ.get("SECRET_TOKEN")
# Helper function to calculate the size of a directory
def get_directory_size(directory: Path) -> int:
    """Return the combined size in bytes of all files under *directory*.

    The tree is walked recursively with ``os.walk``. Entries whose size
    cannot be read (for example a dangling symlink, or a file deleted
    between being listed and being measured) are skipped instead of
    aborting the whole calculation.

    Args:
        directory: Root of the tree to measure.

    Returns:
        Total size in bytes; 0 when the directory is empty or missing
        (``os.walk`` yields nothing for a path it cannot list).
    """
    total_size = 0
    for dirpath, _dirnames, filenames in os.walk(directory):
        for name in filenames:
            try:
                total_size += os.path.getsize(os.path.join(dirpath, name))
            except OSError:
                # The entry vanished or is a broken link; a best-effort
                # usage figure is more useful than an exception here.
                continue
    return total_size
# Dependency to verify the token
def get_current_token(token: str = Depends(oauth2_scheme)):
    """Validate the caller's bearer token against the configured secret.

    Args:
        token: Bearer token supplied by the ``oauth2_scheme`` dependency.

    Returns:
        The token unchanged, when it matches ``VALID_TOKEN``.

    Raises:
        HTTPException: 401 when the token does not match, or when no
            secret is configured (``VALID_TOKEN`` is unset or empty).
    """
    expected = VALID_TOKEN
    # Fail closed when no secret is configured: an unset or empty
    # SECRET_TOKEN must never authorize a request.
    # compare_digest runs in time independent of where the inputs differ,
    # so response timing does not leak how much of the token was correct.
    # Both sides are encoded because compare_digest rejects non-ASCII str.
    authorized = bool(expected) and secrets.compare_digest(
        token.encode("utf-8"), expected.encode("utf-8")
    )
    if not authorized:
        raise HTTPException(
            status_code=401,
            detail="Invalid token",
            # Tells the client which authentication scheme is expected.
            headers={"WWW-Authenticate": "Bearer"},
        )
    return token
# Health check endpoint
@app.get("/health")
def health_check(token: str = Depends(get_current_token)):
    """Report that the service is up.

    The ``token`` dependency runs ``get_current_token`` before this
    handler; its value is not used here.
    """
    payload = {"status": "healthy"}
    return payload
# System metrics endpoint: CPU, RAM, and disk usage for uploads folder (in GB and percentage of 50GB limit)
@app.get("/metrics")
def get_metrics(token: str = Depends(get_current_token)):
    """Return CPU, memory and uploads-folder usage figures.

    CPU usage is sampled over a one-second interval and rounded to a whole
    number. Memory figures are reported in GB (1024 ** 3 bytes) rounded to
    two decimals. Uploads-folder usage is reported in GB and as a
    percentage of an assumed 50 GB limit.
    """
    bytes_per_gb = 1024 ** 3

    # Sampling blocks for the one-second interval.
    cpu_load = round(psutil.cpu_percent(interval=1))
    vmem = psutil.virtual_memory()
    uploads_gb = get_directory_size(UPLOAD_DIRECTORY) / bytes_per_gb

    memory_section = {
        "total_gb": round(vmem.total / bytes_per_gb, 2),
        "available_gb": round(vmem.available / bytes_per_gb, 2),
        "percent": vmem.percent,
    }
    disk_section = {
        "uploads_folder_size_gb": round(uploads_gb, 2),
        # Share of the assumed 50 GB total space limit.
        "uploads_usage_percent_of_50gb": round((uploads_gb / 50) * 100, 2),
        "total_space_gb": 50,
    }
    return {
        "cpu_percent": cpu_load,
        "memory": memory_section,
        "disk": disk_section,
    }
# File upload endpoint
@app.post("/uploadfile/")
async def upload_file(file: UploadFile = File(...), token: str = Depends(get_current_token)):
    """Save an uploaded file into ``UPLOAD_DIRECTORY``.

    Only the final path component of the client-supplied filename is used,
    so names such as ``../../etc/passwd`` or ``/abs/path`` cannot place
    the file outside the upload directory.

    Args:
        file: The uploaded file from the multipart request.
        token: Validated bearer token (unused beyond authorization).

    Returns:
        A dict with an ``info`` message naming the stored file and its path.

    Raises:
        HTTPException: 400 when the filename is missing or reduces to an
            empty, ``.``/``..`` or NUL-containing name.
    """
    # The filename is attacker-controlled: drop any directory components.
    safe_name = Path(file.filename or "").name
    if safe_name in ("", ".", "..") or "\x00" in safe_name:
        raise HTTPException(status_code=400, detail="Invalid filename")

    file_location = UPLOAD_DIRECTORY / safe_name
    # NOTE(review): copyfileobj is blocking I/O inside an async handler, so
    # large uploads stall the event loop -- consider offloading to a thread.
    with file_location.open("wb") as buffer:
        shutil.copyfileobj(file.file, buffer)
    return {"info": f"file '{safe_name}' saved at '{file_location}'"}
# File download endpoint
@app.get("/downloadfile/{filename}")
def download_file(filename: str, token: str = Depends(get_current_token)):
    """Send a previously uploaded file back to the caller.

    Args:
        filename: Name of a file directly inside ``UPLOAD_DIRECTORY``.
        token: Validated bearer token (unused beyond authorization).

    Returns:
        A ``FileResponse`` streaming the requested file.

    Raises:
        HTTPException: 404 when the name contains path components, is
            ``.``/``..``, or does not refer to an existing regular file.
    """
    # Accept plain file names only: anything that changes when reduced to
    # its last path component could point outside the upload directory.
    safe_name = Path(filename).name
    if safe_name != filename or safe_name in ("", ".", ".."):
        raise HTTPException(status_code=404, detail="File not found")

    file_location = UPLOAD_DIRECTORY / safe_name
    # is_file() rather than exists(): a directory cannot be served and
    # would otherwise fail later inside FileResponse.
    if not file_location.is_file():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(path=file_location, filename=safe_name)
# Show current files endpoint
@app.get("/files/")
def list_files(token: str = Depends(get_current_token)):
    """List the names of regular files directly inside the upload directory.

    Subdirectories and other non-file entries are left out; the listing is
    not recursive.
    """
    regular_files = []
    for entry in os.listdir(UPLOAD_DIRECTORY):
        if not os.path.isfile(UPLOAD_DIRECTORY / entry):
            continue
        regular_files.append(entry)
    return {"files": regular_files}
|