# Page-scrape header (not part of the program):
# nbroad's picture
# nbroad HF staff
# labels for dates
# edac3aa verified
import json
import os
from datetime import datetime, timezone, timedelta
from dateutil import parser as dateparser
import meilisearch
from fasthtml.common import *
from markdown import markdown
from dotenv import load_dotenv
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from contextlib import asynccontextmanager
from constants import MeilisearchIndexFields
from update import process_webhook, update_webhooks
# Load settings from the local .env file (overriding any values already in the
# environment) and print what load_dotenv returned so the outcome shows in logs.
loaded = load_dotenv("./.env", override=True)
print("Loaded .env file:", loaded)

# Meilisearch connection settings are read from the environment.
MS_URL = os.getenv("MS_URL")
MS_SEARCH_KEY = os.getenv("MS_SEARCH_KEY")
ms_client = meilisearch.Client(MS_URL, MS_SEARCH_KEY)

# Read the stylesheet through a context manager so the file handle is closed
# deterministically instead of being left open until garbage collection.
with open("styles.css") as css_file:
    css_content = css_file.read()
@asynccontextmanager
async def lifespan(app):
    """Keep the periodic `update_webhooks` job running while the app is alive.

    A background scheduler is started before the app begins serving and is
    shut down when the context exits.

    Args:
        app: The application instance handed in by the framework (unused).
    """
    # Setup: schedule update_webhooks at minute 0 of every third hour.
    scheduler = BackgroundScheduler()
    scheduler.add_job(update_webhooks, CronTrigger.from_crontab("0 */3 * * *"))
    scheduler.start()
    try:
        yield
    finally:
        # Cleanup must run even when an exception is thrown into the generator
        # at the yield point; otherwise the scheduler would be left running.
        scheduler.shutdown()
# An unset SPACE_ID is treated as "running locally": in that case the app is
# created without the lifespan hook, so no scheduler is started.
if "SPACE_ID" not in os.environ:
    lifespan = None

app, rt = fast_app(
    hdrs=(Style(css_content),),
    lifespan=lifespan,
)

# Default extension list handed to markdown() by the Markdown helper.
md_exts = ("codehilite", "smarty", "extra", "sane_lists")
def Markdown(s, exts=md_exts, **kw):
    """Render Markdown text to HTML and wrap the result in a `Div`.

    Args:
        s: Markdown source text.
        exts: Extensions forwarded to `markdown()`; defaults to `md_exts`.
        **kw: Extra keyword arguments forwarded to `Div`.

    Returns:
        A `Div` whose only child is the rendered HTML wrapped in `NotStr`.
    """
    rendered_html = markdown(s, extensions=exts)
    # NOTE(review): NotStr presumably marks the HTML as already-safe so it is
    # inserted without escaping -- confirm against the fasthtml docs.
    html_fragment = NotStr(rendered_html)
    return Div(html_fragment, **kw)
# Inline JavaScript for the "scroll to top" button (element id
# "scroll-top-btn"). Once the DOM is loaded it:
#   * shows the button when the page is scrolled more than 20px and hides it
#     otherwise, and
#   * resets the scroll position to the top when the button is clicked.
scroll_script = Script(
"""
document.addEventListener('DOMContentLoaded', function() {
var scrollButton = document.getElementById('scroll-top-btn');
window.onscroll = function() {
if (document.body.scrollTop > 20 || document.documentElement.scrollTop > 20) {
scrollButton.style.display = "block";
} else {
scrollButton.style.display = "none";
}
};
scrollButton.onclick = function() {
document.body.scrollTop = 0; // For Safari
document.documentElement.scrollTop = 0; // For Chrome, Firefox, IE and Opera
};
});
"""
)
def _date_field(label_text, field_name, value):
    """Build one labelled date input wrapped in a `date-input` Div.

    Args:
        label_text: Visible label text, also used as the input's title.
        field_name: Used as the input's `name` and `id` and the label's `for`.
        value: Object with `strftime` (e.g. a datetime) giving the initial date.
    """
    return Div(
        Label(label_text, for_=field_name),
        Input(
            type="date",
            # A label's `for` attribute is matched against the control's id,
            # not its name, so the id is required for the label to be
            # associated with this input.
            id=field_name,
            name=field_name,
            value=value.strftime("%Y-%m-%d"),
            title=label_text,
        ),
        cls="date-input",
    )


def date_range_inputs(start_date, end_date):
    """Build the start/end date pickers shown in the search form.

    Args:
        start_date: Initial value for the "start_date" input (needs `strftime`).
        end_date: Initial value for the "end_date" input (needs `strftime`).

    Returns:
        A `date-range` Div containing the two labelled date inputs.
    """
    return Div(
        _date_field("Start date", "start_date", start_date),
        _date_field("End date", "end_date", end_date),
        cls="date-range",
    )
def search_form(start_date, end_date):
    """Build the search form: query box, date range pickers and submit button.

    On submit the form is posted to "/search" via HTMX, targeting the
    "#search-results" element.

    Args:
        start_date: Initial value for the start date picker.
        end_date: Initial value for the end date picker.
    """
    query_box = Input(type="text", name="query", placeholder="Enter search query")
    date_pickers = date_range_inputs(start_date, end_date)
    submit_button = Button("Search", type="submit")
    return Form(
        query_box,
        date_pickers,
        submit_button,
        hx_post="/search",
        hx_target="#search-results",
        hx_trigger="submit",
        id="search-form",
    )
def iso_to_unix_timestamp(iso_string):
    """Convert an ISO-formatted date/time string to whole Unix seconds.

    Args:
        iso_string: Text accepted by `dateparser.isoparse`.

    Returns:
        The POSIX timestamp as an int (any fractional part is dropped by int()).
    """
    # NOTE(review): if the parsed datetime carries no UTC offset, timestamp()
    # interprets it in the host's local time zone -- confirm that is intended,
    # given that results are displayed in UTC.
    parsed = dateparser.isoparse(iso_string)
    epoch_seconds = parsed.timestamp()
    return int(epoch_seconds)
def unix_timestamp_to_nice_format(timestamp):
    """Format a Unix timestamp as a human-readable UTC string.

    Args:
        timestamp: Seconds since the Unix epoch.

    Returns:
        Text such as "Jan 01, 1970 at 00:00 UTC".
    """
    moment = datetime.fromtimestamp(timestamp, tz=timezone.utc)
    # A format spec inside an f-string is handed to strftime for datetimes.
    return f"{moment:%b %d, %Y at %H:%M UTC}"
def make_query(query, start_date, end_date, page=1, limit=10):
    """Search the Meilisearch index for `query` within an inclusive date range.

    Args:
        query: Search text passed to Meilisearch.
        start_date: ISO date string for the first day to include. A falsy
            value (e.g. a cleared date input) drops the lower bound.
        end_date: ISO date string for the last day to include. A falsy value
            drops the upper bound.
        page: 1-based page number; values below 1 are treated as page 1.
        limit: Maximum number of hits per page.

    Returns:
        The result of the index's `search()` call. Callers in this file read
        its "hits", "estimatedTotalHits" and "processingTimeMs" entries.
    """
    seconds_per_day = 24 * 60 * 60
    updated_at = MeilisearchIndexFields.UPDATED_AT.value
    content = MeilisearchIndexFields.CONTENT.value

    date_clauses = []
    if start_date:
        after_timestamp = iso_to_unix_timestamp(start_date)
        date_clauses.append(f"{updated_at} >= {after_timestamp}")
    if end_date:
        # Exclusive bound at the start of the following day. The previous
        # "end + 23:59:59" bound combined with "<" silently excluded items
        # updated during the last second of the end date.
        before_timestamp = iso_to_unix_timestamp(end_date) + seconds_per_day
        date_clauses.append(f"{updated_at} < {before_timestamp}")

    options = {
        "limit": limit,
        # Clamp so a page number below 1 cannot produce a negative offset.
        "offset": max(page - 1, 0) * limit,
        "attributesToCrop": [content],
        "cropLength": 30,
        "attributesToHighlight": [
            content,
            MeilisearchIndexFields.TITLE.value,
        ],
        "highlightPreTag": '<span class="highlight">',
        "highlightPostTag": "</span>",
        "distinct": MeilisearchIndexFields.URL.value,
    }
    if date_clauses:
        options["filter"] = " AND ".join(date_clauses)

    return ms_client.index(MeilisearchIndexFields.INDEX_NAME.value).search(
        query=query, opt_params=options
    )
def search_results(query, start_date, end_date, page=1):
    """Run a search and render the results fragment.

    Args:
        query: Search text.
        start_date: Start of the date range, forwarded to `make_query`.
        end_date: End of the date range, forwarded to `make_query`.
        page: Page number to fetch and to show in the pagination controls.

    Returns:
        A Div with id "search-results" holding the results bar, one card per
        hit, and the pagination controls.
    """
    response = make_query(query, start_date, end_date, page)
    summary_bar = make_results_bar(response)
    cards = Div(*(make_card(hit) for hit in response["hits"]))
    pager = make_pagination(page, response["estimatedTotalHits"])
    return Div(summary_bar, cards, pager, id="search-results")
def make_results_bar(results):
    """Render the summary bar shown above the hit list.

    Args:
        results: Mapping with "processingTimeMs" and "estimatedTotalHits".

    Returns:
        A `results-bar` Div with one line for each of the two values.
    """
    timing_line = Div(f"Processing time: {results['processingTimeMs']}ms")
    hits_line = Div(f"Estimated total hits: {results['estimatedTotalHits']}")
    return Div(timing_line, hits_line, cls="results-bar")
def make_card(result):
    """Render one search hit as a card.

    Args:
        result: A hit mapping; only its "_formatted" entry is used.

    Returns:
        A `card-item` Div with the title, content, update time and a link
        that opens the hit's URL in a new tab.
    """
    formatted = result["_formatted"]
    link = formatted[MeilisearchIndexFields.URL.value]
    # The stored value is passed through int() before formatting.
    updated_label = unix_timestamp_to_nice_format(
        int(formatted[MeilisearchIndexFields.UPDATED_AT.value])
    )

    # NOTE(review): title and content are wrapped in NotStr, presumably so the
    # highlight <span> tags render as markup. If the indexed text can contain
    # user-supplied HTML this is an injection risk -- confirm it is escaped
    # before indexing.
    title_html = NotStr(formatted[MeilisearchIndexFields.TITLE.value])
    content_html = NotStr(formatted[MeilisearchIndexFields.CONTENT.value])

    card_body = Div(
        Strong(title_html),
        P(content_html, cls="comment-text"),
        Div(Span(updated_label)),
        A(link, href=link, target="_blank"),
    )
    return Div(card_body, cls="card-item")
def _page_button(label, page):
    """Build a button that loads `page` of the current search via HTMX."""
    return Button(
        label,
        hx_post=f"/search?page={page}",
        hx_target="#search-results",
        # Re-send the form's current values along with the page change.
        hx_include="[name='query'], [name='start_date'], [name='end_date']",
    )


def make_pagination(current_page, total_hits, limit=10):
    """Render the Previous / "Page X of Y" / Next controls.

    Args:
        current_page: 1-based number of the page being shown.
        total_hits: Total number of hits across all pages.
        limit: Hits per page, used to derive the page count.

    Returns:
        A `pagination` Div. "Previous" is omitted on the first page and
        "Next" is omitted on the last page.
    """
    # Ceiling division, floored at one page so an empty result set reads
    # "Page 1 of 1" instead of "Page 1 of 0".
    total_pages = max(1, -(-total_hits // limit))

    children = []
    if current_page > 1:
        children.append(_page_button("Previous", current_page - 1))
    children.append(Span(f"Page {current_page} of {total_pages}"))
    if current_page < total_pages:
        children.append(_page_button("Next", current_page + 1))
    return Div(*children, cls="pagination")
# Fixed-position "Scroll to Top" button anchored to the bottom-right corner.
# It starts hidden (display: none); the JavaScript in scroll_script looks it
# up by the id "scroll-top-btn" and toggles its visibility while scrolling.
scroll_button = Button(
"Scroll to Top",
id="scroll-top-btn",
style="""
position: fixed;
bottom: 20px;
right: 20px;
display: none;
background-color: #007bff;
color: white;
border: none;
border-radius: 5px;
padding: 10px 15px;
cursor: pointer;
""",
)
@rt("/")
def get():
    """Render the landing page with the search form and an empty results area.

    The date pickers default to the seven days ending at the current time.
    """
    today = datetime.now()
    week_ago = today - timedelta(days=7)
    page_body = Div(
        search_form(week_ago, today),
        # Placeholder that the form's HTMX request targets.
        Div(id="search-results"),
        scroll_button,
        scroll_script,
        cls="container",
    )
    return Titled("HF Discussion Search", page_body)
@rt("/search")
def post(query: str, start_date: str, end_date: str, page: int = 1):
    """Handle a search submission and return the rendered results fragment.

    Args:
        query: Search text from the form.
        start_date: Start of the date range, as submitted by the form.
        end_date: End of the date range, as submitted by the form.
        page: Page of results to show; defaults to the first page.
    """
    results_fragment = search_results(query, start_date, end_date, page)
    return results_fragment
@app.post("/webhook")
async def hf_webhook(request):
    """Forward an incoming webhook request to `process_webhook`.

    Args:
        request: The incoming request object, passed through unchanged.

    Returns:
        Whatever `process_webhook` returns.
    """
    outcome = await process_webhook(request)
    return outcome
# `serve` is not defined in this file, so it must come from the
# `fasthtml.common` star import; presumably it starts the web server for
# `app` -- confirm against the fasthtml docs.
serve()