# %% [markdown]
# # Spectral Signatures on a backdoored TabNet classifier
#
# Loads a trained TabNet model and its training data, records the input of the
# final linear layer for every training sample, scores each sample (per label)
# by its correlation with the top right singular vector of the mean-centered
# representations, and plots the score distributions of clean vs. poisoned
# samples.

# %%
# Not everything from this is used

import numpy as np
import pandas as pd
from sklearn.datasets import fetch_openml
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, log_loss
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

import os
import wget
from pathlib import Path
import shutil
import gzip

from matplotlib import pyplot as plt

import torch
from pytorch_tabnet.tab_model import TabNetClassifier

import random
import math
import matplotlib.ticker as mtick
import seaborn as sns

import collections
from functools import partial

# %% [markdown]
# ## Configuration

# %%
DATAPATH = "../../../data/loan_tabnet_1f_taxliens_oob/"
model_path = "../models/loan-tabnet-1f-taxliens.zip"

backdoorFeatures = ["tax_liens"]
backdoorTriggerValues = [93]
targetLabel = 0 # Not a bad investment
labels = [0, 1]

# Name of the module whose *input* is recorded as the latent representation
HOOKED_LAYER = "tabnet.final_mapping"
# When plotting, clean samples are subsampled to at most this many per poisoned sample
MAX_CLEAN_PER_POISONED = 10
# Use the GPU the notebook was written for when present, otherwise fall back to
# CPU so the notebook still runs on machines without CUDA.
DEVICE = "cuda:0" if torch.cuda.is_available() else "cpu"

# %% [markdown]
# ## Load data

# %%
outPath = DATAPATH

# NOTE: pd.read_pickle unpickles arbitrary objects; only load files you trust.
X_train = pd.read_pickle(os.path.join(outPath, "X_train.pkl"))
y_train = pd.read_pickle(os.path.join(outPath, "y_train.pkl"))

X_valid = pd.read_pickle(os.path.join(outPath, "X_valid.pkl"))
y_valid = pd.read_pickle(os.path.join(outPath, "y_valid.pkl"))

X_test = pd.read_pickle(os.path.join(outPath, "X_test.pkl"))
y_test = pd.read_pickle(os.path.join(outPath, "y_test.pkl"))

X_test_backdoor = pd.read_pickle(os.path.join(outPath, "X_test_backdoor.pkl"))
y_test_backdoor = pd.read_pickle(os.path.join(outPath, "y_test_backdoor.pkl"))

# %% [markdown]
# ## Load model and attach the activation hook

# %%
clf = TabNetClassifier(device_name=DEVICE)
clf.load_model(model_path)

# %%
# Forward hook for saving activations of the input of the final linear layer (64 -> outdim)
activations = []


def save_activation(name, mod, inp, out):
    """Forward hook: store the hooked module's input for the whole batch.

    Parameters
    ----------
    name : str
        Label bound via ``functools.partial`` at registration time (unused here).
    mod : torch.nn.Module
        The module the hook is attached to (unused here).
    inp : tuple
        Positional inputs of the module; ``inp[0]`` is the tensor that is saved.
    out : torch.Tensor
        Output of the module (unused here).

    One array per forward pass (i.e. per batch) is appended to the global
    ``activations`` list, with one row per sample of that batch.
    """
    activations.append(inp[0].detach().cpu().numpy())


# %%
# tabnet.final_mapping is the layer we are interested in
_named_modules = dict(clf.network.named_modules())
if HOOKED_LAYER not in _named_modules:
    # Fail here with a clear message instead of later with an empty activation list
    raise KeyError(f"Module {HOOKED_LAYER!r} not found in clf.network")
print(HOOKED_LAYER, ":", _named_modules[HOOKED_LAYER])

# Re-running this cell must not stack a second hook on the same module,
# otherwise every forward pass would be recorded twice.
_previous_handle = globals().get("hook_handle")
if _previous_handle is not None:
    _previous_handle.remove()
hook_handle = _named_modules[HOOKED_LAYER].register_forward_hook(
    partial(save_activation, HOOKED_LAYER)
)

# %% [markdown]
# ## Spectral signature scores

# %%
# Some parts of the code used from: https://github.com/Trusted-AI/adversarial-robustness-toolbox/blob/main/art/defences/detector/poison/spectral_signature_defense.py
# Most variable names follow the algorithm from the original Spectral Signatures paper


def get_representations(Dy, n):
    """Return the latent representation of the first ``n`` rows of ``Dy``.

    Parameters
    ----------
    Dy : pandas.DataFrame
        Feature rows (without the label column) to pass through ``clf``.
    n : int
        Number of leading rows of ``Dy`` to process.

    Returns
    -------
    numpy.ndarray
        One row per processed sample, in the order of ``Dy``; each row is the
        input that the hooked layer received for that sample.

    Raises
    ------
    RuntimeError
        If the number of captured rows differs from the number of rows passed
        to the classifier (e.g. the hook is missing or registered twice).
    """
    batch = Dy.iloc[:max(n, 0)]
    if len(batch) == 0:
        return np.empty((0, 0))

    # Clear in place so the hook keeps appending to this same list object
    activations.clear()
    # A single predict() call lets the classifier batch the rows itself instead
    # of paying the per-call overhead once per sample.
    # NOTE(review): assumes inference outputs do not depend on which rows share
    # a batch — confirm against pytorch-tabnet's predict()/eval-mode behaviour.
    clf.predict(batch.values)
    Rlist = np.concatenate(activations, axis=0)
    activations.clear()

    if len(Rlist) != len(batch):
        raise RuntimeError(
            f"Captured {len(Rlist)} activations for {len(batch)} samples; "
            "check that the forward hook is registered exactly once."
        )
    return Rlist


def spectral_signature_scores(Rlist):
    """Score every representation by its correlation with the top singular vector.

    Parameters
    ----------
    Rlist : array-like of shape (n_samples, n_features)
        Latent representations of all samples sharing one label.

    Returns
    -------
    numpy.ndarray of shape (n_samples,)
        Absolute projection of each representation onto the top right singular
        vector of the mean-centered representation matrix.
    """
    Rlist = np.asarray(Rlist)
    # Take mean
    Rhat = np.mean(Rlist, axis=0)
    # Subtract mean from all samples
    M = Rlist - Rhat
    # Do SVD
    _, _, V = np.linalg.svd(M, full_matrices=False)
    # Get top right singular vector
    v = V[:1]
    # Get correlation score with top right singular vector.
    # NOTE(review): the projection uses the uncentered Rlist rather than M;
    # kept as-is to preserve existing results — confirm this is intended.
    corrs = np.matmul(v, np.transpose(Rlist))
    return np.linalg.norm(corrs, axis=0)


# %%
Dtrain = X_train.copy()
Dtrain["y"] = y_train
L = clf # Already trained on backdoor data Dtrain
resultScores = {}
poisonedMask = {}

# For all y do
for y in labels:
    # Get all samples with label y
    Dy = Dtrain[Dtrain["y"] == y].drop(columns="y").reset_index(drop=True)
    # For verification purposes, store which samples were poisoned
    # (this statement assumes the trigger does not occur in the clean data, which is valid for OOB)
    poisonedMask[y] = Dy[backdoorFeatures[0]] == backdoorTriggerValues[0]
    n = len(Dy)
    if n == 0:
        # Nothing to score for this label; avoid running SVD on an empty matrix
        resultScores[y] = np.empty(0)
        continue
    # Get all representations
    Rlist = get_representations(Dy, n)
    # Save result in dictionary for current label
    resultScores[y] = spectral_signature_scores(Rlist)

# %% [markdown]
# ## Plot score distributions

# %%
def plotCorrelationScores(y, nbins):
    """Plot a histogram of the spectral signature scores for label ``y``.

    Parameters
    ----------
    y : hashable
        Label whose scores (``resultScores[y]``) and poison mask
        (``poisonedMask[y]``) are plotted.
    nbins : int
        Number of bin edges passed to ``np.linspace`` for the histogram.

    Clean samples are drawn in green and poisoned samples in red. Clean samples
    are subsampled (fixed ``random_state``) to at most
    ``MAX_CLEAN_PER_POISONED`` times the number of poisoned samples of
    ``targetLabel`` so the poisoned bars remain visible.
    """
    sns.set_style("white", rc={"patch.force_edgecolor": False})
    sns.set_palette(sns.color_palette("tab10"))

    scores = pd.Series(resultScores[y])
    poisoned = poisonedMask[y].astype(bool)
    if scores.empty:
        print("No samples with label " + str(y) + "; nothing to plot")
        return

    nPoisonedSamples = int(poisonedMask[targetLabel].sum())

    cleanDist = scores[~poisoned]
    maxClean = nPoisonedSamples * MAX_CLEAN_PER_POISONED
    # Only subsample when there are poisoned samples to compare against;
    # otherwise the cap would be 0 and every clean sample would be discarded.
    if nPoisonedSamples > 0 and len(cleanDist) > maxClean:
        cleanDist = cleanDist.sample(n=maxClean, random_state=0)
    poisonDist = scores[poisoned]

    # Shared bin edges so both histograms are directly comparable
    upper = max(
        cleanDist.max() if len(cleanDist) > 0 else 0.0,
        poisonDist.max() if len(poisonDist) > 0 else 0.0,
    )
    bins = np.linspace(0, upper, nbins)

    fig, ax = plt.subplots(figsize=(4.6, 2.8))
    if len(poisonDist) > 0:
        ax.hist(poisonDist, color="tab:red", bins=bins, alpha=0.75, label="Poisoned")
        ax.hist(cleanDist, bins=bins, color="tab:green", alpha=0.75, label="Clean")
        ax.legend(loc="upper right")
    else:
        ax.hist(cleanDist, bins=bins, color="tab:green", alpha=0.75, label="Clean")

    ax.set_title("Correlation plot for label " + str(y))
    ax.set_xlabel("Correlation with top right singular vector")
    ax.set_ylabel("Number of samples")
    plt.show()
    # Release the figure so repeated calls in a loop do not accumulate memory
    plt.close(fig)


# %%
for y in labels:
    plotCorrelationScores(y, 100)