import streamlit as st import requests from bs4 import BeautifulSoup import concurrent.futures import pandas as pd from io import BytesIO from pyxlsb import open_workbook as open_xlsb if 'df' not in st.session_state: st.session_state['df'] = None if 'is_df' not in st.session_state: st.session_state['is_df'] = False headers = { 'authority': 'cdn.jwplayer.com', 'accept': '*/*', 'accept-language': 'en-US,en;q=0.5', 'origin': 'https://hotcopper.com.au', 'referer': 'https://hotcopper.com.au/', 'sec-ch-ua': '"Chromium";v="122", "Not(A:Brand";v="24", "Brave";v="122"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"Windows"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'cross-site', 'sec-gpc': '1', 'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36', } cookies = { 'xf_show_post_view': '0', 'xf_threads_terms_conditions_pop': '1', 'hc_user_tracker': 'ZAXlB7LEHd0eT6X4QQfAOFkXms373LrE', 'xf_user': '971779%2C5e49c37ac9fa56c1a2798923703d4978b9f8427d', 'xf_session': '16mn22etenrnkf7aimqacdu026', } post_links = [] post_data = { 'username': [], 'number_of_posts_by_user': [], 'number_of_great_analysis_for_user': [], 'post_date': [], 'post_time': [], 'post_id': [], 'number_of_upvotes': [], 'stock_pill': [], 'stock_pill_link': [], 'price_at_posting': [], 'sentiment': [], 'disclosure': [], 'message': [], 'reply_post_id': [], 'reply_post_url': [] } def get_number_of_posts(company_code, max_page_count=999999999): url = f'https://hotcopper.com.au/asx/{company_code}/discussion/page-{max_page_count**3}' response = requests.get(url, headers=headers, cookies=cookies) number_of_posts = 0 if url != response.url: number_of_posts = int(response.url.split('-')[-1]) else: return get_number_of_posts(company_code, max_page_count*3) return number_of_posts def get_all_posts(url): global post_links response = requests.get(url, headers=headers, cookies=cookies) soup = BeautifulSoup(response.text, 'html.parser') posts = [f"https://hotcopper.com.au{post['href']}" for post in soup.find_all('a', class_='subject-a')] post_links.extend(posts) return posts def get_post(url): response = requests.get(url, headers=headers, cookies=cookies) soup = BeautifulSoup(response.text, 'html.parser') username = soup.find('div', class_='user-username').text.strip() number_of_posts_by_user = soup.find('div', class_='user-post-num').text.replace(',','').replace('Posts.', '').strip() try: number_of_great_analysis_for_user = soup.find('div', class_='user-ga-count').text.replace(',','').replace('lightbulb Created with Sketch.','').strip() except: number_of_great_analysis_for_user = 0 post_date = soup.find('div', class_='post-metadata-date').text.replace('Posted:','').strip() post_time = soup.find('div', class_='post-metadata-time').text.replace('Time:', '').strip() post_id = url.split('=')[-1] try: number_of_upvotes = soup.find('div', class_='votes-num has-not-voted').text.strip() except: number_of_upvotes = 0 stock_pill = soup.find('span', class_='stock-pill').text.strip() stock_pill_link = f"https://hotcopper.com.au{soup.find('span', class_='stock-pill').find('a')['href']}" for meta_detail in soup.find_all('span', class_='meta-details'): if 'Price at posting:' in meta_detail.text: price_at_posting = meta_detail.text.replace('Price at posting:', '').strip() if 'Sentiment:' in meta_detail.text: sentiment = meta_detail.text.replace('Sentiment:', '').strip() if 'Disclosure' in meta_detail.text: disclosure = meta_detail.text.replace('Disclosure:', '').strip() message = soup.find('blockquote', class_='message-text ugc baseHtml').get_text(strip=True) if '↑' in message: message = message.split('↑')[1] if soup.find('a', class_='AttributionLink') is not None: reply_post_id = soup.find('a', class_='AttributionLink')['data-hash'] reply_post_url = f"https://hotcopper.com.au/{soup.find('a', class_='AttributionLink')['href']}" else: reply_post_id = None reply_post_url = None post_data['username'].append(username) post_data['number_of_posts_by_user'].append(number_of_posts_by_user) post_data['number_of_great_analysis_for_user'].append(number_of_great_analysis_for_user) post_data['post_date'].append(post_date) post_data['post_time'].append(post_time) post_data['post_id'].append(post_id) post_data['number_of_upvotes'].append(number_of_upvotes) post_data['stock_pill'].append(stock_pill) post_data['stock_pill_link'].append(stock_pill_link) post_data['price_at_posting'].append(price_at_posting) post_data['sentiment'].append(sentiment) post_data['disclosure'].append(disclosure) post_data['message'].append(message) post_data['reply_post_id'].append(reply_post_id) post_data['reply_post_url'].append(reply_post_url) return post_data @st.cache_data def convert_df(df: pd.DataFrame): return df.to_excel('',index=False, engine='openpyxl', sheet_name='Sheet1') def to_excel(df): output = BytesIO() writer = pd.ExcelWriter(output, engine='xlsxwriter') df.to_excel(writer, index=False, sheet_name='Sheet1') writer.save() processed_data = output.getvalue() return processed_data def track_progress(futures, total, message): progress_bar = st.empty() completed = 0 for future in concurrent.futures.as_completed(futures): completed += 1 progress_bar.progress(completed / total, f'Scraped {completed}/{total} {message}...') st.title('Thread Scraper') st.sidebar.title('Settings') company_code = st.sidebar.text_input('Company Code', 'ZIP').lower() scrape = st.sidebar.button('Scrape') if st.session_state['is_df']: with st.spinner('Creating database ..'): df = pd.DataFrame(post_data) csv = to_excel(df) st.download_button( label="Download Data", data=to_excel(st.session_state['df']), file_name=f'{company_code.upper()}.xlsx', mime='application/vnd.ms-excel', ) if scrape: with st.spinner('Getting number of post pages to scrape ..'): number_of_posts = get_number_of_posts(company_code) with st.spinner('Getting all post links ..'): pages = [f'https://hotcopper.com.au/asx/{company_code}/discussion/page-{page_number}' for page_number in range(1, get_number_of_posts(company_code)+1)] with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: futures = [executor.submit(get_all_posts, url) for url in pages] track_progress(futures, len(pages), 'post pages' ) with st.spinner('Getting all post data ..'): with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: futures = [executor.submit(get_post, url) for url in post_links] track_progress(futures, len(post_links), 'posts') st.session_state['df'] = pd.DataFrame(post_data) st.session_state['is_df'] = True