"""Streamlit front-end for the Zamark secure watermarking demo.

The page walks through trigger-set generation, loading of the (pre-trained)
model and its watermarked counterpart, and registration of the watermarked
model and trigger-set hashes in a JSON-backed blockchain.
"""

import hashlib
import json
import os
import time
import uuid

import numpy as np
import streamlit as st
from concrete.ml.deployment import FHEModelClient
from concrete.ml.sklearn import SGDClassifier

import watermarking
from blockchain import Blockchain, print_blockchain_details
from utils import (
    CLIENT_DIR,
    CURRENT_DIR,
    DEPLOYMENT_DIR,
    KEYS_DIR,
    INPUT_BROWSER_LIMIT,
    clean_directory,
    SERVER_DIR,
)
from watermarking import watermark_model

# Range passed to every SGDClassifier built by this page.
PARAMETERS_RANGE = (-1.0, 1.0)


def generate_mock_hash():
    """Return the SHA-256 hex digest of the current timestamp string."""
    return hashlib.sha256(str(time.time()).encode()).hexdigest()


def _load_classifier(coef_path, intercept_path):
    """Build an SGDClassifier and attach parameters stored in ``.npy`` files.

    Args:
        coef_path (str): Path of the ``.npy`` file assigned to ``coef_``.
        intercept_path (str): Path of the ``.npy`` file assigned to
            ``intercept_``.

    Returns:
        SGDClassifier: The classifier with ``coef_`` and ``intercept_`` set.
    """
    classifier = SGDClassifier(
        random_state=42,
        max_iter=100,
        fit_encrypted=True,
        parameters_range=PARAMETERS_RANGE,
        penalty=None,
        learning_rate="constant",
        verbose=1,
    )
    classifier.coef_ = np.load(coef_path)
    classifier.intercept_ = np.load(intercept_path)
    return classifier


st.set_page_config(layout="wide")

st.sidebar.title("Contact")
st.sidebar.info(
    """
    - Reda Bellafqira
    - Mehdi Ben Ghali
    - Pierre-Elisée Flory
    - Mohammed Lansari
    - Thomas Winninger
    """
)

st.title("Zamark: Secure Watermarking Service")

st.image(
    "watermarking.png",
)


def todo():
    """Display a "not implemented" warning in the page."""
    st.warning("Not implemented yet", icon="⚠️")


def key_gen_fn(client_id):
    """
    Generate keys for a given user. The keys are saved in KEYS_DIR

    !!! needs a model in DEPLOYMENT_DIR as "client.zip" !!!

    Args:
        client_id (str): The client_id, retrieved from streamlit
    """
    clean_directory()

    client = FHEModelClient(path_dir=DEPLOYMENT_DIR, key_dir=KEYS_DIR / f"{client_id}")
    client.load()

    # Creates the private and evaluation keys on the client side
    client.generate_private_and_evaluation_keys()

    # Get the serialized evaluation keys
    serialized_evaluation_keys = client.get_serialized_evaluation_keys()
    assert isinstance(serialized_evaluation_keys, bytes)

    # Save the evaluation key
    evaluation_key_path = KEYS_DIR / f"{client_id}/evaluation_key"
    with evaluation_key_path.open("wb") as f:
        f.write(serialized_evaluation_keys)

    # Only a prefix of the key (hex-encoded) is shown in the browser
    serialized_evaluation_keys_shorten_hex = serialized_evaluation_keys.hex()[
        :INPUT_BROWSER_LIMIT
    ]

    # Show the size of the key and its shortened hex form
    with st.expander("Generated keys"):
        st.write(f"{len(serialized_evaluation_keys) / (10**6):.2f} MB")
        st.code(serialized_evaluation_keys_shorten_hex)

    # NOTE(review): placed outside the expander so it is always visible —
    # confirm against the intended layout.
    st.success("Keys have been generated!", icon="✅")


# def gen_trigger_set(client_id, hf_id):
#     # input : random images seeded by client_id
#     # labels : binary array of the id
#     watermark_uuid = uuid.uuid1()
#     hash = hashlib.sha256()
#     hash.update(client_id + str(watermark_uuid))
#     client_seed = hash.digest()
#     hash = hashlib.sha256()
#     hash.update(hf_id + str(watermark_uuid))
#     hf_seed = hash.digest()
#
#     trigger_set_size = 128
#
#     trigger_set_client = [
#         {"input": 1, "label": digit} for digit in encode_id(client_id, trigger_set_size)
#     ]
#
#     todo()
#
#
# def encode_id(ascii_rep, size=128):
#     """Encode a string id to a string of bits
#
#     Args:
#         ascii_rep (_type_): The id string
#         size (_type_): The size of the output bit string
#
#     Returns:
#         _type_: a string of bits
#     """
#     return "".join([format(ord(x), "b").zfill(8) for x in client_id])[:size]


def decode_id(binary_rep):
    """Decode a string of bits to an ascii string

    Args:
        binary_rep (str): the binary string (only ``0`` and ``1`` digits)

    Returns:
        str: the decoded text

    Raises:
        ValueError: if ``binary_rep`` is not a valid base-2 literal.
        UnicodeDecodeError: if the resulting bytes are not valid UTF-8.
    """
    # Parse the bit string as a base-2 integer
    binary_int = int(binary_rep, 2)

    # Number of bytes needed to hold the value, rounded up. The parentheses
    # matter: without them `7 // 8` is evaluated first (and is 0), which
    # yields one byte per *bit* and a result padded with NUL characters.
    byte_number = (binary_int.bit_length() + 7) // 8

    # Big-endian bytes of the integer
    binary_array = binary_int.to_bytes(byte_number, "big")

    # Convert the bytes into text
    ascii_text = binary_array.decode()

    return ascii_text


# def compare_id(client_id, binary_triggert_set_result):
#     """Compares the string id with the labels of the trigger set on the tested API
#
#     Args:
#         client_id (_type_): the ascii string
#         binary_triggert_set_result (_type_): the binary string
#
#     Returns:
#         _type_: _description_
#     """
#     ground_truth = encode_id(client_id, 128)
#
#     correct_bit = 0
#     for true_bit, real_bit in zip(ground_truth, binary_triggert_set_result):
#         if true_bit != real_bit:
#             correct_bit += 1
#
#     return correct_bit / len(binary_triggert_set_result)
#
# def watermark(model, trigger_set):
#     """Watermarking function
#
#     Args:
#         model (_type_): The model to watermark
#         trigger_set (_type_): the trigger set
#     """
#     X_trigger, y_trigger = trigger_set
#     watermarked_model = watermarking.watermark_model(model, X_trigger, y_trigger)
#
#     model_file_path = SERVER_DIR / "watermarked_model"
#     trigger_set_file_path = SERVER_DIR / "trigger_set"
#
#
#
#     # TODO: remove once model correctly watermarked "Reda continue"
#     model_file_path.touch()
#     trigger_set_file_path.touch()
#
#     # Once the model is watermarked and dumped to files (model + trigger set), the user can download them
#     with open(model_file_path, "rb") as model_file:
#         st.download_button(
#             label="Download the watermarked file",
#             data=model_file,
#             mime="application/octet-stream",
#         )
#     with open(trigger_set_file_path, "rb") as trigger_set_file:
#         st.download_button(
#             label="Download the triggert set",
#             data=trigger_set_file,
#             mime="application/octet-stream",
#         )


st.header("Client Configuration", divider=True)

# client_id = st.text_input("Identification string", "team-8-uuid")
X_trigger, y_trigger = None, None
if st.button("Generate the trigger set for the watermarking"):
    # Generate the trigger set and persist it for the later steps
    X_trigger, y_trigger = watermarking.gen_trigger_set()
    # watermarked_model = watermarking.watermark_model(model, X_trigger, y_trigger)
    np.save("x_trigger", X_trigger)
    np.save("y_trigger", y_trigger)

    # Generate the train/test data and persist it as well
    x_train, y_train, x_test, y_test = watermarking.gen_database()
    np.save("x_train", x_train)
    np.save("y_train", y_train)
    np.save("x_test", x_test)
    np.save("y_test", y_test)

    # Show a success message
    st.success("Trigger set generated and data saved successfully!")

    # Optional: show additional information
    st.write(f"Trigger set shape: X={X_trigger.shape}, y={y_trigger.shape}")
    st.write(f"Training data shape: X={x_train.shape}, y={y_train.shape}")
    st.write(f"Test data shape: X={x_test.shape}, y={y_test.shape}")


st.header("Model Training and Encryption", divider=True)

# Initiate the model parameters
model, x_train, y_train, x_test, y_test = None, None, None, None, None
if st.button("Model Training and Encryption"):
    # Generate the database
    x_train, y_train, x_test, y_test = watermarking.gen_database()

    # No training happens here: the parameters are loaded from disk.
    # model = watermarking.train_model(x_train, y_train)
    model = _load_classifier("model_coef.npy", "model_intercept.npy")

    # Show a success message
    st.success("Model training and encryption completed successfully!")

    # Show additional information
    st.write("Model Information:")
    st.write(f"- Type: {type(model).__name__}")
    st.write(f"- Number of features: {model.coef_.shape[1]}")
    st.write(f"- Parameters range: {PARAMETERS_RANGE}")

    st.write("\nData Information:")
    st.write(f"- Training set shape: X={x_train.shape}, y={y_train.shape}")
    st.write(f"- Test set shape: X={x_test.shape}, y={y_test.shape}")

    # Optional: preview of the model coefficients
    st.write("\nModel Coefficients Preview:")
    st.write(model.coef_[:5])  # first 5 entries along axis 0 of coef_


st.header("Model Watermarking", divider=True)

# if st.button("Model Watermarking"):
#
# encrypted_model = st.file_uploader("Upload your encrypted model")
wat_model = None
if st.button("Model Watermarking"):
    # watermark(None, None)
    # wat_model = watermarking.watermark_model(model, X_trigger, y_trigger)
    # The watermarked parameters are loaded from disk rather than computed.
    wat_model = _load_classifier("wat_model_coef.npy", "wat_model_intercept.npy")

    # Show a success message
    st.success("Model watermarking completed successfully!")

    # Show information about the watermarked model
    st.write("Watermarked Model Information:")
    st.write(f"- Type: {type(wat_model).__name__}")
    st.write(f"- Number of features: {wat_model.coef_.shape[1]}")
    st.write(f"- Parameters range: {PARAMETERS_RANGE}")

#
#
# st.header("Watermarking evaluation", divider=True)
# parameters_range = (-1.0, 1.0)
# if st.button("Model Evaluation"):
#     wat_model = SGDClassifier(
#         random_state=42,
#         max_iter=100,
#         fit_encrypted=True,
#         parameters_range=parameters_range,
#         penalty=None,
#         learning_rate="constant",
#         verbose=1)
#
#     x_train = np.load("x_train.npy")
#     y_train = np.load("y_train.npy")
#     x_test = np.load("x_test.npy")
#     y_test = np.load("y_test.npy")
#
#     wat_model.coef_ = np.load("wat_model_coef.npy")
#     wat_model.intercept_ = np.load("wat_model_intercept.npy")
#     # wat_model.fit(X_trigger, y_trigger, fhe="simulate")
#     wat_model.compile(x_train)
#     watermarking.evaluate(wat_model, x_train, y_train, x_test, y_test, X_trigger, y_trigger)


st.header("Update Blockchain", divider=True)

# Initialize session state to store the block data
if 'block_data' not in st.session_state:
    st.session_state.block_data = None

# Button to update the blockchain
if st.button("Update Blockchain"):
    # Top-level UI boundary: any failure is reported in the page instead of
    # crashing the app.
    try:
        # Load the blockchain from the JSON file
        loaded_blockchain, data = Blockchain.load_from_file("blockchain.json")

        # Check if the loaded blockchain is valid
        is_valid = loaded_blockchain.is_chain_valid()
        st.write(f"Loaded blockchain is valid: {is_valid}")

        if not is_valid:
            st.warning("The loaded blockchain is not valid. Please check data integrity.")
        else:
            wat_model = _load_classifier(
                "wat_model_coef.npy", "wat_model_intercept.npy"
            )
            X_trigger = np.load("x_trigger.npy")
            y_trigger = np.load("y_trigger.npy")

            watermarked_model_hash = watermarking.get_model_hash(wat_model)
            # NOTE(review): both trigger-set hashes are computed from the same
            # arrays — confirm whether distinct HF/client trigger sets are
            # expected here.
            trigger_set_hf = watermarking.get_trigger_hash(X_trigger, y_trigger)
            trigger_set_client = watermarking.get_trigger_hash(X_trigger, y_trigger)

            # Add a new block to the loaded blockchain
            new_block = loaded_blockchain.add_block(
                trigger_set_hf, trigger_set_client, watermarked_model_hash
            )

            # Save the updated blockchain
            loaded_blockchain.save_to_file("blockchain.json")

            # Update session data
            st.session_state.block_data = new_block.to_dict()

            st.success("Blockchain updated successfully!")

            # Display information about the new block
            st.subheader("New Block Information")
            st.write(f"Block ID: {new_block.counter}")
            st.write(f"Timestamp: {new_block.timestamp}")
            st.write(f"Previous Hash: {new_block.previous_hash}")
            st.write(f"Current Hash: {new_block.hash}")

            # Display blockchain statistics
            st.subheader("Blockchain Statistics")
            st.write(f"Total Blocks: {len(loaded_blockchain.chain)}")
            st.write(f"Blockchain File Size: {os.path.getsize('blockchain.json') / 1024:.2f} KB")

    except Exception as e:
        st.error(f"An error occurred while updating the blockchain: {str(e)}")

# Display the JSON if block_data exists
if st.session_state.block_data:
    st.subheader("Latest Block Data (JSON)")

    # Convert the data to a formatted JSON string
    block_json = json.dumps(st.session_state.block_data, indent=2)

    # Display the JSON
    st.code(block_json, language='json')

    # Option to download the entire blockchain
    # NOTE(review): kept under the block_data check — confirm this nesting
    # matches the intended layout.
    st.subheader("Download Blockchain")
    with open("blockchain.json", "rb") as file:
        st.download_button(
            label="Download Blockchain JSON",
            data=file,
            file_name="blockchain.json",
            mime="application/json"
        )