"""Train an SGD classifier with Concrete ML and watermark it with a trigger set.

The pipeline (see ``test``) builds a scaled breast-cancer train/test split,
generates a synthetic trigger set, trains a model, continues training on the
trigger set to embed the watermark, and reports accuracies on all three sets.
"""

import hashlib
import time

import numpy as np
import pandas as pd
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

from concrete.ml.sklearn import NeuralNetClassifier
from concrete.ml.sklearn import SGDClassifier

RANDOM_STATE = 6
# Seeds NumPy's *global* generator; gen_trigger_set() draws from it directly.
np.random.seed(RANDOM_STATE)


def gen_database():
    """Load the breast-cancer dataset and return a scaled, shuffled split.

    Returns:
        ``(x_train, y_train, x_test, y_test)``. Features are scaled to
        ``[-1, 1]`` with a ``MinMaxScaler`` fitted on the training part only;
        the training rows are additionally shuffled with a fixed-seed
        generator.
    """
    rng = np.random.default_rng(42)
    X, y = datasets.load_breast_cancer(return_X_y=True)

    # No random_state is passed here, so scikit-learn falls back to NumPy's
    # global random state (seeded at import time above).
    x_train, x_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, stratify=y
    )

    # feature_range is documented as a tuple; a list is rejected by
    # scikit-learn versions that validate constructor parameters.
    scaler = MinMaxScaler(feature_range=(-1, 1))
    x_train = scaler.fit_transform(x_train)
    x_test = scaler.transform(x_test)

    perm = rng.permutation(x_train.shape[0])
    return x_train[perm], y_train[perm], x_test, y_test


def gen_trigger_set(n_samples: int = 15, n_features: int = 30):
    """Generate a random trigger set with a label-dependent feature pattern.

    Each sample is drawn uniformly from ``[0, 1)`` and receives a random
    binary label. With ``half = n_features // 2``, samples labelled 1 get
    their first ``half`` features overwritten by the value of feature
    ``half``; samples labelled 0 get features ``half`` onwards overwritten
    by that same value.

    Args:
        n_samples: Number of trigger samples to generate.
        n_features: Number of features per sample (must be at least 1).

    Returns:
        ``(X_trigger, y_trigger)`` with shapes ``(n_samples, n_features)``
        and ``(n_samples,)``.

    Raises:
        ValueError: If ``n_features`` is smaller than 1.
    """
    if n_features < 1:
        raise ValueError("n_features must be at least 1")

    half = n_features // 2
    # Draw order (features first, then labels) is kept so that the default
    # call consumes the global random stream exactly as before.
    X_trigger = np.random.random_sample((n_samples, n_features))
    y_trigger = np.random.randint(0, 2, n_samples)

    pivot = X_trigger[:, half].copy()
    positive = y_trigger == 1
    X_trigger[positive, :half] = pivot[positive, np.newaxis]
    X_trigger[~positive, half:] = pivot[~positive, np.newaxis]
    return X_trigger, y_trigger


def train_model(x_train, y_train):
    """Fit a Concrete ML ``SGDClassifier`` on the training data.

    The classifier is created with ``fit_encrypted=True`` and fitted with
    ``fhe="simulate"``.

    Args:
        x_train: Training features.
        y_train: Training labels.

    Returns:
        The fitted model.
    """
    parameters_range = (-1.0, 1.0)
    model = SGDClassifier(
        random_state=42,
        max_iter=100,
        fit_encrypted=True,
        parameters_range=parameters_range,
        penalty=None,
        learning_rate="constant",
        verbose=1,
    )
    model.fit(x_train, y_train, fhe="simulate")
    return model


def watermark_model(model, X_trigger, y_trigger):
    """Embed the watermark by fitting ``model`` further on the trigger set.

    The model is reconfigured **in place** (17 iterations, L2 penalty with
    ``alpha=1e-6``, ``warm_start=True``) and then fitted on the trigger set
    with ``fhe="simulate"``. The elapsed fitting time is printed.

    Args:
        model: A model previously returned by ``train_model``; it is mutated.
        X_trigger: Trigger-set features.
        y_trigger: Trigger-set labels.

    Returns:
        The same ``model`` object, after the additional fit.
    """
    model.max_iter = 17
    model.alpha = 1e-6
    model.penalty = "l2"
    # Presumably makes fit() resume from the already-trained weights instead
    # of re-initialising them -- confirm against the Concrete ML docs.
    model.warm_start = True

    # perf_counter is monotonic, unlike time.time(), so the measured interval
    # cannot be skewed by wall-clock adjustments.
    start = time.perf_counter()
    model.fit(X_trigger, y_trigger, fhe="simulate")
    print("Time :", time.perf_counter() - start)
    return model


def evaluate(model, x_train, y_train, x_test, y_test, X_trigger, y_trigger):
    """Print the model's accuracy on the train, test and trigger sets."""
    splits = (
        ("Train", x_train, y_train),
        ("Test", x_test, y_test),
        ("Trigger", X_trigger, y_trigger),
    )
    for label, features, targets in splits:
        accuracy = np.sum(model.predict(features) == targets) / len(targets)
        print(f"Accuracy {label} Set :{accuracy}")


def _sha256_of_array(array) -> str:
    """Return the SHA-256 hex digest of ``array``'s raw C-ordered bytes."""
    # hashlib only accepts C-contiguous buffers; normalising first keeps the
    # digest identical for contiguous inputs and avoids an error on
    # non-contiguous ones (e.g. transposed or sliced arrays).
    return hashlib.sha256(np.ascontiguousarray(array).tobytes()).hexdigest()


def get_model_hash(model) -> str:
    """Return the SHA-256 hex digest of the model's ``coef_`` array."""
    return _sha256_of_array(model.coef_)


def get_trigger_hash(X_trigger, y_trigger) -> str:
    """Return the SHA-256 hex digest of the trigger set.

    The labels are appended to the features as one extra column before
    hashing, so the digest covers both.
    """
    labels_column = y_trigger.reshape(-1, 1)
    trigger_set = np.concatenate((X_trigger, labels_column), axis=1)
    return _sha256_of_array(trigger_set)


def test():
    """Run the full pipeline, persisting every artefact as ``.npy`` files.

    Files are written to (and re-read from) the current working directory.
    """
    # Gen data
    x_train, y_train, x_test, y_test = gen_database()
    np.save("x_train", x_train)
    np.save("y_train", y_train)
    np.save("x_test", x_test)
    np.save("y_test", y_test)

    X_trigger, y_trigger = gen_trigger_set()
    np.save("x_trigger", X_trigger)
    np.save("y_trigger", y_trigger)
    # np.save appends ".npy"; the arrays are read back from disk right away.
    X_trigger, y_trigger = np.load("x_trigger.npy"), np.load("y_trigger.npy")

    model = train_model(x_train, y_train)
    np.save("model_coef", model.coef_)
    np.save("model_intercept", model.intercept_)
    model.coef_ = np.load("model_coef.npy")
    model.intercept_ = np.load("model_intercept.npy")

    wat_model = watermark_model(model, X_trigger, y_trigger)
    np.save("wat_model_coef", wat_model.coef_)
    np.save("wat_model_intercept", wat_model.intercept_)
    wat_model.coef_ = np.load("wat_model_coef.npy")
    wat_model.intercept_ = np.load("wat_model_intercept.npy")

    evaluate(wat_model, x_train, y_train, x_test, y_test, X_trigger, y_trigger)


# The pipeline is not run automatically; uncomment the call below to run it.
# test()