jrno committed
Commit b571090
1 Parent(s): b55b671
recommendation-api/{custom_models.py → learner.py} RENAMED
@@ -1,3 +1,4 @@
+from fastai.collab import load_learner
 from fastai.tabular.all import *
 
 def create_params(size):
@@ -11,10 +12,15 @@ class DotProductBias(Module):
         self.item_factors = create_params([n_items, n_factors])
         self.item_bias = create_params([n_items])
         self.y_range = y_range
-
+
     def forward(self, x):
-        users = self.user_factors[x[:,0]]
-        items = self.item_factors[x[:,1]]
+        users = self.user_factors[x[:, 0]]
+        items = self.item_factors[x[:, 1]]
         res = (users * items).sum(dim=1)
-        res += self.user_bias[x[:,0]] + self.item_bias[x[:,1]]
-        return sigmoid_range(res, *self.y_range)
+        res += self.user_bias[x[:, 0]] + self.item_bias[x[:, 1]]
+        return sigmoid_range(res, *self.y_range)
+
+async def setup_learner(model_filename: str):
+    learn = load_learner(model_filename)
+    learn.dls.device = 'cpu'
+    return learn
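The rename keeps the custom model class and the learner-loading helper in one module. As a quick illustration (not part of the commit), the exported learner could be loaded directly like this; it assumes model.pkl was produced with fastai's learn.export() and sits in the working directory, and that DotProductBias is imported so unpickling can resolve it:

    import asyncio
    from learner import setup_learner, DotProductBias  # DotProductBias must be importable so load_learner can unpickle the model

    learn = asyncio.run(setup_learner('model.pkl'))  # coroutine from learner.py; also pins the DataLoaders to CPU
    print(type(learn))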
recommendation-api/recommender.py CHANGED
@@ -1,21 +1,21 @@
 from fastai.learner import Learner
 import pandas as pd
 
-from tracks import get_unlistened_tracks_for_user, predictions_to_tracks, check_user_exists
+from tracks import get_unlistened_tracks_for_user, predictions_to_tracks
 
 def get_recommendations_for_user(learn: Learner, user_id: str, limit: int = 5):
     not_listened_tracks = get_unlistened_tracks_for_user(user_id)
 
-    # Get predictions for tracks
+    # Get predictions for the tracks the user hasn't listened to yet
     input_dataframe = pd.DataFrame({'user_id': [user_id] * len(not_listened_tracks), 'entry': not_listened_tracks})
     test_dl = learn.dls.test_dl(input_dataframe)
     predictions = learn.get_preds(dl=test_dl)
 
-    # Associate track with prediction and sort by score
+    # Associate each track with its prediction score and sort
     tracks_with_predictions = list(zip(not_listened_tracks, predictions[0].numpy()))
     tracks_with_predictions.sort(key=lambda x: x[1], reverse=True)
 
-    # Convert predictions to full track entries with score
+    # Pick the top n and return them as full tracks
     recommendations = predictions_to_tracks(tracks_with_predictions[:limit])
 
     return {
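With a learner loaded, the recommender can also be exercised outside the API. A minimal sketch (not part of the commit): the user id below is a hypothetical placeholder for an id present in the listening-history CSV, and the exact shape of the returned dict is defined further down in recommender.py, outside this hunk.

    import asyncio
    from learner import setup_learner, DotProductBias  # DotProductBias needed for unpickling
    from recommender import get_recommendations_for_user

    learn = asyncio.run(setup_learner('model.pkl'))
    result = get_recommendations_for_user(learn, user_id='some-user-id', limit=5)  # placeholder user id
    print(result)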
recommendation-api/server.py CHANGED
@@ -1,43 +1,20 @@
-from fastai.collab import load_learner
 from fastapi import FastAPI, Query
-from fastapi.middleware.cors import CORSMiddleware
-from custom_models import DotProductBias
 import asyncio
 import uvicorn
 import os
 
-from tracks import get_top_tracks_for_user, get_users_with_track_interactions, check_user_exists
+from tracks import get_top_tracks_for_user, get_users_with_track_interactions
 from recommender import get_recommendations_for_user
+from learner import setup_learner, DotProductBias  # Note that DotProductBias must be imported into the global namespace
 
-# Get the absolute path of the directory where the python file resides
-dir_path = os.path.dirname(os.path.realpath(__file__))
-
-# FastAPI app
 app = FastAPI()
-
-app.add_middleware(
-    CORSMiddleware,
-    allow_origins=["*"],
-    allow_credentials=True,
-    allow_methods=["*"],
-    allow_headers=["*"],
-)
-
-# Model filename
-model_filename = os.path.join(dir_path, 'model.pkl')
-
-async def setup_learner():
-    learn = load_learner(model_filename)
-    learn.dls.device = 'cpu'
-    return learn
-
+model_filename = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'model.pkl')
 learn = None
+
 @app.on_event("startup")
 async def startup_event():
-    """Setup the learner on server start"""
     global learn
-    loop = asyncio.get_event_loop() # get event loop
-    tasks = [asyncio.ensure_future(setup_learner())] # assign some task
+    tasks = [asyncio.ensure_future(setup_learner(model_filename))]  # schedule learner setup as a background task
     learn = (await asyncio.gather(*tasks))[0]
 
 @app.get("/users")
@@ -55,4 +32,3 @@ async def get_recommendations(user_id: str, limit: int = Query(5)):
 
 if __name__ == "__main__":
     uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 7860)))
-
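Once the server is started (python server.py, port 7860 by default), the endpoints can be smoke-tested from another process. A minimal sketch (not part of the commit) using the requests library; only the /users route is visible in this diff, and the path of the recommendations endpoint is defined outside the shown hunks, so it is omitted here.

    import requests

    resp = requests.get("http://localhost:7860/users")  # served by the /users route shown above
    resp.raise_for_status()
    print(resp.json())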
 
recommendation-api/tracks.py CHANGED
@@ -1,56 +1,44 @@
 import pandas as pd
+import logging
+
+logger = logging.getLogger(__name__)
 
 # Read the CSV files
-print("Reading tracks data from csv ...")
+logger.info("Reading tracks data from csv ...")
 tracks_df = pd.read_csv('data/music_info.csv')
+
+# Remove NaNs from the data and build the concatenated 'entry' format (matching the trained model)
 tracks_df.fillna('', inplace=True)
 tracks_df["entry"] = tracks_df["name"] + ", " + tracks_df["artist"] + ", " + tracks_df["year"].astype(str)
 
-print("Reading user listening history ...")
-track_interactions_df = pd.read_csv('data/user_listening_history_10k.csv', nrows=1000)
+logger.info("Reading user listening history from csv ...")
+track_interactions_df = pd.read_csv('data/user_listening_history_10k.csv')
 
-# Merge the dataframes on 'track_id'
+# Merge the data from the two csvs on 'track_id'
 dataframe = pd.merge(tracks_df, track_interactions_df, on='track_id', how='left')
-
-# Convert all NaN values to empty strings and all columns to string type
-# dataframe.fillna('', inplace=True)
+# Convert all columns to string type
 dataframe = dataframe.astype(str)
-
-# Group by 'user_id' and then create a list of dictionaries for each group
+# Create a history lookup dictionary keyed by 'user_id'
 user_to_track_history_dict = {user_id: group.drop('user_id', axis=1).to_dict('records')
                               for user_id, group in dataframe.groupby('user_id')}
 
-def check_user_exists(user_id: str):
-    if (user_id not in user_to_track_history_dict):
-        raise ValueError(f"User {user_id} not found")
-
 def get_users_with_track_interactions(ascending=False, limit=10):
-    # Count the number of rows for each 'user_id'
     playcount_summary = track_interactions_df.groupby('user_id').size().reset_index(name='track_interactions')
-    # Sort the DataFrame based on 'track_interactions', either ascending or descending
     playcount_summary.sort_values(by='track_interactions', ascending=ascending, inplace=True)
-    # Limit the results if limit is specified
     if limit is not None:
         playcount_summary = playcount_summary.head(limit)
-    # Convert the DataFrame to a list of dictionaries
     return playcount_summary.to_dict(orient='records')
 
-def get_top_tracks_for_user(user_id: str, limit=20):
-    # Retrieve the user's track list from the lookup table or an empty list if not found
+def get_top_tracks_for_user(user_id: str, limit=10):
     track_list = user_to_track_history_dict.get(user_id, [])
-    # Sort the track list by 'playcount' in descending order (assuming 'playcount' is stored as a string)
     sorted_tracks = sorted(track_list, key=lambda x: int(x['playcount']) if 'playcount' in x and x['playcount'].isdigit() else 0, reverse=True)
-    # Apply the limit if specified
     if limit is not None:
         sorted_tracks = sorted_tracks[:limit]
     return sorted_tracks
 
 def get_unlistened_tracks_for_user(user_id:str):
-    # Get all tracks
     all_tracks = tracks_df['entry'].tolist()
-    # Get tracks user has listened to
     listened_tracks = [track['entry'] for track in user_to_track_history_dict.get(user_id, [])]
-    # Get unlistened tracks
     return list(set(all_tracks) - set(listened_tracks))
 
 def predictions_to_tracks(entries_and_predictions):
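Since tracks.py now logs through logging.getLogger(__name__) instead of print(), its startup messages only appear if the application configures logging (the root logger defaults to WARNING). A minimal, illustrative configuration (not part of the commit; level and format are arbitrary choices):

    import logging

    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
    )
    # Import tracks (directly or via recommender/server) only after configuring
    # logging, because its logger.info calls run at import time.
    import tracks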