# encoding: utf-8
# Author : Floyed<Floyed_Shen@outlook.com>
# Datetime : 2024/9/9 16:36
# User : yu
# Product : PyCharm
# Project : panda-guard
# File : rep_readers.py
# explain : Adapted from https://github.com/andyzoujm/representation-engineering.git
from abc import ABC, abstractmethod
from sklearn.decomposition import PCA
import numpy as np
from itertools import islice
import torch
def project_onto_direction(H, direction):
    """
    Project hidden representations onto a given direction.

    The result is ``H.matmul(direction) / ||direction||``, i.e. the scalar
    projection of every row of ``H`` onto ``direction``.

    :param H: Hidden states (tensor or array-like) such that
        ``H.matmul(direction)`` is valid, e.g. shape (n, d).
    :param direction: Direction vector (tensor or array-like), e.g. shape (d,).
    :return: Tensor holding the projections, located on the device of ``H``.
    :raises AssertionError: If the norm of ``direction`` is infinite.
    """
    if not isinstance(H, torch.Tensor):
        # Array-like input: build a float tensor and use the GPU only when
        # one is actually present, so CPU-only hosts keep working.
        H = torch.as_tensor(H, dtype=torch.float32)
        if torch.cuda.is_available():
            H = H.cuda()
    elif not H.is_floating_point():
        # Integer/bool tensors cannot be combined with a float direction.
        H = H.to(torch.float32)

    # Always align the direction with H (device and dtype), whether it was
    # supplied as a tensor or as an array-like.
    direction = torch.as_tensor(direction).to(device=H.device, dtype=H.dtype)

    # Magnitude of the direction vector
    mag = torch.norm(direction)
    assert not torch.isinf(mag).any()

    # Scalar projection of each row of H
    return H.matmul(direction) / mag
def recenter(x, mean=None):
    """
    Recenter data around a specified or computed mean.

    The computation runs on the GPU when CUDA is available and on the CPU
    otherwise; inputs are converted to float32 tensors.

    :param x: Input data (tensor or array-like).
    :param mean: Optional mean (tensor or array-like, broadcastable against
        ``x``). If None, the mean of ``x`` over its first axis is used.
    :return: Tensor of recentered data (``x - mean``).
    """
    # Prefer the GPU, but do not require one.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    x = torch.as_tensor(x, dtype=torch.float32, device=device)
    if mean is None:
        mean = x.mean(dim=0, keepdim=True)
    else:
        mean = torch.as_tensor(mean, dtype=torch.float32, device=device)
    return x - mean
class RepReader(ABC):
    """
    Class to identify and store concept directions.

    RepReader instances are used by RepReaderPipeline to get concept scores.
    Directions can be used for downstream interventions.

    ``get_signs`` reads ``self.needs_hiddens`` and ``self.n_components``;
    this base class does not set them, so subclasses must provide both.
    """

    @abstractmethod
    def __init__(self) -> None:
        self.direction_method = None
        # directions accessible via directions[layer][component_index]
        self.directions = None
        # direction of high concept scores (mapping min/max to high/low)
        self.direction_signs = None

    @abstractmethod
    def get_rep_directions(self, model, tokenizer, hidden_states, hidden_layers, **kwargs):
        """
        Abstract method to compute concept directions per layer.

        :param model: The language model.
        :param tokenizer: Tokenizer associated with the model.
        :param hidden_states: Hidden states per layer.
        :param hidden_layers: Layers to consider.
        :return: Dictionary of directions per layer.
        """
        pass

    def get_signs(self, hidden_states, train_choices, hidden_layers):
        """
        Given labels for the training data hidden_states, determine whether the
        negative or positive direction corresponds to low/high concept
        (and return corresponding signs -1 or 1 for each layer and component index).

        Projections are consumed in consecutive pairs; ``train_choices[k]``
        is the index (within pair ``k``) of the labelled element. When the
        reader does not need hidden states, or none are given, every sign
        defaults to 1.

        :param hidden_states: Hidden states of the model on the training data (per layer).
        :param train_choices: Labels for the training data.
        :param hidden_layers: Layers to compute signs for.
        :return: Dictionary mapping layers to signs.
        :raises AssertionError: If a layer does not hold exactly two hidden
            states per label.
        """
        signs = {}

        if self.needs_hiddens and hidden_states is not None and len(hidden_states) > 0:
            for layer in hidden_layers:
                n_states = hidden_states[layer].shape[0]
                assert n_states == 2 * len(train_choices), (
                    f"Shape mismatch between hidden states ({n_states}) and labels ({len(train_choices)})"
                )

                signs[layer] = []
                for component_index in range(self.n_components):
                    transformed_hidden_states = project_onto_direction(
                        hidden_states[layer], self.directions[layer][component_index]
                    )
                    # Consecutive pairs of projections, one pair per label
                    projected_scores = [
                        transformed_hidden_states[i:i + 2]
                        for i in range(0, len(transformed_hidden_states), 2)
                    ]

                    # How often the labelled element is the pair's min / max
                    outputs_min = [
                        1 if min(o) == o[label] else 0
                        for o, label in zip(projected_scores, train_choices)
                    ]
                    outputs_max = [
                        1 if max(o) == o[label] else 0
                        for o, label in zip(projected_scores, train_choices)
                    ]

                    signs[layer].append(-1 if np.mean(outputs_min) > np.mean(outputs_max) else 1)
        else:
            for layer in hidden_layers:
                signs[layer] = [1 for _ in range(self.n_components)]

        return signs
class PCARepReader(RepReader):
    """
    Extract directions via Principal Component Analysis (PCA).
    """

    needs_hiddens = True

    def __init__(self, n_components=1):
        super().__init__()
        self.n_components = n_components
        # Per-layer training means, stored by get_rep_directions and reused
        # by get_signs to recenter the data the same way.
        self.H_train_means = {}

    def get_rep_directions(self, model, tokenizer, hidden_states, hidden_layers, **kwargs):
        """
        Get PCA components as directions for each layer.

        Also records the per-layer training mean in ``self.H_train_means``
        and updates ``self.n_components`` from the fitted PCA model.

        :param model: The language model.
        :param tokenizer: Tokenizer associated with the model.
        :param hidden_states: Hidden states per layer.
        :param hidden_layers: Layers to consider.
        :return: Dictionary of directions per layer.
        """
        directions = {}

        for layer in hidden_layers:
            H_train = hidden_states[layer]
            H_train_mean = H_train.mean(axis=0, keepdims=True)
            self.H_train_means[layer] = H_train_mean
            H_train = recenter(H_train, mean=H_train_mean).cpu()
            H_train = np.vstack(H_train)
            pca_model = PCA(n_components=self.n_components, whiten=False).fit(H_train)

            directions[layer] = pca_model.components_  # shape (n_components, n_features)
            self.n_components = pca_model.n_components_

        return directions

    def get_signs(self, hidden_states, train_labels, hidden_layers):
        """
        Determine signs for each PCA direction by comparing projection magnitudes relative to labels.

        ``train_labels`` is a sequence of groups; each group must contain a
        ``1`` marking the labelled element. The projections of a layer are
        split into consecutive groups of matching sizes.

        :param hidden_states: Hidden states of the model on the training data (per layer)
        :param train_labels: Labels for the training data.
        :param hidden_layers: Layers to compute signs for.
        :return: Dictionary mapping layers to signs.
        :raises AssertionError: If a layer's number of hidden states differs
            from the total number of labels.
        :raises ValueError: If a label group contains no ``1``.
        """
        # Group boundaries depend only on the labels: compute them once
        # instead of re-summing the prefix lengths for every group.
        group_sizes = [len(group) for group in train_labels]
        offsets = np.cumsum([0] + group_sizes)
        n_labels = int(offsets[-1])
        target_positions = [list(group).index(1) for group in train_labels]

        signs = {}
        for layer in hidden_layers:
            n_states = hidden_states[layer].shape[0]
            assert n_states == n_labels, (
                f"Shape mismatch between hidden states ({n_states}) and labels ({n_labels})"
            )

            # NOTE: since scoring is ultimately comparative, the effect of this is moot
            layer_hidden_states = recenter(hidden_states[layer], mean=self.H_train_means[layer])

            # get the signs for each component
            layer_signs = np.zeros(self.n_components)
            for component_index in range(self.n_components):
                scores = project_onto_direction(
                    layer_hidden_states, self.directions[layer][component_index]
                ).detach().cpu().numpy()

                # We do elements instead of argmin/max because sometimes we pad random choices in training
                hits_min = []
                hits_max = []
                for start, stop, target in zip(offsets[:-1], offsets[1:], target_positions):
                    group_scores = scores[start:stop]
                    hits_min.append(group_scores[target] == group_scores.min())
                    hits_max.append(group_scores[target] == group_scores.max())

                layer_signs[component_index] = np.sign(np.mean(hits_max) - np.mean(hits_min))
                if layer_signs[component_index] == 0:
                    layer_signs[component_index] = 1  # default to positive in case of tie

            signs[layer] = layer_signs

        return signs
class ClusterMeanRepReader(RepReader):
    """
    Get the direction that is the difference between the mean of the positive and negative clusters.
    """

    n_components = 1
    needs_hiddens = True

    def __init__(self):
        super().__init__()

    def get_rep_directions(self, model, tokenizer, hidden_states, hidden_layers, **kwargs):
        """
        Compute direction by subtracting mean of negative class from positive class.

        Samples labelled ``1`` form the positive cluster and samples labelled
        ``0`` the negative one.

        :param model: The language model.
        :param tokenizer: Tokenizer associated with the model.
        :param hidden_states: Hidden states per layer.
        :param hidden_layers: Layers to consider.
        :param kwargs: Must contain 'train_choices' - the label list.
        :return: Dictionary of directions per layer.
        :raises AssertionError: If 'train_choices' is missing or its length
            differs from the number of hidden states of a layer.
        """
        # train labels is necessary to differentiate between different classes
        train_choices = kwargs.get('train_choices')
        assert train_choices is not None, "ClusterMeanRepReader requires train_choices to differentiate two clusters"
        for layer in hidden_layers:
            assert len(train_choices) == len(hidden_states[layer]), (
                f"Shape mismatch between hidden states ({len(hidden_states[layer])}) and labels ({len(train_choices)})"
            )

        train_choices = np.array(train_choices)
        neg_class = np.where(train_choices == 0)
        pos_class = np.where(train_choices == 1)

        directions = {}
        for layer in hidden_layers:
            H_train = np.array(hidden_states[layer])

            # keepdims=True keeps a leading axis on each mean, so the
            # difference is indexable as directions[layer][0].
            H_pos_mean = H_train[pos_class].mean(axis=0, keepdims=True)
            H_neg_mean = H_train[neg_class].mean(axis=0, keepdims=True)

            directions[layer] = H_pos_mean - H_neg_mean

        return directions
class RandomRepReader(RepReader):
    """
    Get random directions for each hidden layer. Do not use hidden
    states or train labels of any kind.
    """

    def __init__(self, needs_hiddens=True):
        super().__init__()
        self.n_components = 1
        self.needs_hiddens = needs_hiddens

    def get_rep_directions(self, model, tokenizer, hidden_states, hidden_layers, **kwargs):
        """
        Generate random direction vectors for each layer.

        Each direction is drawn with ``np.random.randn`` and has shape
        ``(1, model.config.hidden_size)``.

        :param model: The language model.
        :param tokenizer: Tokenizer associated with the model.
        :param hidden_states: Hidden states per layer.
        :param hidden_layers: Layers to consider.
        :return: Dictionary of directions per layer.
        """
        directions = {}
        for layer in hidden_layers:
            directions[layer] = np.expand_dims(np.random.randn(model.config.hidden_size), 0)

        return directions
# Registry mapping a direction-method name to the RepReader subclass
# that implements it.
DIRECTION_FINDERS = {
    'pca': PCARepReader,
    'cluster_mean': ClusterMeanRepReader,
    'random': RandomRepReader,
}