Spaces:
Running
Running
import csv
import datetime
import email.utils
import json
import os
import re
import sqlite3
import time
from io import StringIO

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
class PBSPublicDataAPIClient:
    """Client for the PBS public data API (``/pbs/api/v3`` by default).

    Besides thin per-endpoint getters, it can assemble a rheumatology
    biologics dataset (``fetch_rheumatology_biologics_data``) and persist it
    to SQLite (``save_data_to_sqlite``).
    """

    # Upper-case English month names indexed by ``month - 1``. Used instead of
    # ``strftime('%B')``, whose output depends on the process locale.
    # NOTE(review): assumes the API's ``effective_month`` is an English month
    # name (matched case-insensitively) — confirm against /schedules output.
    _MONTH_NAMES = (
        "JANUARY", "FEBRUARY", "MARCH", "APRIL", "MAY", "JUNE",
        "JULY", "AUGUST", "SEPTEMBER", "OCTOBER", "NOVEMBER", "DECEMBER",
    )

    # (label, pattern) pairs tried in order by _classify_formulation; the
    # first pattern that matches decides the label.
    _FORMULATION_PATTERNS = (
        ("tablet", re.compile(r"tablet", re.IGNORECASE)),
        # Word boundaries stop "pen" matching inside words like "suspension".
        ("subcut pen", re.compile(r"\bpens?\b|auto[- ]?injector", re.IGNORECASE)),
        ("subcut syringe", re.compile(r"syringe", re.IGNORECASE)),
        ("infusion", re.compile(r"i\.v\. infusion|concentrate for injection", re.IGNORECASE)),
    )

    def __init__(self, subscription_key, base_url='https://data-api.health.gov.au/pbs/api/v3',
                 rate_limit=0.2, timeout=(10, 300), max_attempts=5, retry_delay=5):
        """Create a client.

        Args:
            subscription_key: Sent as the ``subscription-key`` request header.
            base_url: API root; endpoint names are appended after a ``/``.
            rate_limit: Maximum requests per second (0.2 = one request every
                5 s). ``0``, ``None`` or a negative value disables client-side
                throttling.
            timeout: ``requests`` timeout as ``(connect, read)`` seconds; the
                read timeout is generous because CSV exports can be large.
            max_attempts: Total tries per request in ``make_request`` (min 1).
            retry_delay: Seconds to wait before retrying a failed request
                (429 responses wait for their ``Retry-After`` value instead).
        """
        self.subscription_key = subscription_key
        self.base_url = base_url
        self.rate_limit = rate_limit  # Requests per second
        self.timeout = timeout
        self.max_attempts = max_attempts
        self.retry_delay = retry_delay
        self.last_request_time = 0  # time.time() of the most recent request attempt
        # Transport-level retries with backoff for transient statuses;
        # make_request adds its own bounded retry loop on top of this.
        self.session = requests.Session()
        retries = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        self.session.mount('https://', HTTPAdapter(max_retries=retries))

    # ------------------------------------------------------------------
    # HTTP plumbing
    # ------------------------------------------------------------------

    def _wait_for_rate_limit(self):
        """Sleep so consecutive requests start at least ``1 / rate_limit`` s apart.

        A missing or non-positive ``rate_limit`` disables throttling.
        """
        if not self.rate_limit or self.rate_limit <= 0:
            return
        min_interval = 1 / self.rate_limit
        elapsed = time.time() - self.last_request_time
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)

    @staticmethod
    def _parse_retry_after(value, default=60):
        """Convert a ``Retry-After`` header value to a non-negative wait in seconds.

        Accepts delta-seconds (``"120"``) or an HTTP-date. A missing or
        unparseable value yields *default*.
        """
        if value is None:
            return default
        try:
            return max(0, int(value))
        except (TypeError, ValueError):
            pass
        try:
            when = email.utils.parsedate_to_datetime(value)
        except (TypeError, ValueError, IndexError, OverflowError):
            return default
        if when.tzinfo is None:
            # HTTP-dates are GMT; a "-0000" zone parses as naive.
            when = when.replace(tzinfo=datetime.timezone.utc)
        remaining = (when - datetime.datetime.now(datetime.timezone.utc)).total_seconds()
        return max(0, int(remaining))

    def make_request(self, endpoint, params=None, accept="application/json"):
        """GET ``{base_url}/{endpoint}`` with throttling and bounded retries.

        Requests are spaced by ``_wait_for_rate_limit``. Transport errors,
        408/429 responses and 5xx responses are retried, up to
        ``max_attempts`` tries in total. Any other 4xx response (e.g. a bad
        subscription key, unknown endpoint or invalid parameter) cannot
        succeed on retry and is raised immediately.

        Args:
            endpoint: Path below ``base_url``, e.g. ``"schedules"``.
            params: Optional query-string parameters.
            accept: Value for the ``Accept`` header.

        Returns:
            The successful ``requests.Response``.

        Raises:
            requests.exceptions.RequestException: the non-retryable HTTP error,
                or the last error once every attempt has failed.
        """
        url = f"{self.base_url.rstrip('/')}/{endpoint.lstrip('/')}"
        headers = {
            "subscription-key": self.subscription_key,
            "Accept": accept,
        }
        # getattr keeps subclasses that bypass this class's __init__ working.
        timeout = getattr(self, 'timeout', (10, 300))
        retry_delay = getattr(self, 'retry_delay', 5)
        max_attempts = max(1, int(getattr(self, 'max_attempts', 5)))

        last_error = None
        for attempt in range(1, max_attempts + 1):
            self._wait_for_rate_limit()
            delay = retry_delay
            rate_limited = False
            try:
                response = self.session.get(url, headers=headers, params=params, timeout=timeout)
            except requests.exceptions.RequestException as e:
                self.last_request_time = time.time()
                last_error = e
            else:
                self.last_request_time = time.time()
                status = response.status_code
                if status == 429:
                    rate_limited = True
                    delay = self._parse_retry_after(response.headers.get('Retry-After'))
                    last_error = requests.exceptions.HTTPError(
                        f"429 Too Many Requests for url: {response.url}", response=response)
                else:
                    try:
                        response.raise_for_status()
                    except requests.exceptions.HTTPError as e:
                        if status < 500 and status != 408:
                            raise  # client error: retrying cannot help
                        last_error = e
                    else:
                        return response

            if attempt == max_attempts:
                break
            if rate_limited:
                print(f"Rate limit exceeded. Waiting for {delay} seconds.")
            else:
                print(f"Request failed: {str(last_error)}. Retrying in {delay} seconds...")
            time.sleep(delay)

        raise last_error

    def _get_csv(self, endpoint, params):
        """GET *endpoint* as CSV and return its rows as a list of dicts keyed by header."""
        response = self.make_request(endpoint, params=params, accept="text/csv")
        return list(csv.DictReader(StringIO(response.text)))

    def _get_schedule_csv(self, endpoint, schedule_code, limit):
        """Return up to *limit* CSV rows of *endpoint* for one schedule.

        NOTE: no pagination is performed; rows beyond *limit* are not fetched.
        """
        return self._get_csv(endpoint, {"schedule_code": schedule_code, "limit": limit})

    # ------------------------------------------------------------------
    # Endpoint getters
    # ------------------------------------------------------------------

    def get_sample_data(self, endpoint, limit=5, schedule_code=None):
        """Return the first *limit* CSV rows of *endpoint* as dicts.

        When *schedule_code* is given, the request is restricted to it.
        """
        params = {"limit": limit}
        if schedule_code is not None:
            params["schedule_code"] = schedule_code
        return self._get_csv(endpoint, params)

    def fetch_sample_data(self):
        """Fetch a few rows from each main endpoint and print their column names.

        Samples come from the first schedule returned by ``get_schedules``
        (no schedule filter is sent if that list is empty).

        Returns:
            dict mapping endpoint name -> list of row dicts. Only endpoints
            that returned at least one row are included.
        """
        schedules = self.get_schedules()
        latest_schedule = schedules[0]['schedule_code'] if schedules else None
        endpoints = [
            "amt-items",
            "atc-codes",
            "indications",
            "prescribing-texts",
            "item-prescribing-text-relationships",
            "restrictions",
            "item-restriction-relationships",
        ]
        sample_data = {}
        for endpoint in endpoints:
            print(f"Fetching sample data from /{endpoint}...")
            data = self.get_sample_data(endpoint, schedule_code=latest_schedule)
            if data:
                sample_data[endpoint] = data
                print(f"Sample keys for {endpoint}: {data[0].keys()}")
            else:
                print(f"No data found for {endpoint}")
            time.sleep(2)  # Wait 2 seconds between requests to avoid rate limiting
        return sample_data

    def get_raw_data(self, endpoint, params=None, accept="application/json"):
        """Return the undecoded response body of *endpoint* as text."""
        response = self.make_request(endpoint, params=params, accept=accept)
        return response.text

    def get_schedules(self, limit=100):
        """Return the ``data`` list of the ``/schedules`` JSON response.

        NOTE(review): callers treat element 0 as the most recent schedule,
        which assumes the API orders newest first — confirm.
        """
        response = self.make_request("schedules", params={"limit": limit})
        return response.json()['data']

    def get_amt_items(self, schedule_code, limit=100000):
        """Return ``/amt-items`` rows for *schedule_code*."""
        return self._get_schedule_csv("amt-items", schedule_code, limit)

    def get_atc_codes(self, schedule_code, limit=100000):
        """Return ``/atc-codes`` rows for *schedule_code*."""
        return self._get_schedule_csv("atc-codes", schedule_code, limit)

    def get_indications(self, schedule_code, limit=100000):
        """Return ``/indications`` rows for *schedule_code*."""
        return self._get_schedule_csv("indications", schedule_code, limit)

    def get_prescribing_texts(self, schedule_code, limit=100000):
        """Return ``/prescribing-texts`` rows for *schedule_code*."""
        return self._get_schedule_csv("prescribing-texts", schedule_code, limit)

    def get_item_prescribing_text_relationships(self, schedule_code, limit=100000):
        """Return ``/item-prescribing-text-relationships`` rows for *schedule_code*."""
        return self._get_schedule_csv("item-prescribing-text-relationships", schedule_code, limit)

    def get_restrictions(self, schedule_code, limit=100000):
        """Return ``/restrictions`` rows for *schedule_code*."""
        return self._get_schedule_csv("restrictions", schedule_code, limit)

    def get_item_restriction_relationships(self, schedule_code, limit=100000):
        """Return ``/item-restriction-relationships`` rows for *schedule_code*."""
        return self._get_schedule_csv("item-restriction-relationships", schedule_code, limit)

    def get_restriction_prescribing_text_relationships(self, schedule_code, limit=100000):
        """Return ``/restriction-prescribing-text-relationships`` rows for *schedule_code*."""
        return self._get_schedule_csv("restriction-prescribing-text-relationships", schedule_code, limit)

    def get_items(self, schedule_code, limit=100000):
        """Return ``/items`` rows for *schedule_code*."""
        return self._get_schedule_csv("items", schedule_code, limit)

    # ------------------------------------------------------------------
    # Rheumatology biologics extraction
    # ------------------------------------------------------------------

    @classmethod
    def _select_schedule(cls, schedules, today=None):
        """Pick the schedule effective in *today*'s year and month.

        Falls back to ``schedules[0]`` when none matches. Year values are
        compared as strings, so int and str forms both match.

        Raises:
            IndexError: *schedules* is empty.
        """
        if not schedules:
            raise IndexError("The PBS API returned no schedules")
        if today is None:
            today = datetime.datetime.now()
        year = str(today.year)
        month = cls._MONTH_NAMES[today.month - 1]
        for schedule in schedules:
            if (str(schedule.get('effective_year')) == year
                    and str(schedule.get('effective_month', '')).upper() == month):
                return schedule
        return schedules[0]  # fallback to the most recent schedule if no match

    @classmethod
    def _classify_formulation(cls, description):
        """Map an ``li_form`` description to 'tablet', 'subcut pen',
        'subcut syringe', 'infusion' or 'unknown' (also for empty/None)."""
        if not description:
            return 'unknown'
        for label, pattern in cls._FORMULATION_PATTERNS:
            if pattern.search(description):
                return label
        return 'unknown'

    @staticmethod
    def _first_value(row, *fields):
        """Return the first truthy ``row[field]`` among *fields*, else None."""
        for field in fields:
            value = row.get(field)
            if value:
                return value
        return None

    @classmethod
    def _group_values(cls, rows, key_field, value_fields):
        """Group rows into ``{row[key_field]: [value, ...]}``, keeping row order.

        Each row's value is its first truthy field among *value_fields*.
        Rows whose key or value is missing or empty are skipped.
        """
        grouped = {}
        for row in rows:
            key = row.get(key_field)
            value = cls._first_value(row, *value_fields)
            if key and value:
                grouped.setdefault(key, []).append(value)
        return grouped

    @staticmethod
    def _build_restriction_record(res_code, restriction, indication):
        """Build the per-item restriction dict stored under ``restrictions``."""
        authority_method = restriction.get('authority_method', '')
        return {
            'res_code': res_code,
            'indications': indication,  # a single matched disease name
            'treatment_phase': restriction.get('treatment_phase', ''),
            'restriction_text': restriction.get('li_html_text', ''),
            'authority_method': authority_method,
            'streamlined_code': restriction.get('treatment_of_code') if authority_method == "STREAMLINED" else None,
            # The postal address only appears when the application is paper-based.
            'online_application': "HOBART TAS 7001" not in (restriction.get('schedule_html_text') or ''),
        }

    def fetch_rheumatology_biologics_data(self):
        """Collect target-drug PBS items and their rheumatology restrictions.

        Steps:
          1. Select the schedule effective this month (see ``_select_schedule``).
          2. Download items, indications, prescribing texts, restrictions and
             the relationship tables for that schedule, printing diagnostics.
          3. Keep items whose ``drug_name`` contains one of the target drug names.
          4. For each kept item, attach each linked restriction that has a
             prescribing text whose indication ``condition`` mentions one of
             the target rheumatic diseases (first match per restriction).

        Returns:
            dict mapping ``pbs_code`` -> item record (name, brands,
            formulation, form fields, quantities, non-empty ``restrictions``).
            Items without any matching restriction are omitted.
        """
        # Lower-case already; matched as substrings of the lower-cased drug name.
        biologics = (
            "adalimumab", "etanercept", "infliximab", "certolizumab", "golimumab",
            "rituximab", "abatacept", "tocilizumab", "secukinumab", "ixekizumab",
            "ustekinumab", "guselkumab", "tofacitinib", "baricitinib", "upadacitinib",
        )
        # Checked in this order against the lower-cased indication condition.
        rheumatic_diseases = (
            "rheumatoid arthritis", "psoriatic arthritis", "ankylosing spondylitis",
            "non-radiographic axial spondyloarthritis", "giant cell arteritis",
            "juvenile idiopathic arthritis",
        )

        schedules = self.get_schedules()
        current_schedule = self._select_schedule(schedules)
        latest_schedule = current_schedule['schedule_code']
        print(f"Selected schedule: {latest_schedule} (Effective: {current_schedule['effective_date']})")

        print("Fetching items...")
        items = self.get_items(latest_schedule)
        time.sleep(5)

        print("Fetching indications...")
        indications = self.get_indications(latest_schedule)
        print(f"Number of indications fetched: {len(indications)}")
        print("Sample of raw indications data:")
        for indication in indications[:5]:
            print(indication)
        time.sleep(5)

        print("Fetching prescribing texts...")
        prescribing_texts = self.get_prescribing_texts(latest_schedule)
        time.sleep(5)

        print("Fetching item-prescribing-text relationships...")
        item_prescribing_text_relationships = self.get_item_prescribing_text_relationships(latest_schedule)
        time.sleep(5)

        print("Fetching restrictions...")
        restrictions = self.get_restrictions(latest_schedule)
        time.sleep(5)

        print("Fetching item-restriction relationships...")
        item_restriction_relationships = self.get_item_restriction_relationships(latest_schedule)
        print("Fetching restriction-prescribing-text relationships...")
        restriction_prescribing_text_relationships = self.get_restriction_prescribing_text_relationships(latest_schedule)
        print(f"Number of restriction-prescribing-text relationships fetched: {len(restriction_prescribing_text_relationships)}")

        # Lookup tables keyed by id.
        prescribing_text_lookup = {
            text['prescribing_txt_id']: text for text in prescribing_texts if 'prescribing_txt_id' in text
        }
        restriction_lookup = {res['res_code']: res for res in restrictions if 'res_code' in res}

        # The prescribing-text id column name is not certain, so each lookup
        # tries its expected spelling first and then the alternatives.
        if indications:
            print("Keys in indication data:", indications[0].keys())
        indication_lookup = {}
        for ind in indications:
            prescribing_text_id = self._first_value(
                ind, 'prescribing_text_id', 'indication_prescribing_txt_id', 'prescribing_txt_id')
            if prescribing_text_id:
                indication_lookup[prescribing_text_id] = ind
        print(f"Number of items in indication_lookup: {len(indication_lookup)}")
        print("Sample of indication_lookup:")
        for key, value in list(indication_lookup.items())[:5]:
            print(f" {key}: {value}")

        item_prescribing_text_lookup = self._group_values(
            item_prescribing_text_relationships, 'pbs_code', ('prescribing_txt_id', 'prescribing_text_id'))

        print("\nDebugging restriction-prescribing-text relationships:")
        print("Full structure of first 5 relationships:")
        for relationship in restriction_prescribing_text_relationships[:5]:
            print(relationship)
        restriction_prescribing_text_lookup = self._group_values(
            restriction_prescribing_text_relationships, 'res_code', ('prescribing_text_id', 'prescribing_txt_id'))
        print(f"Number of items in restriction_prescribing_text_lookup: {len(restriction_prescribing_text_lookup)}")
        print("Sample of restriction_prescribing_text_lookup:")
        for key, value in list(restriction_prescribing_text_lookup.items())[:5]:
            print(f" {key}: {value}")

        print("Debugging: Inspecting lookups")
        print(f"Number of items in prescribing_text_lookup: {len(prescribing_text_lookup)}")
        print(f"Number of items in restriction_lookup: {len(restriction_lookup)}")
        print(f"Number of items in indication_lookup: {len(indication_lookup)}")
        print(f"Number of items in item_prescribing_text_lookup: {len(item_prescribing_text_lookup)}")
        print(f"Number of items in restriction_prescribing_text_lookup: {len(restriction_prescribing_text_lookup)}")

        # Index restriction codes by item once, instead of rescanning every
        # relationship row for every item.
        restriction_codes_by_item = self._group_values(item_restriction_relationships, 'pbs_code', ('res_code',))

        data = {}
        for item in items:
            drug_name = item['drug_name']
            lowered_name = drug_name.lower()
            if not any(name in lowered_name for name in biologics):
                continue
            pbs_code = item['pbs_code']
            if pbs_code not in data:
                data[pbs_code] = {
                    "name": drug_name,
                    "brands": [],  # one entry per distinct brand_name
                    "formulation": self._classify_formulation(item['li_form']),
                    "li_form": item['li_form'],
                    "schedule_form": item['schedule_form'],
                    "manner_of_administration": item['manner_of_administration'],
                    "maximum_quantity": item['maximum_quantity_units'],
                    "number_of_repeats": item['number_of_repeats'],
                    "restrictions": [],
                }
            brands = data[pbs_code]['brands']
            if item['brand_name'] not in brands:
                brands.append(item['brand_name'])

        for pbs_code, record in data.items():
            for res_code in restriction_codes_by_item.get(pbs_code, []):
                restriction = restriction_lookup.get(res_code)
                if not restriction:
                    continue
                for prescribing_text_id in restriction_prescribing_text_lookup.get(res_code, []):
                    indication = indication_lookup.get(prescribing_text_id)
                    if not indication:
                        continue
                    condition = (indication.get('condition') or '').lower()
                    found_indication = next((d for d in rheumatic_diseases if d in condition), None)
                    if found_indication:
                        record['restrictions'].append(
                            self._build_restriction_record(res_code, restriction, found_indication))
                        break  # Stop after finding the first matching indication

        # Drop items that ended up with no rheumatology restriction.
        return {k: v for k, v in data.items() if v['restrictions']}

    # ------------------------------------------------------------------
    # Persistence
    # ------------------------------------------------------------------

    def preprocess_data(self, data):
        """Flatten ``fetch_rheumatology_biologics_data`` output for the database.

        Returns:
            dict with sorted, de-duplicated lists under ``drugs``, ``brands``,
            ``formulations`` (the raw ``li_form`` text), ``indications`` and
            ``treatment_phases``, plus ``combinations``: one dict per
            (item, restriction, brand), in input order.
        """
        processed = {
            'drugs': set(),
            'brands': set(),
            'formulations': set(),
            'indications': set(),
            'treatment_phases': set(),
            'combinations': [],
        }
        for pbs_code, item in data.items():
            processed['drugs'].add(item['name'])
            processed['brands'].update(item['brands'])
            processed['formulations'].add(item['li_form'])
            for restriction in item['restrictions']:
                processed['indications'].add(restriction['indications'])
                processed['treatment_phases'].add(restriction['treatment_phase'])
                for brand in item['brands']:
                    processed['combinations'].append({
                        'pbs_code': pbs_code,
                        'drug': item['name'],
                        'brand': brand,
                        'formulation': item['li_form'],
                        'indication': restriction['indications'],
                        'treatment_phase': restriction['treatment_phase'],
                        'streamlined_code': restriction['streamlined_code'],
                        'online_application': restriction['online_application'],
                        'authority_method': restriction['authority_method'],
                    })
        return {k: sorted(v) if isinstance(v, set) else v for k, v in processed.items()}

    @staticmethod
    def _write_database(cursor, processed_data):
        """Create the schema through *cursor* and insert *processed_data*."""
        lookup_tables = ('drugs', 'brands', 'formulations', 'indications', 'treatment_phases')
        # Table names come from the fixed tuple above (never from input), so the
        # f-strings cannot inject SQL; all values are bound parameters.
        for table in lookup_tables:
            cursor.execute(f'''CREATE TABLE IF NOT EXISTS {table}
                (id INTEGER PRIMARY KEY, name TEXT UNIQUE)''')
        cursor.execute('''CREATE TABLE IF NOT EXISTS combinations
            (id INTEGER PRIMARY KEY, pbs_code TEXT, drug_id INTEGER, brand_id INTEGER,
             formulation_id INTEGER, indication_id INTEGER, treatment_phase_id INTEGER,
             streamlined_code TEXT, online_application BOOLEAN, authority_method TEXT,
             FOREIGN KEY (drug_id) REFERENCES drugs(id),
             FOREIGN KEY (brand_id) REFERENCES brands(id),
             FOREIGN KEY (formulation_id) REFERENCES formulations(id),
             FOREIGN KEY (indication_id) REFERENCES indications(id),
             FOREIGN KEY (treatment_phase_id) REFERENCES treatment_phases(id))''')

        for table in lookup_tables:
            cursor.executemany(f"INSERT OR IGNORE INTO {table} (name) VALUES (?)",
                               [(name,) for name in processed_data[table]])

        cursor.executemany('''INSERT INTO combinations
            (pbs_code, drug_id, brand_id, formulation_id, indication_id,
             treatment_phase_id, streamlined_code, online_application, authority_method)
            VALUES (?,
                (SELECT id FROM drugs WHERE name = ?),
                (SELECT id FROM brands WHERE name = ?),
                (SELECT id FROM formulations WHERE name = ?),
                (SELECT id FROM indications WHERE name = ?),
                (SELECT id FROM treatment_phases WHERE name = ?),
                ?, ?, ?)''',
            [(combo['pbs_code'], combo['drug'], combo['brand'], combo['formulation'],
              combo['indication'], combo['treatment_phase'], combo['streamlined_code'],
              combo['online_application'], combo['authority_method'])
             for combo in processed_data['combinations']])

        cursor.execute('''CREATE TABLE IF NOT EXISTS metadata
            (key TEXT PRIMARY KEY, value TEXT)''')
        cursor.execute('''INSERT OR REPLACE INTO metadata (key, value)
            VALUES ('last_updated', ?)''', (datetime.datetime.now().isoformat(),))

    def save_data_to_sqlite(self, data, db_path="rheumatology_biologics_data.db"):
        """Write *data* (from ``fetch_rheumatology_biologics_data``) to a fresh
        SQLite database at *db_path*, replacing any existing file.

        The database is built in ``<db_path>.tmp`` and moved over *db_path*
        only after a successful commit. A failure part-way through therefore
        leaves the previous database intact instead of deleted.

        Schema: lookup tables ``drugs``, ``brands``, ``formulations``,
        ``indications`` and ``treatment_phases`` (id, unique name); a
        ``combinations`` table referencing them; and a ``metadata`` key/value
        table holding ``last_updated`` (local-time ISO-8601 timestamp).
        """
        processed_data = self.preprocess_data(data)
        tmp_path = f"{db_path}.tmp"
        if os.path.exists(tmp_path):
            os.remove(tmp_path)  # possibly left behind by an interrupted run
        try:
            conn = sqlite3.connect(tmp_path)
            try:
                with conn:  # commits on success, rolls back on error
                    self._write_database(conn.cursor(), processed_data)
            finally:
                conn.close()
            # Atomic within one filesystem: readers see the old or the new DB,
            # never a half-written one.
            os.replace(tmp_path, db_path)
        except BaseException:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
            raise
def main():
    """Fetch rheumatology biologics data from the PBS API and save it to SQLite.

    The API subscription key is read from the ``PBS_SUBSCRIPTION_KEY``
    environment variable when it is set and non-empty; otherwise the built-in
    default key is used. Any failure is printed and the process exits with
    status 1, so schedulers and CI can detect it.
    """
    subscription_key = os.environ.get("PBS_SUBSCRIPTION_KEY") or "2384af7c667342ceb5a736fe29f1dc6b"
    client = PBSPublicDataAPIClient(subscription_key, rate_limit=0.2)
    try:
        print("Fetching data on biologics used for rheumatological diseases...")
        data = client.fetch_rheumatology_biologics_data()
        print(f"Data fetched for {len(data)} items.")
        client.save_data_to_sqlite(data)
        print("Data saved to rheumatology_biologics_data.db")
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        raise SystemExit(1) from e


if __name__ == "__main__":
    main()