# data.py

import dataclasses as dc
import pickle
import typing as ty
import warnings
from collections import Counter
from copy import deepcopy
from pathlib import Path

import numpy as np
import sklearn.preprocessing
import torch
from category_encoders import LeaveOneOutEncoder
from sklearn.impute import SimpleImputer

from . import env, util  # env provides DATA_DIR, used by load_dataset_info below

ArrayDict = ty.Dict[str, np.ndarray]


def normalize(
    X: ArrayDict, normalization: str, seed: int, noise: float = 1e-3
) -> ArrayDict:
    X_train = X['train'].copy()
    if normalization == 'standard':
        normalizer = sklearn.preprocessing.StandardScaler()
    elif normalization == 'quantile':
        normalizer = sklearn.preprocessing.QuantileTransformer(
            output_distribution='normal',
            n_quantiles=max(min(X['train'].shape[0] // 30, 1000), 10),
            subsample=int(1e9),
            random_state=seed,
        )
        if noise:
            # Add a small amount of noise before fitting the quantile transformer
            # to break ties in features with many repeated values.
            stds = np.std(X_train, axis=0, keepdims=True)
            noise_std = noise / np.maximum(stds, noise)  # type: ignore[code]
            X_train += noise_std * np.random.default_rng(seed).standard_normal(  # type: ignore[code]
                X_train.shape
            )
    else:
        util.raise_unknown('normalization', normalization)
    normalizer.fit(X_train)
    return {k: normalizer.transform(v) for k, v in X.items()}  # type: ignore[code]
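
# Usage sketch (hypothetical arrays, not part of the original pipeline): normalize()
# fits the chosen scaler on the 'train' split only and then transforms every split, e.g.
#   X = {'train': np.random.rand(100, 4), 'val': np.random.rand(20, 4)}
#   X = normalize(X, 'quantile', seed=0)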


@dc.dataclass
class Dataset:
    N: ty.Optional[ArrayDict]
    C: ty.Optional[ArrayDict]
    y: ArrayDict
    info: ty.Dict[str, ty.Any]
    folder: ty.Optional[Path]

    @classmethod
    def from_dir(cls, dir_: ty.Union[Path, str]) -> 'Dataset':
        dir_ = Path(dir_)

        def load(item) -> ArrayDict:
            return {
                x: ty.cast(np.ndarray, np.load(dir_ / f'{item}_{x}.npy', allow_pickle=True))  # type: ignore[code]
                for x in ['train', 'val', 'test', 'test_backdoor']
            }

        return Dataset(
            load('N') if dir_.joinpath('N_train.npy').exists() else None,
            load('C') if dir_.joinpath('C_train.npy').exists() else None,
            load('y'),
            util.load_json(dir_ / 'info.json'),
            dir_,
        )

    @property
    def is_binclass(self) -> bool:
        return self.info['task_type'] == util.BINCLASS

    @property
    def is_multiclass(self) -> bool:
        return self.info['task_type'] == util.MULTICLASS

    @property
    def is_regression(self) -> bool:
        return self.info['task_type'] == util.REGRESSION

    @property
    def n_num_features(self) -> int:
        return self.info['n_num_features']

    @property
    def n_cat_features(self) -> int:
        return self.info['n_cat_features']

    @property
    def n_features(self) -> int:
        return self.n_num_features + self.n_cat_features

    def size(self, part: str) -> int:
        X = self.N if self.N is not None else self.C
        assert X is not None
        return len(X[part])

    def build_X(
        self,
        *,
        normalization: ty.Optional[str],
        num_nan_policy: str,
        cat_nan_policy: str,
        cat_policy: str,
        cat_min_frequency: float = 0.0,
        seed: int,
    ) -> ty.Union[ArrayDict, ty.Tuple[ArrayDict, ArrayDict]]:
        if self.N:
            N = deepcopy(self.N)

            # Impute missing numerical values.
            num_nan_masks = {k: np.isnan(v) for k, v in N.items()}
            if any(x.any() for x in num_nan_masks.values()):  # type: ignore[code]
                if num_nan_policy == 'mean':
                    num_new_values = np.nanmean(self.N['train'], axis=0)
                else:
                    util.raise_unknown('numerical NaN policy', num_nan_policy)
                for k, v in N.items():
                    num_nan_indices = np.where(num_nan_masks[k])
                    v[num_nan_indices] = np.take(num_new_values, num_nan_indices[1])
            if normalization:
                N = normalize(N, normalization, seed)
        else:
            N = None

        if cat_policy == 'drop' or not self.C:
            assert N is not None
            return N

        C = deepcopy(self.C)

        # Impute missing categorical values (stored as the string 'nan').
        cat_nan_masks = {k: v == 'nan' for k, v in C.items()}
        if any(x.any() for x in cat_nan_masks.values()):  # type: ignore[code]
            if cat_nan_policy == 'new':
                cat_new_value = '___null___'
                imputer = None
            elif cat_nan_policy == 'most_frequent':
                cat_new_value = None
                imputer = SimpleImputer(strategy=cat_nan_policy)  # type: ignore[code]
                imputer.fit(C['train'])
            else:
                util.raise_unknown('categorical NaN policy', cat_nan_policy)
            if imputer:
                C = {k: imputer.transform(v) for k, v in C.items()}
            else:
                for k, v in C.items():
                    cat_nan_indices = np.where(cat_nan_masks[k])
                    v[cat_nan_indices] = cat_new_value

        # Merge rare categories (below cat_min_frequency) into a single '___rare___' value.
        if cat_min_frequency:
            C = ty.cast(ArrayDict, C)
            min_count = round(len(C['train']) * cat_min_frequency)
            rare_value = '___rare___'
            C_new = {x: [] for x in C}
            for column_idx in range(C['train'].shape[1]):
                counter = Counter(C['train'][:, column_idx].tolist())
                popular_categories = {k for k, v in counter.items() if v >= min_count}
                for part in C_new:
                    C_new[part].append(
                        [
                            (x if x in popular_categories else rare_value)
                            for x in C[part][:, column_idx].tolist()
                        ]
                    )
            C = {k: np.array(v).T for k, v in C_new.items()}

        # Ordinal-encode categories; categories unseen in 'train' get a fresh index.
        unknown_value = np.iinfo('int64').max - 3
        encoder = sklearn.preprocessing.OrdinalEncoder(
            handle_unknown='use_encoded_value',  # type: ignore[code]
            unknown_value=unknown_value,  # type: ignore[code]
            dtype='int64',  # type: ignore[code]
        ).fit(C['train'])
        C = {k: encoder.transform(v) for k, v in C.items()}
        max_values = C['train'].max(axis=0)
        for part in ['val', 'test', 'test_backdoor']:
            for column_idx in range(C[part].shape[1]):
                C[part][C[part][:, column_idx] == unknown_value, column_idx] = (
                    max_values[column_idx] + 1
                )

        if cat_policy == 'indices':
            result = (N, C)
        elif cat_policy == 'ohe':
            ohe = sklearn.preprocessing.OneHotEncoder(
                handle_unknown='ignore', sparse=False, dtype='float32'  # type: ignore[code]
            )
            ohe.fit(C['train'])
            C = {k: ohe.transform(v) for k, v in C.items()}
            result = C if N is None else {x: np.hstack((N[x], C[x])) for x in N}
        elif cat_policy == 'counter':
            assert seed is not None
            loo = LeaveOneOutEncoder(sigma=0.1, random_state=seed, return_df=False)
            loo.fit(C['train'], self.y['train'])
            C = {k: loo.transform(v).astype('float32') for k, v in C.items()}  # type: ignore[code]
            if not isinstance(C['train'], np.ndarray):
                C = {k: v.values for k, v in C.items()}  # type: ignore[code]
            if normalization:
                C = normalize(C, normalization, seed)  # type: ignore[code]
            result = C if N is None else {x: np.hstack((N[x], C[x])) for x in N}
        else:
            util.raise_unknown('categorical policy', cat_policy)
        return result  # type: ignore[code]

    def build_y(
        self, policy: ty.Optional[str]
    ) -> ty.Tuple[ArrayDict, ty.Optional[ty.Dict[str, ty.Any]]]:
        if self.is_regression:
            assert policy == 'mean_std'
        y = deepcopy(self.y)
        if policy:
            if not self.is_regression:
                warnings.warn('y_policy is not None, but the task is NOT regression')
                info = None
            elif policy == 'mean_std':
                mean, std = self.y['train'].mean(), self.y['train'].std()
                y = {k: (v - mean) / std for k, v in y.items()}
                info = {'policy': policy, 'mean': mean, 'std': std}
            else:
                util.raise_unknown('y policy', policy)
        else:
            info = None
        return y, info


def to_tensors(data: ArrayDict) -> ty.Dict[str, torch.Tensor]:
    return {k: torch.as_tensor(v) for k, v in data.items()}


def load_dataset_info(dataset_name: str) -> ty.Dict[str, ty.Any]:
    info = util.load_json(env.DATA_DIR / dataset_name / 'info.json')
    info['size'] = (
        info['train_size'] + info['val_size'] + info['test_size'] + info['test_backdoor_size']
    )
    return info
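

# A minimal end-to-end usage sketch, not part of the original module. It assumes a
# prepared dataset directory (the path below is hypothetical) containing
# {N,C,y}_{train,val,test,test_backdoor}.npy files and info.json, as expected by
# Dataset.from_dir.
if __name__ == '__main__':
    dataset = Dataset.from_dir('data/california_housing')  # hypothetical path
    X = dataset.build_X(
        normalization='quantile',
        num_nan_policy='mean',
        cat_nan_policy='new',
        cat_policy='indices',
        seed=0,
    )
    y, y_info = dataset.build_y('mean_std' if dataset.is_regression else None)
    # With cat_policy='indices', build_X returns a (numerical, categorical) tuple;
    # otherwise it returns a single ArrayDict.
    if isinstance(X, tuple):
        X_num, X_cat = (to_tensors(part) if part is not None else None for part in X)
    else:
        X_num, X_cat = to_tensors(X), None
    y = to_tensors(y)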