
# -*- coding: utf-8 -*-
"""
Generic RedditModel class for specific models to inherit

Author: Evgenii Nikitin <e.nikitin@nyu.edu>

Part of https://github.com/crazyfrogspb/RedditScore project

Copyright (c) 2018 Evgenii Nikitin. All rights reserved.
This work is licensed under the terms of the MIT license.
"""

import collections
import json
import os
import warnings
from abc import ABCMeta
from collections.abc import Sequence
from itertools import product

import matplotlib.cm as cm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.cluster.hierarchy as hac
from adjustText import adjust_text
from scipy.cluster.hierarchy import fcluster
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.exceptions import NotFittedError
from sklearn.manifold import TSNE
from sklearn.metrics import log_loss, make_scorer
from sklearn.model_selection import (PredefinedSplit, check_cv,
                                     cross_val_score, train_test_split)

DEFAULT_LINKAGE_PARS = {'method': 'average', 'metric': 'cosine',
                        'optimal_ordering': True}
DEFAULT_DENDROGRAM_PARS = {'leaf_font_size': 20, 'max_d': 0.75,
                           'orientation': 'right', 'distance_sort': True}
DEFAULT_CLUSTERING_PARS = {'t': 0.75, 'criterion': 'distance'}
DEFAULT_TSNE_PARS = {'perplexity': 10.0, 'early_exaggeration': 30.0,
                     'learning_rate': 5.0, 'n_iter': 1000, 'method': 'exact',
                     'random_state': 24}
DEFAULT_LEGEND_PARS = {'loc': 'best', 'bbox_to_anchor': (1, 0.5),
                       'fancybox': True, 'shadow': True, 'labels': [],
                       'fontsize': 16}


def top_k_accuracy_score(y_true, y_pred, k=3, normalize=True):
    # Compute top-k accuracy: an observation counts as correct if its true
    # label is among the k classes with the highest predicted probabilities
    if hasattr(y_pred, 'columns'):
        # DataFrame columns map class labels to column positions
        true_labels = list(y_pred.columns)
    else:
        # A plain array is assumed to be ordered by label index
        true_labels = list(range(y_pred.shape[1]))
    if not isinstance(y_pred, np.ndarray):
        y_pred = np.array(y_pred.values)
    if not isinstance(y_true, np.ndarray):
        y_true = np.array(y_true)
    if len(y_true.shape) == 2:
        y_true = np.argmax(y_true, axis=1)
    num_obs, num_labels = y_pred.shape
    idx = num_labels - k - 1
    counter = 0
    argsorted = np.argsort(y_pred, axis=1)
    for i in range(num_obs):
        # The last k entries of the argsorted row are the top-k predictions
        if true_labels.index(y_true[i]) in argsorted[i, idx + 1:]:
            counter += 1
    if normalize:
        return counter / num_obs
    return counter
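
# Example (doctest-style sketch with hand-made inputs): for k=2, the true
# label must be among the two highest-probability columns. In row 0 the true
# label 'b' has the highest probability; in row 1 the true label 'c' has the
# lowest, so only one of the two observations counts as correct:
#
#     >>> probs = pd.DataFrame([[0.1, 0.7, 0.2], [0.5, 0.3, 0.2]],
#     ...                      columns=['a', 'b', 'c'])
#     >>> top_k_accuracy_score(np.array(['b', 'c']), probs, k=2)
#     0.5
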
def fancy_dendrogram(z, labels, **kwargs):
    # Function to plot fancy dendrograms
    # Taken from:
    # https://joernhees.de/blog/2015/08/26/scipy-hierarchical-clustering-and-dendrogram-tutorial/
    max_d = kwargs.pop('max_d', None)
    if max_d and 'color_threshold' not in kwargs:
        kwargs['color_threshold'] = max_d
    annotate_above = kwargs.pop('annotate_above', 0)

    ddata = hac.dendrogram(z, labels=labels, **kwargs)

    if not kwargs.get('no_plot', False):
        plt.title('Hierarchical Clustering Dendrogram')
        plt.xlabel('Class')
        plt.ylabel('Metric')
        for i, d, c in zip(ddata['icoord'], ddata['dcoord'],
                           ddata['color_list']):
            x = 0.5 * sum(i[1:3])
            y = d[1]
            if y > annotate_above:
                plt.plot(x, y, 'o', c=c)
                plt.annotate("%.3g" % y, (x, y), xytext=(0, -5),
                             textcoords='offset points',
                             va='top', ha='center')
        if max_d:
            plt.axhline(y=max_d, c='k')
    return ddata
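
# Usage sketch (hypothetical `embeddings` array and `class_labels` list;
# any matrix accepted by scipy's linkage works):
#
#     >>> z = hac.linkage(embeddings, method='average', metric='cosine')
#     >>> ddata = fancy_dendrogram(z, class_labels, max_d=0.75,
#     ...                          annotate_above=0.1, orientation='right')
#     >>> plt.show()
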
def word_ngrams(tokens, ngram_range, separator=' '):
    # Extract n-grams from the tokenized sequence
    min_n, max_n = ngram_range
    if max_n != 1:
        original_tokens = tokens
        if min_n == 1:
            # Unigrams are already in the list; start building from bigrams
            tokens = list(original_tokens)
            min_n += 1
        else:
            tokens = []
        n_original_tokens = len(original_tokens)
        tokens_append = tokens.append
        space_join = separator.join
        for num in range(min_n, min(max_n + 1, n_original_tokens + 1)):
            for i in range(n_original_tokens - num + 1):
                tokens_append(space_join(original_tokens[i: i + num]))
    return tokens
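
# Example (doctest-style): with ngram_range=(1, 2), unigrams are kept and
# bigrams are appended:
#
#     >>> word_ngrams(['the', 'quick', 'fox'], (1, 2))
#     ['the', 'quick', 'fox', 'the quick', 'quick fox']
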
def flatten(l):
    # Recursively flatten an arbitrarily nested iterable,
    # treating strings and bytes as atomic elements
    for el in l:
        if (isinstance(el, collections.abc.Iterable) and
                not isinstance(el, (str, bytes))):
            yield from flatten(el)
        else:
            yield el
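
# Example (doctest-style; strings are not flattened further):
#
#     >>> list(flatten([1, [2, [3, 'abc']], (4,)]))
#     [1, 2, 3, 'abc', 4]
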
class RedditModel(BaseEstimator, TransformerMixin, metaclass=ABCMeta):
    """Sklearn-style wrapper for the different architectures

    Parameters
    ----------
    random_state : int, optional
        Random seed (the default is 24).

    Attributes
    ----------
    model_type : str
        Model type name
    model : model object
        Model object that is being fitted
    params : dict
        Dictionary with model parameters
    classes_ : list
        List of class labels
    fitted : bool
        Indicates whether the model was fitted
    class_embeddings : np.array, shape (num_classes, vector_size)
        Matrix with class embeddings
    random_state : int
        Random seed used for validation splits and for models
    """

    def __init__(self, random_state=24):
        self.random_state = random_state
        self.model = None
        self.classes_ = None
        self.fitted = False
        self.class_embeddings = None
        self.params = {}
        np.random.seed(random_state)
    def cv_score(self, X, y, cv=0.2, scoring='accuracy', k=3):
        """Calculate validation score

        Parameters
        ----------
        X : iterable, shape (n_samples, )
            Sequence of tokenized documents
        y : iterable, shape (n_samples, )
            Sequence of labels
        cv : float, int, cross-validation generator or an iterable, optional
            Determines the cross-validation splitting strategy.
            Possible inputs for cv are:
            - float, to use a holdout set of this size
            - None, to use the default 3-fold cross-validation
            - integer, to specify the number of folds in a StratifiedKFold
            - an object to be used as a cross-validation generator
            - an iterable yielding (train, test) splits
        scoring : string, callable or None, optional
            A string (see the sklearn model evaluation documentation),
            a scorer callable object, or 'top_k_accuracy'
        k : int, optional
            k parameter for 'top_k_accuracy' scoring

        Returns
        -------
        array of float, shape (n_splits, )
            Values of the validation metric for each split
        """
        if not isinstance(X, np.ndarray):
            X = np.array(X)
        if not isinstance(y, np.ndarray):
            y = np.array(y)
        if (not hasattr(y[0], '__array__') and isinstance(y[0], Sequence) and
                not isinstance(y[0], str)):
            raise ValueError(
                'Cross-validation does not support multilabels yet')
        self.classes_ = sorted(np.unique(y))
        np.random.seed(self.random_state)
        if isinstance(cv, float):
            # Mark a random (1 - cv) share of the data as the training fold
            # and the remaining observations as the single holdout fold
            train_ind, __ = train_test_split(np.arange(0, len(X)),
                                             test_size=cv, shuffle=True,
                                             random_state=self.random_state)
            test_fold = np.zeros((len(X), ))
            test_fold[train_ind] = -1
            cv_split = PredefinedSplit(test_fold)
        else:
            cv_split = check_cv(cv, y=y, classifier=True)

        if scoring == 'neg_log_loss':
            scoring = make_scorer(log_loss, labels=self.classes_,
                                  greater_is_better=False, needs_proba=True)
        elif scoring == 'top_k_accuracy':
            scoring = make_scorer(top_k_accuracy_score, k=k,
                                  greater_is_better=True, needs_proba=True)

        return cross_val_score(self.model, X, y, cv=cv_split, scoring=scoring)
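
    # Usage sketch (hypothetical; `SomeRedditModel` stands for any concrete
    # subclass that sets `self.model` to an sklearn-compatible estimator, and
    # `docs`/`labels` for tokenized documents and their labels):
    #
    #     >>> clf = SomeRedditModel(random_state=24)
    #     >>> scores = clf.cv_score(docs, labels, cv=5,
    #     ...                       scoring='top_k_accuracy', k=3)
    #     >>> scores.mean()
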
    def tune_params(self, X, y, param_grid=None, verbose=False, cv=0.2,
                    scoring='accuracy', k=3, refit=False):
        """Find the best values of hyperparameters using the chosen
        validation scheme

        Parameters
        ----------
        X : iterable, shape (n_samples, )
            Sequence of tokenized documents
        y : iterable, shape (n_samples, )
            Sequence of labels
        param_grid : dict, optional
            Dictionary with parameter names as keys and lists of parameter
            settings as values. If None, load default values from a JSON file
        verbose : bool, optional
            If True, print scores after fitting each model
        cv : float, int, cross-validation generator or an iterable, optional
            Determines the cross-validation splitting strategy.
            Possible inputs for cv are:
            - float, to use a holdout set of this size
            - None, to use the default 3-fold cross-validation
            - integer, to specify the number of folds in a StratifiedKFold
            - an object to be used as a cross-validation generator
            - an iterable yielding (train, test) splits
        scoring : string, callable or None, optional
            A string (see the sklearn model evaluation documentation),
            a scorer callable object, or 'top_k_accuracy'
        k : int, optional
            k parameter for 'top_k_accuracy' scoring
        refit : bool, optional
            If True, refit the model with the best found parameters

        Returns
        -------
        best_pars : dict
            Dictionary with the best combination of parameters
        best_value : float
            Best value of the chosen metric
        """
        self.classes_ = sorted(np.unique(y))
        model_name = None
        if param_grid is None:
            # Pick a default grid based on the type of the wrapped model
            model_type = self.model.__class__.__name__
            if model_type == 'FastTextClassifier':
                model_name = 'fasttext'
            elif model_type == 'Pipeline':
                model_type = self.model.named_steps['model'].__class__.__name__
                if model_type in ['SVC', 'SVR']:
                    model_name = 'SVM'
                elif model_type in ['BernoulliNB', 'MultinomialNB']:
                    model_name = 'bayes'
            if model_name is None:
                raise ValueError(
                    'Default grid for model {} is not found'.format(model_type))
            file = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                                os.path.join('..', 'data', 'model_pars.json'))
            with open(file) as f:
                param_grid = json.load(f)[model_name]

        if 'step0' not in param_grid:
            # Wrap a flat grid into a single tuning step
            param_grid = {'step0': param_grid}

        for step in range(len(param_grid)):
            best_pars = None
            best_value = -1000000.0
            if verbose:
                print('Fitting step {}'.format(step))
            try:
                current_grid = param_grid['step{}'.format(step)]
            except KeyError:
                raise KeyError('Step{} is not in the grid'.format(step))
            if not isinstance(current_grid, list):
                current_grid = [current_grid]
            for param_combination in current_grid:
                items = sorted(param_combination.items())
                keys, values = zip(*items)
                for v in product(*values):
                    params = dict(zip(keys, v))
                    self.set_params(**params)
                    if verbose:
                        print('Now fitting model for {}'.format(params))
                    score = np.mean(self.cv_score(X, y, cv, scoring, k))
                    if verbose:
                        print('{}: {}'.format(scoring, score))
                    if score > best_value:
                        best_pars = params
                        best_value = score
            # Fix the best parameters found at this step before moving on
            self.set_params(**best_pars)
            if verbose:
                print('Best {}: {} for {}'.format(scoring, best_value,
                                                  best_pars))

        if refit:
            self.set_params(**best_pars)
            self.fit(X, y)

        return best_pars, best_value
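
    # Grid format sketch (hypothetical parameter names): a flat grid is
    # wrapped into a single 'step0', while a multi-step grid tunes one group
    # of parameters at a time, fixing the winners of each step:
    #
    #     >>> param_grid = {
    #     ...     'step0': {'alpha': [0.1, 1.0, 10.0]},
    #     ...     'step1': {'ngram_range': [(1, 1), (1, 2)]},
    #     ... }
    #     >>> best_pars, best_value = clf.tune_params(docs, labels,
    #     ...                                         param_grid=param_grid,
    #     ...                                         cv=0.2, refit=True)
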
    def fit(self, X, y):
        """Fit model

        Parameters
        ----------
        X : iterable, shape (n_samples, )
            Sequence of tokenized documents
        y : iterable, shape (n_samples, )
            Sequence of labels

        Returns
        -------
        RedditModel
            Fitted model object
        """
        self.classes_ = np.array(sorted(np.unique(y)))
        if not isinstance(X, np.ndarray):
            X = np.array(X)
        if not isinstance(y, np.ndarray):
            y = np.array(y)
        self.model.fit(X, y)
        self.fitted = True
        return self
    def predict(self, X):
        """Predict the most likely label

        Parameters
        ----------
        X : iterable, shape (n_samples, )
            Sequence of tokenized documents

        Returns
        -------
        array, shape (n_samples, )
            Predicted class labels
        """
        if not self.fitted:
            raise NotFittedError('Model has to be fitted first')
        if not isinstance(X, np.ndarray):
            X = np.array(X, ndmin=1)
        return self.model.predict(X)
    def predict_proba(self, X):
        """Predict class probabilities

        Parameters
        ----------
        X : iterable, shape (n_samples, )
            Sequence of tokenized documents

        Returns
        -------
        array, shape (n_samples, num_classes)
            Predicted class probabilities
        """
        if not self.fitted:
            raise NotFittedError('Model has to be fitted first')
        if isinstance(X, (pd.DataFrame, pd.Series)):
            indices = X.index
        else:
            indices = list(range(len(X)))
        if not isinstance(X, np.ndarray):
            X = np.array(X, ndmin=1)
        probs = self.model.predict_proba(X)
        # The wrapped models are expected to return a DataFrame here;
        # restore the original row index
        probs.index = indices
        return probs
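
    # Usage sketch (hypothetical data, continuing the example above; the
    # DataFrame return of predict_proba is an assumption grounded in the
    # index assignment performed by this method):
    #
    #     >>> clf.fit(docs, labels)
    #     >>> clf.predict(['new tokenized document'])
    #     >>> probs = clf.predict_proba(['new tokenized document'])
    #     >>> probs.iloc[0].idxmax()  # most likely class label
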
    def get_params(self, deep=None):
        """Get parameters of the model

        Parameters
        ----------
        deep : bool, optional
            Ignored; kept for compatibility with the sklearn API

        Returns
        -------
        dict
            Dictionary with model parameters
        """
        params = {}
        for key in self._get_param_names():
            params[key] = getattr(self, key, None)
        params.update(self.model.get_params())
        return params
    def set_params(self, **params):
        """Set parameters of the model

        Parameters
        ----------
        **params
            Model parameters to update
        """
        if not params:
            return self
        for key, value in params.items():
            if hasattr(self, key):
                setattr(self, key, value)
                self.params[key] = value
            elif hasattr(self.model, key):
                setattr(self.model, key, value)
                self.params[key] = value
            else:
                warnings.warn('Parameter {} does not exist'.format(key))
        return self
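
    # Usage sketch (hypothetical parameter name `alpha`): attributes of the
    # wrapper itself and of the underlying model can both be set; an unknown
    # name only triggers a warning:
    #
    #     >>> clf.set_params(random_state=42, alpha=0.5)
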
    def plot_analytics(self, classes=None, fig_sizes=((20, 15), (20, 20)),
                       linkage_pars=None, dendrogram_pars=None,
                       clustering_pars=None, tsne_pars=None,
                       legend_pars=None, label_font_size=17):
        """Plot a hierarchical clustering dendrogram and a t-SNE
        visualization based on the learned class embeddings

        Parameters
        ----------
        classes : iterable, optional
            Class labels to include in the plots. If None, use all classes
        fig_sizes : tuple of tuples, optional
            Figure sizes for the two plots
        linkage_pars : dict, optional
            Dictionary of parameters for hierarchical clustering
            (scipy.cluster.hierarchy.linkage)
        dendrogram_pars : dict, optional
            Dictionary of parameters for plotting the dendrogram
            (scipy.cluster.hierarchy.dendrogram)
        clustering_pars : dict, optional
            Dictionary of parameters for producing flat clusters
            (scipy.cluster.hierarchy.fcluster)
        tsne_pars : dict, optional
            Dictionary of parameters for t-SNE (sklearn.manifold.TSNE)
        legend_pars : dict, optional
            Dictionary of parameters for legend plotting
            (matplotlib.pyplot.legend)
        label_font_size : int, optional
            Font size for the labels on the t-SNE plot

        Returns
        -------
        fig1, fig2 : matplotlib figures
            Dendrogram and t-SNE figures
        """
        if not self.fitted:
            raise NotFittedError('Model has to be fitted first')
        if self.class_embeddings is None:
            raise ValueError(
                'Plotting dendrograms is not available for this class of model')
        if classes is None:
            classes = self.classes_

        # Merge user-supplied parameters with the defaults
        if linkage_pars is None:
            linkage_pars = DEFAULT_LINKAGE_PARS
        else:
            linkage_pars = {**DEFAULT_LINKAGE_PARS, **linkage_pars}
        if dendrogram_pars is None:
            dendrogram_pars = DEFAULT_DENDROGRAM_PARS
        else:
            dendrogram_pars = {**DEFAULT_DENDROGRAM_PARS, **dendrogram_pars}
        if clustering_pars is None:
            clustering_pars = DEFAULT_CLUSTERING_PARS
        else:
            clustering_pars = {**DEFAULT_CLUSTERING_PARS, **clustering_pars}
        if tsne_pars is None:
            tsne_pars = DEFAULT_TSNE_PARS
        else:
            tsne_pars = {**DEFAULT_TSNE_PARS, **tsne_pars}
        if legend_pars is None:
            legend_pars = DEFAULT_LEGEND_PARS
        else:
            legend_pars = {**DEFAULT_LEGEND_PARS, **legend_pars}

        # Hierarchical clustering of the class embeddings
        z = hac.linkage(self.class_embeddings.loc[classes, :], **linkage_pars)
        fig1 = plt.figure(figsize=fig_sizes[0])
        fancy_dendrogram(z, classes, **dendrogram_pars)

        clusters = fcluster(z, **clustering_pars) - 1
        df_clust = pd.DataFrame({'classes': classes, 'cluster': clusters})
        num_cl = len(df_clust.cluster.unique())

        # 2D t-SNE projection, colored by cluster membership
        tsne = TSNE(n_components=2, **tsne_pars)
        Y = tsne.fit_transform(self.class_embeddings.loc[classes, :])
        fig2, ax2 = plt.subplots(figsize=fig_sizes[1])
        colors = cm.jet(np.linspace(0, 1, num_cl))
        for i in range(num_cl):
            ax2.plot(Y[clusters == i, 0], Y[clusters == i, 1],
                     marker='o', linestyle='', color=colors[i])
        ax2.margins(0.05)
        ax2.legend(**legend_pars)
        texts = []
        for i in range(len(classes)):
            texts.append(plt.text(Y[i, 0], Y[i, 1], classes[i],
                                  fontsize=label_font_size))
        adjust_text(texts, arrowprops=dict(
            arrowstyle="-", color='black', lw=0.55))
        return fig1, fig2
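
# Usage sketch (hypothetical fitted model with class embeddings; any entry in
# the DEFAULT_*_PARS dictionaries above can be overridden per call):
#
#     >>> fig_dend, fig_tsne = clf.plot_analytics(
#     ...     tsne_pars={'perplexity': 5.0},
#     ...     clustering_pars={'t': 0.5})
#     >>> fig_tsne.savefig('class_embeddings.png')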