import pandas as pd # Read the CSV files print("Reading music info csv ...") tracks_df = pd.read_csv('data/music_info.csv') print("Reading user listening history ...") track_interactions_df = pd.read_csv('data/user_listening_history_10k.csv', nrows=1000) # Merge the dataframes on 'track_id' dataframe = pd.merge(tracks_df, track_interactions_df, on='track_id', how='left') # Convert all NaN values to empty strings and all columns to string type dataframe.fillna('', inplace=True) dataframe = dataframe.astype(str) # Group by 'user_id' and then create a list of dictionaries for each group lookup_table = {user_id: group.drop('user_id', axis=1).to_dict('records') for user_id, group in dataframe.groupby('user_id')} def get_users_with_track_interactions(ascending=False, limit=10): # Count the number of rows for each 'user_id' playcount_summary = track_interactions_df.groupby('user_id').size().reset_index(name='track_interactions') # Sort the DataFrame based on 'track_interactions', either ascending or descending playcount_summary.sort_values(by='track_interactions', ascending=ascending, inplace=True) # Limit the results if limit is specified if limit is not None: playcount_summary = playcount_summary.head(limit) # Convert the DataFrame to a list of dictionaries return playcount_summary.to_dict(orient='records') def get_top_tracks_for_user(user_id: str, limit=20): # Retrieve the user's track list from the lookup table or an empty list if not found track_list = lookup_table.get(user_id, []) # Sort the track list by 'playcount' in descending order (assuming 'playcount' is stored as a string) sorted_tracks = sorted(track_list, key=lambda x: int(x['playcount']) if 'playcount' in x and x['playcount'].isdigit() else 0, reverse=True) # Apply the limit if specified if limit is not None: sorted_tracks = sorted_tracks[:limit] return sorted_tracks