"""Train a Concrete ML SGDClassifier on the breast-cancer dataset (encrypted
training, simulated FHE), embed a trigger-set watermark into the model, and
fingerprint both the model and the trigger set with SHA-256."""

import hashlib
import time

import numpy as np
import pandas as pd
from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

from concrete.ml.sklearn import NeuralNetClassifier, SGDClassifier

RANDOM_STATE = 6

# Seed NumPy's global RNG so the data shuffling and the trigger set are reproducible.
np.random.seed(RANDOM_STATE)


def gen_database():
    """Load, split, scale, and shuffle the breast-cancer dataset."""
|
    rng = np.random.default_rng(42)

    X, y = datasets.load_breast_cancer(return_X_y=True)
    # The split is reproducible through the global NumPy seed set above.
    x_train, x_test, y_train, y_test = train_test_split(X, y, test_size=0.3, stratify=y)

    # Scale every feature to [-1, 1]; the scaler is fit on the training set only.
    scaler = MinMaxScaler(feature_range=(-1, 1))
    x_train = scaler.fit_transform(x_train)
    x_test = scaler.transform(x_test)

    # Shuffle the training set.
    perm = rng.permutation(x_train.shape[0])
    x_train = x_train[perm, :]
    y_train = y_train[perm]

    return x_train, y_train, x_test, y_test


def gen_trigger_set():
    """Build a 15-sample random trigger set whose feature pattern encodes the label."""
|
    X_trigger = np.random.random_sample((15, 30))
    y_trigger = np.random.randint(0, 2, 15)
    for i in range(15):
        if y_trigger[i] == 1:
            # Label 1: copy the value of feature 15 into the first 15 features.
            X_trigger[i, :15] = X_trigger[i, 15]
        else:
            # Label 0: copy the value of feature 15 into the last 15 features.
            X_trigger[i, 15:] = X_trigger[i, 15]
    return X_trigger, y_trigger


def train_model(x_train, y_train):
    """Train an SGDClassifier with encrypted training (simulated FHE)."""
    parameters_range = (-1.0, 1.0)
|
    model = SGDClassifier(
        random_state=42,
        max_iter=100,
        fit_encrypted=True,  # train on encrypted data
        parameters_range=parameters_range,
        penalty=None,
        learning_rate="constant",
        verbose=1,
    )

    # fhe="simulate" runs the encrypted training in simulation rather than in actual FHE.
    model.fit(x_train, y_train, fhe="simulate")
    return model


def watermark_model(model, X_trigger, y_trigger):
    """Embed the watermark by fine-tuning the trained model on the trigger set."""
|
    model.max_iter = 17
    model.alpha = 1e-6
    model.penalty = "l2"
    # warm_start keeps the previously learned weights as the starting point.
    model.warm_start = True

    start = time.time()
    model.fit(X_trigger, y_trigger, fhe="simulate")
    print("Time:", time.time() - start)

    return model


def evaluate(model, x_train, y_train, x_test, y_test, X_trigger, y_trigger):
    """Report accuracy on the train, test, and trigger sets."""
|
print(f"Accuracy Train Set :{np.sum(model.predict(x_train) == y_train) / len(y_train)}") |
|
print(f"Accuracy Test Set :{np.sum(model.predict(x_test) == y_test) / len(y_test)}") |
|
print(f"Accuracy Trigger Set :{np.sum(model.predict(X_trigger) == y_trigger) / len(y_trigger)}") |
|
|
|
|
|
def get_model_hash(model): |
|
    m = hashlib.sha256()
    # Hash the raw bytes of the coefficient array.
    m.update(model.coef_.tobytes())
    return m.hexdigest()


def get_trigger_hash(X_trigger, y_trigger):
    """Return a SHA-256 fingerprint of the trigger set (features and labels)."""
|
    y_trigger = y_trigger.reshape(-1, 1)
    trigger_set = np.concatenate((X_trigger, y_trigger), axis=1)

    m = hashlib.sha256()
    m.update(trigger_set.tobytes())
    return m.hexdigest()
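

# Hypothetical helper, not part of the original flow: one way to bundle the two
# fingerprints above into a single record that can be stored or published, and later
# checked against a suspect model together with the saved trigger set.
def get_watermark_record(model, X_trigger, y_trigger):
    return {
        "model_sha256": get_model_hash(model),
        "trigger_sha256": get_trigger_hash(X_trigger, y_trigger),
    }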
|
|
|
def test():
    """Run the full pipeline: data, trigger set, training, watermarking, evaluation."""
    x_train, y_train, x_test, y_test = gen_database()

    # Persist the data splits.
    np.save("x_train", x_train)
    np.save("y_train", y_train)
    np.save("x_test", x_test)
    np.save("y_test", y_test)

    # Generate and persist the trigger set, then reload it from disk.
    X_trigger, y_trigger = gen_trigger_set()
    np.save("x_trigger", X_trigger)
    np.save("y_trigger", y_trigger)
    X_trigger, y_trigger = np.load("x_trigger.npy"), np.load("y_trigger.npy")

    # Train the base model and round-trip its parameters through disk.
    model = train_model(x_train, y_train)
    np.save("model_coef", model.coef_)
    np.save("model_intercept", model.intercept_)
    model.coef_ = np.load("model_coef.npy")
    model.intercept_ = np.load("model_intercept.npy")

    # Watermark the model and round-trip its parameters through disk.
    wat_model = watermark_model(model, X_trigger, y_trigger)
    np.save("wat_model_coef", wat_model.coef_)
    np.save("wat_model_intercept", wat_model.intercept_)
    wat_model.coef_ = np.load("wat_model_coef.npy")
    wat_model.intercept_ = np.load("wat_model_intercept.npy")

    evaluate(wat_model, x_train, y_train, x_test, y_test, X_trigger, y_trigger)
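

# Entry point added for convenience (assumption: the original module defines test()
# but never invokes it). Remove this block if the file is only meant to be imported.
if __name__ == "__main__":
    test()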