"""Fetch PBS public-API data on rheumatology biologics and publish it.

The module defines :class:`PBSPublicDataAPIClient`, a rate-limited client for
the PBS public data API, and a ``main()`` entry point that builds a flat
drug/brand/indication table and pushes it to the Hugging Face Hub.
"""

import csv
import datetime
import os
import re
import time
from io import StringIO

import requests
from datasets import Dataset
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# Subscription key that main() used to hard-code. It is kept as the fallback
# so the script still runs without configuration; set PBS_SUBSCRIPTION_KEY in
# the environment to override it.
DEFAULT_SUBSCRIPTION_KEY = "2384af7c667342ceb5a736fe29f1dc6b"

# Drug names (matched case-insensitively as substrings of an item's drug_name).
BIOLOGICS = (
    "adalimumab", "etanercept", "infliximab", "certolizumab", "golimumab",
    "rituximab", "abatacept", "tocilizumab", "secukinumab", "ixekizumab",
    "ustekinumab", "guselkumab", "tofacitinib", "baricitinib",
    "upadacitinib", "anifrolumab", "bimekizumab", "avacopan",
)

# Conditions of interest (matched case-insensitively as substrings of an
# indication's "condition" field). Order matters: the first match wins.
RHEUMATIC_DISEASES = (
    "rheumatoid arthritis",
    "psoriatic arthritis",
    "ankylosing spondylitis",
    "non-radiographic axial spondyloarthritis",
    "giant cell arteritis",
    "juvenile idiopathic arthritis",
    "systemic lupus erythematosus",
    "Anti-neutrophil cytoplasmic autoantibody (ANCA) associated vasculitis",
)

# "pen" must be matched as a whole word: a plain substring test also matches
# "suspension", which mislabels e.g. a suspension supplied in a syringe.
_PEN_PATTERN = re.compile(r"\bpens?\b|auto-?injector")

_HOSPITAL_TYPES = {'HS': 'Private', 'HB': 'Public'}


def _classify_formulation(description):
    """Map a free-text form description to a coarse formulation label.

    Returns one of ``'tablet'``, ``'subcut pen'``, ``'subcut syringe'``,
    ``'infusion'`` or ``'unknown'``. The checks run in that order, so the
    first matching category wins. A missing description yields ``'unknown'``.
    """
    desc_lower = (description or '').lower()
    if 'tablet' in desc_lower:
        return 'tablet'
    if _PEN_PATTERN.search(desc_lower):
        return 'subcut pen'
    if 'syringe' in desc_lower:
        return 'subcut syringe'
    if 'i.v. infusion' in desc_lower or 'concentrate for injection' in desc_lower:
        return 'infusion'
    return 'unknown'  # For cases that don't match any category


def _classify_hospital_type(program_code):
    """Translate a program code into 'Private' (HS), 'Public' (HB) or 'Any'."""
    return _HOSPITAL_TYPES.get(program_code, 'Any')


def _match_rheumatic_disease(condition):
    """Return the first entry of RHEUMATIC_DISEASES found in *condition*, else None."""
    condition_lower = (condition or '').lower()
    return next(
        (disease for disease in RHEUMATIC_DISEASES if disease.lower() in condition_lower),
        None,
    )


def _group_values(rows, key_field, value_field):
    """Group ``row[value_field]`` by ``row[key_field]``, preserving row order.

    Rows where either field is missing or empty are skipped.
    """
    grouped = {}
    for row in rows:
        key = row.get(key_field)
        value = row.get(value_field)
        if key and value:
            grouped.setdefault(key, []).append(value)
    return grouped


class PBSPublicDataAPIClient:
    """Rate-limited HTTP client for the PBS public data API."""

    def __init__(self, subscription_key, base_url='https://data-api.health.gov.au/pbs/api/v3',
                 rate_limit=0.2, max_attempts=5, timeout=120):
        """Create a client.

        Args:
            subscription_key: Value sent in the ``subscription-key`` header.
            base_url: Root URL that endpoint names are appended to.
            rate_limit: Maximum requests per second (0.2 means one request
                every 5 seconds). Must be positive.
            max_attempts: How many times ``make_request`` tries a request
                before giving up. Must be at least 1.
            timeout: Timeout in seconds passed to each HTTP request.

        Raises:
            ValueError: If ``rate_limit`` is not positive or ``max_attempts``
                is less than 1.
        """
        if rate_limit <= 0:
            raise ValueError("rate_limit must be a positive number of requests per second")
        if max_attempts < 1:
            raise ValueError("max_attempts must be at least 1")

        self.subscription_key = subscription_key
        self.base_url = base_url
        self.rate_limit = rate_limit  # Requests per second
        self.max_attempts = max_attempts
        self.timeout = timeout
        self.last_request_time = 0

        # Set up a session with retry strategy
        self.session = requests.Session()
        retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

    # ------------------------------------------------------------------
    # Low-level helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_retry_after(value):
        """Return the Retry-After delay in seconds, defaulting to 60.

        The header may be missing or hold something other than an integer
        number of seconds; both cases fall back to 60 instead of raising.
        """
        try:
            return max(0, int(value))
        except (TypeError, ValueError):
            return 60

    def _get_csv(self, endpoint, params):
        """Request *endpoint* as CSV and return its rows as a list of dicts."""
        response = self.make_request(endpoint, params=params, accept="text/csv")
        return list(csv.DictReader(StringIO(response.text)))

    def _get_schedule_csv(self, endpoint, schedule_code, limit):
        """Fetch the CSV rows of *endpoint* for one schedule."""
        return self._get_csv(endpoint, {"schedule_code": schedule_code, "limit": limit})

    def make_request(self, endpoint, params=None, accept="application/json"):
        """GET ``{base_url}/{endpoint}`` while honouring the rate limit.

        The call sleeps as needed so that consecutive requests are at least
        ``1 / rate_limit`` seconds apart. HTTP 429 responses and transient
        failures are retried up to ``max_attempts`` times; other 4xx
        responses are raised immediately because retrying cannot fix them.

        Args:
            endpoint: Path appended to ``base_url``.
            params: Optional query parameters.
            accept: Value of the ``Accept`` header.

        Returns:
            The successful ``requests.Response``.

        Raises:
            requests.exceptions.RequestException: On a non-retryable client
                error, or when every attempt has failed.
        """
        url = f"{self.base_url}/{endpoint}"
        headers = {
            "subscription-key": self.subscription_key,
            "Accept": accept
        }
        min_interval = 1 / self.rate_limit
        last_error = None

        for attempt in range(1, self.max_attempts + 1):
            is_last_attempt = attempt == self.max_attempts

            wait = min_interval - (time.time() - self.last_request_time)
            if wait > 0:
                time.sleep(wait)

            try:
                response = self.session.get(url, headers=headers, params=params,
                                            timeout=self.timeout)
                self.last_request_time = time.time()

                if response.status_code == 429:
                    last_error = requests.exceptions.HTTPError(
                        f"429 Client Error: Too Many Requests for url: {response.url}",
                        response=response,
                    )
                    if not is_last_attempt:
                        retry_after = self._parse_retry_after(response.headers.get('Retry-After'))
                        print(f"Rate limit exceeded. Waiting for {retry_after} seconds.")
                        time.sleep(retry_after)
                    continue

                response.raise_for_status()
                return response
            except requests.exceptions.HTTPError as e:
                status = e.response.status_code if e.response is not None else None
                if status is not None and 400 <= status < 500:
                    # Bad key, unknown endpoint, bad parameters: not transient.
                    raise
                last_error = e
            except requests.exceptions.RequestException as e:
                last_error = e

            if not is_last_attempt:
                print(f"Request failed: {str(last_error)}. Retrying in 5 seconds...")
                time.sleep(5)

        raise last_error

    def get_raw_data(self, endpoint, params=None, accept="application/json"):
        """Return the undecoded response body text for *endpoint*."""
        response = self.make_request(endpoint, params=params, accept=accept)
        return response.text

    # ------------------------------------------------------------------
    # Endpoint wrappers
    # ------------------------------------------------------------------

    def get_sample_data(self, endpoint, limit=5):
        """Return up to *limit* CSV rows from *endpoint* as dicts."""
        return self._get_csv(endpoint, {"limit": limit})

    def fetch_sample_data(self):
        """Fetch a few rows from a fixed set of endpoints.

        Returns:
            A dict mapping endpoint name to its sample rows. Endpoints that
            return no rows are left out.
        """
        endpoints = [
            "amt-items",
            "atc-codes",
            "indications",
            "prescribing-texts",
            "item-prescribing-text-relationships",
            "restrictions",
            "item-restriction-relationships"
        ]

        sample_data = {}
        for endpoint in endpoints:
            print(f"Fetching sample data from /{endpoint}...")
            data = self.get_sample_data(endpoint)
            if data:
                sample_data[endpoint] = data
                print(f"Sample keys for {endpoint}: {data[0].keys()}")
            else:
                print(f"No data found for {endpoint}")
            time.sleep(2)  # Wait 2 seconds between requests to avoid rate limiting

        return sample_data

    def get_schedules(self, limit=100):
        """Return the ``data`` list from the JSON ``schedules`` endpoint."""
        response = self.make_request("schedules", params={"limit": limit})
        return response.json()['data']

    def get_amt_items(self, schedule_code, limit=100000):
        """Return the ``amt-items`` rows for *schedule_code*."""
        return self._get_schedule_csv("amt-items", schedule_code, limit)

    def get_atc_codes(self, schedule_code, limit=100000):
        """Return the ``atc-codes`` rows for *schedule_code*."""
        return self._get_schedule_csv("atc-codes", schedule_code, limit)

    def get_indications(self, schedule_code, limit=100000):
        """Return the ``indications`` rows for *schedule_code*."""
        return self._get_schedule_csv("indications", schedule_code, limit)

    def get_prescribing_texts(self, schedule_code, limit=100000):
        """Return the ``prescribing-texts`` rows for *schedule_code*."""
        return self._get_schedule_csv("prescribing-texts", schedule_code, limit)

    def get_item_prescribing_text_relationships(self, schedule_code, limit=100000):
        """Return the ``item-prescribing-text-relationships`` rows for *schedule_code*."""
        return self._get_schedule_csv("item-prescribing-text-relationships", schedule_code, limit)

    def get_restrictions(self, schedule_code, limit=100000):
        """Return the ``restrictions`` rows for *schedule_code*."""
        return self._get_schedule_csv("restrictions", schedule_code, limit)

    def get_item_restriction_relationships(self, schedule_code, limit=100000):
        """Return the ``item-restriction-relationships`` rows for *schedule_code*."""
        return self._get_schedule_csv("item-restriction-relationships", schedule_code, limit)

    def get_restriction_prescribing_text_relationships(self, schedule_code, limit=100000):
        """Return the ``restriction-prescribing-text-relationships`` rows for *schedule_code*."""
        return self._get_schedule_csv("restriction-prescribing-text-relationships", schedule_code, limit)

    def get_items(self, schedule_code, limit=100000):
        """Return the ``items`` rows for *schedule_code*."""
        return self._get_schedule_csv("items", schedule_code, limit)

    # ------------------------------------------------------------------
    # Rheumatology dataset
    # ------------------------------------------------------------------

    @staticmethod
    def _select_current_schedule(schedules):
        """Pick the schedule effective in the current month.

        Falls back to ``schedules[0]`` when none matches (the original code
        treats that entry as the most recent schedule — presumably the API
        returns newest first; verify against the API).

        Raises:
            ValueError: If *schedules* is empty.
        """
        if not schedules:
            raise ValueError("No schedules returned by the PBS API")

        now = datetime.datetime.now()
        year = str(now.year)
        month = now.strftime('%B').upper()
        for schedule in schedules:
            # Compare as strings so the match works whether the API sends the
            # year as a number or as text.
            if (str(schedule['effective_year']) == year
                    and str(schedule['effective_month']).upper() == month):
                return schedule
        return schedules[0]

    @staticmethod
    def _build_restriction_entry(res_code, restriction, found_indication):
        """Assemble the per-restriction record stored under an item."""
        authority_method = restriction.get('authority_method', '')
        is_streamlined = restriction.get('authority_method') == "STREAMLINED"
        return {
            'res_code': res_code,
            'indications': found_indication,
            'treatment_phase': restriction.get('treatment_phase', ''),
            'restriction_text': restriction.get('li_html_text', ''),
            'authority_method': authority_method,
            'streamlined_code': restriction.get('treatment_of_code') if is_streamlined else None,
            'online_application': "HOBART TAS 7001" not in (restriction.get('schedule_html_text') or ''),
        }

    def fetch_rheumatology_biologics_data(self):
        """Collect restriction data for biologics used in rheumatic diseases.

        Downloads items, indications, prescribing texts, restrictions and the
        relationship tables for the selected schedule, keeps the items whose
        drug name contains one of ``BIOLOGICS``, and attaches every
        restriction whose indication mentions one of ``RHEUMATIC_DISEASES``.

        Returns:
            A dict keyed by PBS code. Each value describes the item
            (schedule, name, brands, form, quantities, hospital type) and
            carries a non-empty ``restrictions`` list; items without a
            matching restriction are dropped.

        Raises:
            ValueError: If the API returns no schedules.
            requests.exceptions.RequestException: If a download fails.
        """
        data = {}

        schedules = self.get_schedules()

        # Select schedule based on current month
        current_schedule = self._select_current_schedule(schedules)
        latest_schedule = current_schedule['schedule_code']
        schedule_year = current_schedule['effective_year']
        schedule_month = current_schedule['effective_month']

        print(f"Selected schedule: {latest_schedule} (Effective: {current_schedule['effective_date']})")

        print("Fetching items...")
        items = self.get_items(latest_schedule)
        time.sleep(5)

        print("Fetching indications...")
        indications = self.get_indications(latest_schedule)
        print(f"Number of indications fetched: {len(indications)}")
        print("Sample of raw indications data:")
        for indication in indications[:5]:
            print(indication)
        time.sleep(5)

        print("Fetching prescribing texts...")
        prescribing_texts = self.get_prescribing_texts(latest_schedule)
        time.sleep(5)

        print("Fetching item-prescribing-text relationships...")
        item_prescribing_text_relationships = self.get_item_prescribing_text_relationships(latest_schedule)
        time.sleep(5)

        print("Fetching restrictions...")
        restrictions = self.get_restrictions(latest_schedule)
        time.sleep(5)

        print("Fetching item-restriction relationships...")
        item_restriction_relationships = self.get_item_restriction_relationships(latest_schedule)

        print("Fetching restriction-prescribing-text relationships...")
        restriction_prescribing_text_relationships = self.get_restriction_prescribing_text_relationships(latest_schedule)
        print(f"Number of restriction-prescribing-text relationships fetched: {len(restriction_prescribing_text_relationships)}")
        time.sleep(5)

        # Create lookup dictionaries
        prescribing_text_lookup = {
            text['prescribing_txt_id']: text
            for text in prescribing_texts if 'prescribing_txt_id' in text
        }
        restriction_lookup = {res['res_code']: res for res in restrictions if 'res_code' in res}

        # Create indication lookup
        indication_lookup = {}
        for ind in indications:
            # Print the available fields until the first entry has been stored
            if not indication_lookup:
                print("Keys in indication data:", ind.keys())

            # Try different possible keys for the prescribing text ID
            prescribing_text_id = (
                ind.get('prescribing_text_id')
                or ind.get('indication_prescribing_txt_id')
                or ind.get('prescribing_txt_id')
            )
            if prescribing_text_id:
                indication_lookup[prescribing_text_id] = ind

        print(f"Number of items in indication_lookup: {len(indication_lookup)}")
        print("Sample of indication_lookup:")
        for key, value in list(indication_lookup.items())[:5]:
            print(f"  {key}: {value}")

        # Create a lookup for item-prescribing-text relationships
        item_prescribing_text_lookup = _group_values(
            item_prescribing_text_relationships, 'pbs_code', 'prescribing_txt_id'
        )

        # Create a lookup for restriction-prescribing-text relationships
        print("\nDebugging restriction-prescribing-text relationships:")
        print("Full structure of first 5 relationships:")
        for relationship in restriction_prescribing_text_relationships[:5]:
            print(relationship)

        restriction_prescribing_text_lookup = _group_values(
            restriction_prescribing_text_relationships, 'res_code', 'prescribing_text_id'
        )

        print(f"Number of items in restriction_prescribing_text_lookup: {len(restriction_prescribing_text_lookup)}")
        print("Sample of restriction_prescribing_text_lookup:")
        for key, value in list(restriction_prescribing_text_lookup.items())[:5]:
            print(f"  {key}: {value}")

        print("Debugging: Inspecting lookups")
        print(f"Number of items in prescribing_text_lookup: {len(prescribing_text_lookup)}")
        print(f"Number of items in restriction_lookup: {len(restriction_lookup)}")
        print(f"Number of items in indication_lookup: {len(indication_lookup)}")
        print(f"Number of items in item_prescribing_text_lookup: {len(item_prescribing_text_lookup)}")
        print(f"Number of items in restriction_prescribing_text_lookup: {len(restriction_prescribing_text_lookup)}")

        biologics_lower = [biologic.lower() for biologic in BIOLOGICS]
        for item in items:
            drug_name_lower = item['drug_name'].lower()
            if not any(biologic in drug_name_lower for biologic in biologics_lower):
                continue

            pbs_code = item['pbs_code']
            if pbs_code not in data:
                data[pbs_code] = {
                    "schedule_code": latest_schedule,
                    "schedule_year": schedule_year,
                    "schedule_month": schedule_month,
                    "name": item['drug_name'],
                    "brands": [],
                    "formulation": _classify_formulation(item['li_form']),
                    "li_form": item['li_form'],
                    "schedule_form": item['schedule_form'],
                    "manner_of_administration": item['manner_of_administration'],
                    "maximum_quantity": item['maximum_quantity_units'],
                    "number_of_repeats": item['number_of_repeats'],
                    "hospital_type": _classify_hospital_type(item['program_code']),
                    "restrictions": []
                }
            # Append the brand name if it's not already in the list
            brands = data[pbs_code]['brands']
            if item['brand_name'] not in brands:
                brands.append(item['brand_name'])

        # Index the relationships once instead of rescanning the full table
        # for every item; row order within each PBS code is preserved.
        res_codes_by_pbs_code = {}
        for relationship in item_restriction_relationships:
            res_codes_by_pbs_code.setdefault(relationship.get('pbs_code'), []).append(
                relationship.get('res_code')
            )

        for pbs_code, entry in data.items():
            for res_code in res_codes_by_pbs_code.get(pbs_code, []):
                restriction = restriction_lookup.get(res_code)
                if not restriction:
                    continue
                for prescribing_text_id in restriction_prescribing_text_lookup.get(res_code, []):
                    indication = indication_lookup.get(prescribing_text_id)
                    if not indication:
                        continue
                    found_indication = _match_rheumatic_disease(indication.get('condition', ''))
                    if found_indication:
                        entry['restrictions'].append(
                            self._build_restriction_entry(res_code, restriction, found_indication)
                        )
                        break  # Stop after finding the first matching indication

        # Drop entries if restrictions are empty
        return {k: v for k, v in data.items() if v['restrictions']}

    def preprocess_data(self, data):
        """Flatten fetched data into one row per item, restriction and brand.

        Args:
            data: Mapping as returned by ``fetch_rheumatology_biologics_data``.

        Returns:
            ``{'combinations': [row, ...]}`` where each row is a flat dict.
        """
        combinations = []
        for pbs_code, item in data.items():
            for restriction in item['restrictions']:
                for brand in item['brands']:
                    combinations.append({
                        'pbs_code': pbs_code,
                        'drug': item['name'],
                        'brand': brand,
                        # NOTE(review): this emits the raw li_form text, not the
                        # classified item['formulation'] label — confirm intended.
                        'formulation': item['li_form'],
                        'indication': restriction['indications'],
                        'treatment_phase': restriction['treatment_phase'],
                        'streamlined_code': restriction['streamlined_code'],
                        'online_application': restriction['online_application'],
                        'authority_method': restriction['authority_method'],
                        'hospital_type': item['hospital_type'],
                        'schedule_code': item['schedule_code'],
                        'schedule_year': item['schedule_year'],
                        'schedule_month': item['schedule_month']
                    })

        return {'combinations': combinations}

    def save_data_to_hf(self, data, hf_token, dataset_name="cmcmaster/rheumatology-biologics-dataset"):
        """Flatten *data* and push it to the Hugging Face Hub.

        Args:
            data: Mapping as returned by ``fetch_rheumatology_biologics_data``.
            hf_token: Token passed to ``Dataset.push_to_hub``.
            dataset_name: Target repository on the Hub.
        """
        processed_data = self.preprocess_data(data)

        # Create a Dataset from the combinations
        dataset = Dataset.from_list(processed_data['combinations'])

        # Push the dataset to the Hugging Face Hub
        dataset.push_to_hub(dataset_name, token=hf_token)

        print(f"Data saved to Hugging Face Hub: {dataset_name}")


def main():
    """Fetch the rheumatology biologics data and publish it to the Hub.

    Reads the PBS subscription key from ``PBS_SUBSCRIPTION_KEY`` (falling back
    to ``DEFAULT_SUBSCRIPTION_KEY``) and the Hugging Face token from
    ``HF_TOKEN``.
    """
    subscription_key = os.environ.get("PBS_SUBSCRIPTION_KEY", DEFAULT_SUBSCRIPTION_KEY)
    # None when HF_TOKEN is unset; push_to_hub is then expected to use locally
    # stored credentials (see the datasets documentation).
    hf_token = os.environ.get("HF_TOKEN")

    client = PBSPublicDataAPIClient(subscription_key, rate_limit=0.2)

    try:
        print("Fetching data on biologics used for rheumatological diseases...")
        data = client.fetch_rheumatology_biologics_data()
        print(f"Data fetched for {len(data)} items.")

        client.save_data_to_hf(data, hf_token)
        print("Data saved to Hugging Face Hub")
    except Exception as e:
        print(f"An error occurred: {str(e)}")


if __name__ == "__main__":
    main()