# -*- coding: utf-8 -*-
"""
Generic RedditModel class for specific models to inherit
Author: Evgenii Nikitin <e.nikitin@nyu.edu>
Part of https://github.com/crazyfrogspb/RedditScore project
Copyright (c) 2018 Evgenii Nikitin. All rights reserved.
This work is licensed under the terms of the MIT license.
"""
import collections
import json
import os
import warnings
from abc import ABCMeta
from collections import Sequence
from itertools import product
import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as hac
from adjustText import adjust_text
from scipy.cluster.hierarchy import fcluster
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.exceptions import NotFittedError
from sklearn.manifold import TSNE
from sklearn.metrics import log_loss, make_scorer
from sklearn.model_selection import (PredefinedSplit, check_cv,
cross_val_score, train_test_split)
# Defaults for scipy.cluster.hierarchy.linkage when clustering class embeddings
DEFAULT_LINKAGE_PARS = {'method': 'average', 'metric': 'cosine',
                        'optimal_ordering': True}
# Defaults for dendrogram plotting; 'max_d' is the cut-line height consumed by
# fancy_dendrogram itself (it is popped before reaching scipy's dendrogram)
DEFAULT_DENDROGRAM_PARS = {'leaf_font_size': 20, 'max_d': 0.75,
                           'orientation': 'right', 'distance_sort': True}
# Defaults for scipy.cluster.hierarchy.fcluster (flat cluster extraction)
DEFAULT_CLUSTERING_PARS = {'t': 0.75, 'criterion': 'distance'}
# Defaults for sklearn.manifold.TSNE applied to the class-embedding matrix
DEFAULT_TSNE_PARS = {'perplexity': 10.0, 'early_exaggeration': 30.0,
                     'learning_rate': 5.0, 'n_iter': 1000, 'method': 'exact',
                     'random_state': 24}
# Defaults for the matplotlib legend on the T-SNE scatter plot
DEFAULT_LEGEND_PARS = {'loc': 'best', 'bbox_to_anchor': (1, 0.5),
                       'fancybox': True, 'shadow': True, 'labels': [],
                       'fontsize': 16}
def top_k_accuracy_score(y_true, y_pred, k=3, normalize=True):
    """Compute top-k accuracy: an observation counts as correct when its
    true label is among the k highest-probability predictions.

    Parameters
    ----------
    y_true : iterable, shape (n_samples, ) or (n_samples, n_classes)
        True labels. A 2D indicator matrix is reduced with argmax.
    y_pred : pd.DataFrame or np.ndarray, shape (n_samples, n_classes)
        Predicted class probabilities. For a DataFrame the column names are
        the class labels; for a plain array, labels are assumed to be the
        column indices 0..n_classes-1.
    k : int, optional
        Number of top predictions to consider (default 3).
    normalize : bool, optional
        If True, return the fraction of correct observations,
        otherwise the raw count.

    Returns
    -------
    float or int
        Top-k accuracy (or hit count when normalize=False).
    """
    # Extract labels BEFORE converting: the original accessed y_pred.columns
    # unconditionally, which crashed for plain ndarray inputs.
    if isinstance(y_pred, pd.DataFrame):
        true_labels = list(y_pred.columns)
        y_pred = np.asarray(y_pred.values)
    else:
        y_pred = np.asarray(y_pred)
        true_labels = list(range(y_pred.shape[1]))
    y_true = np.asarray(y_true)
    if y_true.ndim == 2:
        y_true = np.argmax(y_true, axis=1)
    num_obs = y_pred.shape[0]
    # Column indices of the k largest probabilities per row. Slicing with
    # [:, -k:] is also safe when k >= n_classes (takes all columns), unlike
    # the previous explicit index arithmetic.
    top_k = np.argsort(y_pred, axis=1)[:, -k:]
    counter = 0
    for i in range(num_obs):
        if true_labels.index(y_true[i]) in top_k[i]:
            counter += 1
    if normalize:
        return counter / num_obs
    else:
        return counter
def fancy_dendrogram(z, labels, **kwargs):
    """Plot a dendrogram with per-merge distance annotations and an
    optional horizontal cut line at height ``max_d``.

    Adapted from:
    https://joernhees.de/blog/2015/08/26/scipy-hierarchical-clustering-and-dendrogram-tutorial/
    """
    # 'max_d' and 'annotate_above' are our own options; strip them before
    # forwarding the remaining kwargs to scipy's dendrogram.
    max_d = kwargs.pop('max_d', None)
    if max_d and 'color_threshold' not in kwargs:
        kwargs['color_threshold'] = max_d
    annotate_above = kwargs.pop('annotate_above', 0)
    ddata = hac.dendrogram(z, labels=labels, **kwargs)
    if kwargs.get('no_plot', False):
        return ddata
    plt.title('Hierarchical Clustering Dendrogram')
    plt.xlabel('Class')
    plt.ylabel('Metric')
    for icoord, dcoord, color in zip(ddata['icoord'], ddata['dcoord'],
                                     ddata['color_list']):
        # Annotate each merge above the threshold at its midpoint.
        xpos = 0.5 * sum(icoord[1:3])
        ypos = dcoord[1]
        if ypos > annotate_above:
            plt.plot(xpos, ypos, 'o', c=color)
            plt.annotate("%.3g" % ypos, (xpos, ypos), xytext=(0, -5),
                         textcoords='offset points',
                         va='top', ha='center')
    if max_d:
        plt.axhline(y=max_d, c='k')
    return ddata
def word_ngrams(tokens, ngram_range, separator=' '):
    """Extract word n-grams from a tokenized sequence.

    For ngram_range == (1, 1) the token list is returned unchanged;
    otherwise consecutive tokens are joined with ``separator`` for every
    n-gram size in the inclusive range.
    """
    min_n, max_n = ngram_range
    if max_n == 1:
        # Unigrams only: nothing to join.
        return tokens
    source = tokens
    total = len(source)
    if min_n == 1:
        # Keep the unigrams, then start joining from bigrams.
        result = list(source)
        min_n = 2
    else:
        result = []
    join = separator.join
    for size in range(min_n, min(max_n, total) + 1):
        result.extend(join(source[start:start + size])
                      for start in range(total - size + 1))
    return result
def flatten(l):
    """Recursively flatten an arbitrarily nested iterable, yielding leaves.

    Strings and bytes are treated as atomic leaves rather than as
    iterables of characters.
    """
    # Fix: collections.Iterable was a deprecated alias removed in
    # Python 3.10 -- the ABC lives in collections.abc.
    from collections.abc import Iterable
    for el in l:
        if isinstance(el, Iterable) and not isinstance(el, (str, bytes)):
            yield from flatten(el)
        else:
            yield el
class RedditModel(BaseEstimator, TransformerMixin, metaclass=ABCMeta):
    """Sklearn-style wrapper for the different architectures

    Parameters
    ----------
    random_state : int, optional
        Random seed (the default is 24).

    Attributes
    ----------
    model_type : str
        Model type name
    model : model object
        Model object that is being fitted
    params : dict
        Dictionary with model parameters
    classes_ : list
        List of class labels
    fitted : bool
        Indicates whether model was fitted
    class_embeddings : np.array, shape (num_classes, vector_size)
        Matrix with class embeddings
    random_state : int
        Random seed used for validation splits and for models
    """

    def __init__(self, random_state=24):
        self.random_state = random_state
        self.model = None
        self.classes_ = None
        self.fitted = False
        self.class_embeddings = None
        self.params = {}
        # NOTE: seeds numpy's *global* RNG as a construction side effect,
        # preserved for reproducibility of downstream model fitting
        np.random.seed(random_state)

    def cv_score(self, X, y, cv=0.2, scoring='accuracy', k=3):
        """Calculate validation score

        Parameters
        ----------
        X: iterable, shape (n_samples, )
            Sequence of tokenized documents
        y: iterable, shape (n_samples, )
            Sequence of labels
        cv: float, int, cross-validation generator or an iterable, optional
            Determines the cross-validation splitting strategy. Possible inputs for cv are:
            - float, to use holdout set of this size
            - None, to use the default 3-fold cross validation,
            - integer, to specify the number of folds in a StratifiedKFold,
            - An object to be used as a cross-validation generator.
            - An iterable yielding train, test splits.
        scoring : string, callable or None, optional
            A string (see sklearn model evaluation documentation) or a scorer
            callable object or 'top_k_accuracy'
        k: int, optional
            k parameter for 'top_k_accuracy' scoring

        Returns
        ----------
        np.ndarray
            Array of the validation metric, one value per fold

        Raises
        ------
        ValueError
            If y contains sequence (multilabel) targets
        """
        # Fix: collections.Sequence is deprecated (removed in Python 3.10);
        # use the collections.abc ABC instead of the file-level alias.
        from collections.abc import Sequence
        if not isinstance(X, np.ndarray):
            X = np.array(X)
        if not isinstance(y, np.ndarray):
            y = np.array(y)
        if (not hasattr(y[0], '__array__') and isinstance(y[0], Sequence)
                and not isinstance(y[0], str)):
            raise ValueError(
                'Cross validation does not support multilabels yet')
        self.classes_ = sorted(np.unique(y))
        np.random.seed(self.random_state)
        if isinstance(cv, float):
            # A float means "single holdout of this size", expressed as a
            # PredefinedSplit: -1 marks training rows, 0 marks the test fold.
            train_ind, __ = train_test_split(np.arange(0, len(X)),
                                             test_size=cv, shuffle=True,
                                             random_state=self.random_state)
            test_fold = np.zeros((len(X), ))
            test_fold[train_ind] = -1
            cv_split = PredefinedSplit(test_fold)
        else:
            cv_split = check_cv(cv, y=y, classifier=True)
        if scoring == 'neg_log_loss':
            # Pin labels so log_loss works even when a fold misses a class.
            scoring = make_scorer(log_loss, labels=self.classes_,
                                  greater_is_better=False, needs_proba=True)
        elif scoring == 'top_k_accuracy':
            scoring = make_scorer(top_k_accuracy_score, k=k,
                                  greater_is_better=True, needs_proba=True)
        return cross_val_score(self.model, X, y, cv=cv_split,
                               scoring=scoring)

    def tune_params(self, X, y, param_grid=None,
                    verbose=False, cv=0.2, scoring='accuracy', k=3, refit=False):
        """Find the best values of hyperparameters using chosen validation scheme

        Parameters
        ----------
        X: iterable, shape (n_samples, )
            Sequence of tokenized documents
        y: iterable, shape (n_samples, )
            Sequence of labels
        param_grid: dict, optional
            Dictionary with parameters names as keys and
            lists of parameter settings as values. May be split into
            sequential steps ('step0', 'step1', ...) that are tuned one
            after another. If None, loads default values from JSON file.
        verbose: bool, optional
            If True, print scores after fitting each model
        cv: float, int, cross-validation generator or an iterable, optional
            Determines the cross-validation splitting strategy. Possible inputs for cv are:
            - float, to use holdout set of this size
            - None, to use the default 3-fold cross validation,
            - integer, to specify the number of folds in a StratifiedKFold,
            - An object to be used as a cross-validation generator.
            - An iterable yielding train, test splits.
        scoring : string, callable or None, optional
            A string (see sklearn model evaluation documentation) or a scorer
            callable object or 'top_k_accuracy'
        k: int, optional
            k parameter for 'top_k_accuracy' scoring
        refit: boolean, optional
            If True, refit model with the best found parameters

        Returns
        ----------
        best_pars: dict
            Dictionary with the best combination of parameters
        best_value: float
            Best value of the chosen metric

        Raises
        ------
        ValueError
            If no default grid exists for the current model type
        KeyError
            If a 'stepN' key is missing from the supplied grid
        """
        self.classes_ = sorted(np.unique(y))
        model_name = None
        if param_grid is None:
            model_type = self.model.__class__.__name__
            if model_type == 'FastTextClassifier':
                model_name = 'fasttext'
            elif model_type == 'Pipeline':
                model_type = self.model.named_steps['model'].__class__.__name__
                if model_type in ['SVC', 'SVR']:
                    model_name = 'SVM'
                elif model_type in ['BernoulliNB', 'MultinomialNB']:
                    model_name = 'bayes'
            if model_name is None:
                raise ValueError(
                    'Default grid for model {} is not found'.format(model_type))
            file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                os.path.join('..', 'data', 'model_pars.json'))
            with open(file) as f:
                param_grid = json.load(f)[model_name]
        if 'step0' not in param_grid:
            # A flat grid is treated as a single tuning step.
            param_grid_temp = {'step0': param_grid}
            param_grid = param_grid_temp
        for step in range(len(param_grid)):
            # Best score/params are tracked per step; each step starts from
            # the parameters selected by the previous one.
            best_pars = None
            best_value = -1000000.0
            if verbose:
                print('Fitting step {}'.format(step))
            try:
                current_grid = param_grid['step{}'.format(step)]
            except KeyError:
                raise KeyError('Step{} is not in the grid'.format(step))
            if isinstance(current_grid, list) is False:
                current_grid = [current_grid]
            for param_combination in current_grid:
                items = sorted(param_combination.items())
                keys, values = zip(*items)
                for v in product(*values):
                    params = dict(zip(keys, v))
                    self.set_params(**params)
                    if verbose:
                        print('Now fitting model for {}'.format(params))
                    # Fix: propagate k so 'top_k_accuracy' honors the
                    # requested k instead of silently using the default.
                    score = np.mean(self.cv_score(X, y, cv, scoring, k=k))
                    if verbose:
                        print('{}: {}'.format(scoring, score))
                    if score > best_value:
                        best_pars = params
                        best_value = score
            self.set_params(**best_pars)
            if verbose:
                print('Best {}: {} for {}'.format(scoring, best_value, best_pars))
        if refit:
            self.set_params(**best_pars)
            self.fit(X, y)
        return best_pars, best_value

    def fit(self, X, y):
        """Fit model

        Parameters
        ----------
        X: iterable, shape (n_samples, )
            Sequence of tokenized documents
        y: iterable, shape (n_samples, )
            Sequence of labels

        Returns
        -------
        RedditModel
            Fitted model object
        """
        self.classes_ = np.array(sorted(np.unique(y)))
        if not isinstance(X, np.ndarray):
            X = np.array(X)
        if not isinstance(y, np.ndarray):
            y = np.array(y)
        self.model.fit(X, y)
        self.fitted = True
        return self

    def predict(self, X):
        """Predict the most likely label

        Parameters
        ----------
        X: iterable, shape (n_samples, )
            Sequence of tokenized documents

        Returns
        ----------
        array, shape (n_samples, )
            Predicted class labels

        Raises
        ------
        NotFittedError
            If called before fit
        """
        if not self.fitted:
            raise NotFittedError('Model has to be fitted first')
        if not isinstance(X, np.ndarray):
            X = np.array(X, ndmin=1)
        return self.model.predict(X)

    def predict_proba(self, X):
        """Predict class probabilities

        Parameters
        ----------
        X: iterable, shape (n_samples, )
            Sequence of tokenized documents

        Returns
        ----------
        pd.DataFrame, shape (n_samples, num_classes)
            Predicted class probabilities, indexed like the input

        Raises
        ------
        NotFittedError
            If called before fit
        """
        if not self.fitted:
            raise NotFittedError('Model has to be fitted first')
        if isinstance(X, pd.DataFrame) or isinstance(X, pd.Series):
            indices = X.index
        else:
            indices = list(range(len(X)))
        if not isinstance(X, np.ndarray):
            X = np.array(X, ndmin=1)
        probs = self.model.predict_proba(X)
        # Fix: a plain ndarray cannot carry an index (setting .index raised
        # AttributeError); wrap it in a DataFrame labeled with the classes.
        if not isinstance(probs, pd.DataFrame):
            probs = pd.DataFrame(probs, columns=self.classes_)
        probs.index = indices
        return probs

    def get_params(self, deep=None):
        """Get parameters of the model

        Parameters
        ----------
        deep : bool, optional
            Accepted for sklearn API compatibility; currently ignored.

        Returns
        ----------
        dict
            Dictionary with wrapper and underlying model parameters
        """
        params = {}
        for key in self._get_param_names():
            params[key] = getattr(self, key, None)
        params.update(self.model.get_params())
        return params

    def set_params(self, **params):
        """Set parameters of the model

        Unknown keys trigger a warning instead of an error so partial
        grids can be applied safely.

        Parameters
        ----------
        **params
            Model parameters to update

        Returns
        -------
        RedditModel
            self, for chaining
        """
        if not params:
            return self
        for key, value in params.items():
            if hasattr(self, key):
                setattr(self, key, value)
                self.params[key] = value
            elif hasattr(self.model, key):
                setattr(self.model, key, value)
                self.params[key] = value
            else:
                warnings.warn('Parameter {} does not exist'.format(key))
        return self

    def plot_analytics(self, classes=None, fig_sizes=((20, 15), (20, 20)),
                       linkage_pars=None, dendrogram_pars=None,
                       clustering_pars=None, tsne_pars=None,
                       legend_pars=None, label_font_size=17):
        """Plot hieracical clustering dendrogram and T-SNE visualization
        based on the learned class embeddings

        Parameters
        ----------
        classes: iter, optional
            Iterable, contains list of class labels to include to the plots.
            If None, use all classes
        fig_sizes: tuple of tuples, optional
            Figure sizes for plots
        linkage_pars: dict, optional
            Dictionary of parameters for hieracical clustering.
            (scipy.cluster.hierarchy.linkage)
        dendrogram_pars: dict, optional
            Dictionary of parameters for plotting dendrogram.
            (scipy.cluster.hierarchy.dendrogram)
        clustering_pars: dict, optional
            Dictionary of parameters for producing flat clusters.
            (scipy.cluster.hierarchy.fcluster)
        tsne_pars: dict, optional
            Dictionary of parameters for T-SNE.
            (sklearn.manifold.TSNE)
        legend_pars: dict, optional
            Dictionary of parameters for legend plotting
            (matplotlib.pyplot.legend)
        label_font_size: int, optional
            Font size for the labels on T-SNE plot

        Returns
        -------
        tuple of matplotlib figures
            (dendrogram figure, T-SNE figure)

        Raises
        ------
        NotFittedError
            If called before fit
        ValueError
            If the model has no class embeddings
        """
        if not self.fitted:
            raise NotFittedError('Model has to be fitted first')
        if self.class_embeddings is None:
            raise ValueError(
                'Plotting dendrograms is not available for this class of model')
        if classes is None:
            classes = self.classes_
        # User-supplied parameters override the module-level defaults.
        linkage_pars = {**DEFAULT_LINKAGE_PARS, **(linkage_pars or {})}
        dendrogram_pars = {**DEFAULT_DENDROGRAM_PARS, **(dendrogram_pars or {})}
        clustering_pars = {**DEFAULT_CLUSTERING_PARS, **(clustering_pars or {})}
        tsne_pars = {**DEFAULT_TSNE_PARS, **(tsne_pars or {})}
        legend_pars = {**DEFAULT_LEGEND_PARS, **(legend_pars or {})}
        z = hac.linkage(self.class_embeddings.loc[classes, :], **linkage_pars)
        fig1 = plt.figure(figsize=fig_sizes[0])
        fancy_dendrogram(z, classes, **dendrogram_pars)
        # fcluster labels start at 1; shift to 0-based cluster ids.
        clusters = fcluster(z, **clustering_pars) - 1
        df_clust = pd.DataFrame({'classes': classes, 'cluster': clusters})
        num_cl = len(df_clust.cluster.unique())
        tsne = TSNE(n_components=2, **tsne_pars)
        Y = tsne.fit_transform(self.class_embeddings.loc[classes, :])
        fig2, ax2 = plt.subplots(figsize=fig_sizes[1])
        colors = cm.jet(np.linspace(0, 1, num_cl))
        for i in range(num_cl):
            ax2.plot(Y[clusters == i, 0], Y[clusters == i, 1],
                     marker='o', linestyle='', color=colors[i])
        ax2.margins(0.05)
        ax2.legend(**legend_pars)
        texts = []
        for i in range(len(classes)):
            texts.append(plt.text(Y[i, 0], Y[i, 1], classes[i],
                                  fontsize=label_font_size))
        # adjust_text nudges overlapping labels apart on the scatter plot.
        adjust_text(texts, arrowprops=dict(
            arrowstyle="-", color='black', lw=0.55))
        return fig1, fig2