# -*- coding: utf-8 -*-
CrazyTokenizer: spaCy-based tokenizer with Twitter- and Reddit-specific features

Splitting hashtags is based on the idea from

import html
import json
import os
import re
import string
import sys
import warnings
from collections import OrderedDict
from http import client
from math import log
from socket import gaierror
from urllib import parse

import requests
import tldextract
from bs4 import BeautifulSoup
from import urlopen
from eventlet.timeout import Timeout
from redditscore.models.redditmodel import word_ngrams
from spacy.lang.en import English
from spacy.matcher import Matcher
from spacy.tokens import Doc, Token

    from nltk.corpus import stopwords
    from nltk.stem import PorterStemmer, WordNetLemmatizer
except ImportError:
        'nltk could not be imported, some features will be unavailable')

Token.set_extension('transformed_text', default='', force=True)
Doc.set_extension('tokens', default='', force=True)


POS_EMOJIS = [u'😂', u'❤', u'♥', u'😍', u'😘', u'😊', u'👌', u'💕',
              u'👏', u'😁', u'☺', u'♡', u'👍', u'✌', u'😏', u'😉', u'🙌', u'😄']
NEG_EMOJIS = [u'😭', u'😩', u'😒', u'😔', u'😱']

NORMALIZE_RE = re.compile(r"([a-zA-Z])\1\1+")
ALPHA_DIGITS_RE = re.compile(r"[a-zA-Z0-9_]+")
TWITTER_HANDLES_RE = re.compile(r"@\w{1,15}")
REDDITORS_RE = re.compile(r"u/\w{1,20}")
SUBREDDITS_RE = re.compile(r"/r/\w{1,20}")
QUOTES_RE = re.compile(r'^".*"$')
REDDIT_QUOTES_RE = re.compile(r'&gt;[^\n]+\n')
BREAKS_RE = re.compile(r"[\r\n]+")
URLS_RE = re.compile(
    r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\ ),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")

UTF_CHARS = r'a-z0-9_\u00c0-\u00d6\u00d8-\u00f6\u00f8-\u00ff'
TAG_EXP = r'(^|[^0-9A-Z&/]+)(#|\uff03)([0-9A-Z_]*[A-Z_]+[%s]*)' % UTF_CHARS

URL_SHORTENERS = ['t', 'bit', 'goo', 'tinyurl']

DECONTRACTIONS = OrderedDict([("won't", "will not"), ("can't", "can not"),
                              ("n't", " not"), ("'re", " are"), ("'s", " is"),
                              ("'d", " would"), ("'ll", " will"),
                              ("'t", " not"), ("'ve", " have"),
                              ("'m", " am")])

DATA_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)),

with open(os.path.join(DATA_PATH, 'emojis_utf.json')) as f:
    EMOJIS_UTF = json.load(f)
with open(os.path.join(DATA_PATH, 'emojis_unicode.json')) as f:
    EMOJIS_UNICODE = json.load(f)
with open(os.path.join(DATA_PATH, 'latin_chars.json')) as f:
    LATIN_CHARS = json.load(f)

EMOJIS_UTF_RE = re.compile(r"\\x", re.IGNORECASE)
EMOJIS_UNICODE_RE = re.compile(r"u\+", re.IGNORECASE)
EMOJIS_UTF_NOSPACE_RE = re.compile(r'(?<!x..)(\\x)', re.IGNORECASE)
EMOJIS_UNICODE_NOSPACE_RE = re.compile(r'(\D{2,})(U\+)', re.IGNORECASE)
LATIN_CHARS_RE = re.compile(r'\\xe2\\', re.IGNORECASE)

for key, value in EMOJIS_UTF.items():
    EMOJIS_UTF_PATS[key] = re.compile(re.escape(key), re.IGNORECASE)
for key, value in EMOJIS_UNICODE.items():
    EMOJIS_UNICODE_PATS[key] = re.compile(re.escape(key), re.IGNORECASE)
for key, value in LATIN_CHARS.items():
    LATIN_CHARS_PATS[key] = re.compile(re.escape(key), re.IGNORECASE)

[docs]def alpha_digits_check(text): return bool(ALPHA_DIGITS_RE.fullmatch(text))
[docs]def hashtag_check(text): return bool(HASHTAGS_RE.fullmatch(text))
[docs]def twitter_handle_check(text): return bool(TWITTER_HANDLES_RE.fullmatch(text))
[docs]def retokenize_check(text): if (text.count('@') > 1 or text.count('#') > 1) and text.count(' ') == 0: return True elif (text.count('@') == 1 or text.count('#') == 1) \ and text.startswith('@') is False and text.startswith('#') is False: return True return False
[docs]def batch(iterable, n=1): length = len(iterable) for ndx in range(0, l, n): yield iterable[ndx:min(ndx + n, length)]
[docs]def unshorten_url(url, url_shorteners=None, verbose=False): # Fast URL domain extractor domain = tldextract.extract(url).domain if url_shorteners is not None and domain not in url_shorteners: return domain parsed = parse.urlparse(url) if parsed.scheme == 'http': h = client.HTTPConnection(parsed.netloc) elif parsed.scheme == 'https': h = client.HTTPSConnection(parsed.netloc) else: return domain resource = parsed.path if parsed.query != "": resource += "?" + parsed.query try: h.request('HEAD', resource) except (TimeoutError, ConnectionRefusedError, ConnectionResetError, gaierror): if verbose: warnings.warn('Connection error for {}'.format(url)) return domain response = h.getresponse() if response.status // 100 == 3 and response.getheader('Location'): return unshorten_url(response.getheader('Location'), URL_SHORTENERS, verbose) else: return domain
[docs]def get_url_title(url, verbose=False): soup = None try: with Timeout(TIMEOUT, False): response = urlopen(url) if 'text/html' not in response.getheader('Content-Type'): warnings.warn("Url {} is not a text/html page".format(url)) return '' soup = BeautifulSoup(response, "lxml") except Exception: if verbose: warnings.warn("Couldn't extract title from url {}".format(url)) return '' if soup is None or soup.title is None or soup.title.string is None: return '' return soup.title.string
[docs]def get_twitter_realname(twitter_handle): try: response = requests.get('' + twitter_handle) except requests.exceptions.ConnectionError: warnings.warn( "Couldn't extract real name for {}".format(twitter_handle)) return '' soup = BeautifulSoup(response.text, "lxml") if soup.title is not None: realname = soup.title.text.split('(')[0] else: realname = '' if 'Twitter' in realname: return '' else: return realname
[docs]class CrazyTokenizer(object): """ Tokenizer with Reddit- and Twitter-specific options Parameters ---------- lowercase : bool, optional If True, lowercase all tokens. Defaults to True. keepcaps: bool, optional If True, keep ALL CAPS WORDS uppercased. Defaults to False. normalize: int or bool, optional If not False, perform normalization of repeated charachers ("awesoooooome" -> "awesooome"). The value of parameter determines the number of occurences to keep. Defaults to 3. ignore_quotes: bool, optional If True, ignore tokens contained within double quotes. Defaults to False. ignore_reddit_quotes: bool, optional If True, remove quotes from the Reddit comments. Defaults to False. ignore_stopwords: str, list, or boolean, optional Whether to ignore stopwords - str: language to get a list of stopwords for from NLTK package - list: list of stopwords to remove - True: use built-in list of the english stop words - False: keep all tokens Defaults to False stem: {False, 'stem', 'lemm'}, optional Whether to perform word stemming - False: do not perform word stemming - 'stem': use PorterStemmer from NLTK package - 'lemm': use WordNetLemmatizer from NLTK package remove_punct: bool, optional If True, remove punctuation tokens. Defaults to True. remove_breaks: bool, optional If True, remove linebreak tokens. Defaults to True. decontract: bool, optional If True, attempt to expand certain contractions. Defaults to False. Example: "'ll" -> " will" numbers, subreddits, reddit_usernames, emails: False or str, optional Replacement of the different types of tokens - False: leaves these tokens intact - str: replacement token - '': removes all occurrences of these tokens twitter_handles: False, 'realname' or str, optional Processing of twitter handles - False: do nothing - str: replacement token - 'realname': replace with the real screen name of Twitter account - 'split': split handles using Viterbi algorithm Example: "#vladimirputinisthebest" -> "vladimir putin is the best" hashtags: False or str, optional Processing of hashtags - False: do nothing - str: replacement token - 'split': split hashtags according using Viterbi algorithm urls: False or str, optional Replacement of parsed URLs - False: leave URL intact - str: replacement token - dict: replace all URLs stored in keys with the corresponding values - '': removes all occurrences of these tokens - 'domain': extract domain ("" -> "cnn") - 'domain_unwrap_fast': extract domain after unwraping links for a list of URL shorteners (,,, - 'domain_unwrap': extract domain after unwraping all links - 'title': extract and tokenize title of each link after unwraping it Defaults to False. extra_patterns: None or list of tuples, optional Replacement of any user-supplied extra patterns. Tuples must have the following form: (name, re_pattern, replacement_token): - name (str): name of the pattern - re_pattern (_sre.SRE_Pattern): compiled re pattern - replacement_token (str): replacement token Defaults to None keep_untokenized: None or list, optional List of expressions to keep untokenized Example: ["New York", "Los Angeles", "San Francisco"] whitespaces_to_underscores: boolean, optional If True, replace all whitespace characters with underscores in the final tokens. Defaults to True. remove_nonunicode: boolean, optional If True, remove all non-unicode characters. Defaults to False. pos_emojis, neg_emojis, neutral_emojis: None, True, or list, optional Replace positive, negative, and neutral emojis with the special tokens - None: do not perform replacement - True: perform replacement of the default lists of emojis - list: list of emojis to replace print_url_warnings: bool, optional If True, print URL-related warnings. Defaults to False. latin_chars_fix: bool, optional Try applying this fix if you have a lot of \\xe2\\x80\\x99-like or U+1F601-like strings in your data. Defaults to False. ngrams: int, optional Add ngrams of tokens after tokenizing """ def __init__(self, lowercase=True, keepcaps=False, normalize=3, ignore_quotes=False, ignore_reddit_quotes=False, ignore_stopwords=False, stem=False, remove_punct=True, remove_breaks=True, decontract=False, twitter_handles=False, urls=False, hashtags=False, numbers=False, subreddits=False, reddit_usernames=False, emails=False, extra_patterns=None, keep_untokenized=None, whitespaces_to_underscores=True, remove_nonunicode=False, pos_emojis=None, neg_emojis=None, neutral_emojis=None, print_url_warnings=False, latin_chars_fix=False, ngrams=1): self.params = locals() self._nlp = English() self._merging_matcher = Matcher(self._nlp.vocab) self._matcher = Matcher(self._nlp.vocab) self._replacements = {} self._domains = {} self._realnames = {} self._stopwords = None alpha_digits_flag = self._nlp.vocab.add_flag(alpha_digits_check) hashtag_flag = self._nlp.vocab.add_flag(hashtag_check) twitter_handle_flag = self._nlp.vocab.add_flag(twitter_handle_check) self._merging_matcher.add( 'HASHTAG', None, [{'ORTH': '#'}, {'IS_ASCII': True}]) self._merging_matcher.add( 'SUBREDDIT', None, [{'ORTH': '/r'}, {'ORTH': '/'}, {alpha_digits_flag: True}], [{'ORTH': 'r'}, {'ORTH': '/'}, {alpha_digits_flag: True}]) self._merging_matcher.add('REDDIT_USERNAME', None, [{'ORTH': '/u'}, {'ORTH': '/'}, {alpha_digits_flag: True}], [{'ORTH': 'u'}, {'ORTH': '/'}, {alpha_digits_flag: True}]) if isinstance(ignore_stopwords, str) and ('nltk' in sys.modules): try: self._stopwords = stopwords.words(ignore_stopwords) except OSError: raise ValueError( 'Language {} was not found by NLTK'.format(ignore_stopwords)) elif ignore_stopwords is True: self._matcher.add('STOPWORDS', self._remove_token, [{'IS_STOP': True}]) elif isinstance(ignore_stopwords, list): self._stopwords = [word.lower() for word in ignore_stopwords] elif ignore_stopwords is not False: raise TypeError('Type {} is not supported by ignore_stopwords parameter or NLTK is not installed'.format( type(ignore_stopwords))) if lowercase and (not keepcaps): self._matcher.add('LOWERCASE', self._lowercase, [{'IS_LOWER': False}]) elif lowercase and keepcaps: self._matcher.add('LOWERCASE', self._lowercase, [ {'IS_LOWER': False, 'IS_UPPER': False}]) if remove_punct: self._matcher.add('PUNCTUATION', self._remove_token, [ {'IS_PUNCT': True}]) if remove_breaks: def break_check(text): return bool(BREAKS_RE.fullmatch(text)) break_flag = self._nlp.vocab.add_flag(break_check) self._matcher.add('BREAK', self._remove_token, [{break_flag: True}]) if normalize: def normalize_check(text): return bool( normalize_flag = self._nlp.vocab.add_flag(normalize_check) self._matcher.add('NORMALIZE', self._normalize, [{normalize_flag: True}]) if numbers is not False: self._matcher.add('NUMBER', self._replace_token, [{'LIKE_NUM': True}]) self._replacements['NUMBER'] = numbers if urls is not False: if urls in ['domain', 'domain_unwrap_fast', 'domain_unwrap', 'title']: self._urls = urls self._matcher.add('URL', self._process_url, [ {'LIKE_URL': True}]) elif isinstance(urls, dict): self._domains = urls self._urls = 'domain_unwrap_fast' self._matcher.add('URL', self._process_url, [ {'LIKE_URL': True}]) else: self._matcher.add('URL', self._replace_token, [{'LIKE_URL': True}]) self._replacements['URL'] = urls if emails is not False: self._matcher.add('EMAIL', self._replace_token, [{'LIKE_EMAIL': True}]) self._replacements['EMAIL'] = emails if reddit_usernames is not False: def reddit_username_check(text): return bool(REDDITORS_RE.fullmatch(text)) reddit_username_flag = self._nlp.vocab.add_flag( reddit_username_check) self._matcher.add('REDDIT_USERNAME', self._replace_token, [ {reddit_username_flag: True}]) self._replacements['REDDIT_USERNAME'] = reddit_usernames if subreddits is not False: def subreddit_check(text): return bool(SUBREDDITS_RE.fullmatch(text)) subreddit_flag = self._nlp.vocab.add_flag(subreddit_check) self._matcher.add('SUBREDDIT', self._replace_token, [{subreddit_flag: True}]) self._replacements['SUBREDDIT'] = subreddits if twitter_handles is not False: self._matcher.add('TWITTER_HANDLE', self._handles_postprocess, [{twitter_handle_flag: True}]) if hashtags is not False: self._matcher.add('HASHTAG', self._hashtag_postprocess, [ {hashtag_flag: True}]) if hashtags == 'split' or twitter_handles == 'split': file = os.path.join(DATA_PATH, 'wordsfreq_wiki2.txt') with open(file) as f: self._words = self._wordcost = dict((k, log((i + 1) * log(len(self._words)))) for i, k in enumerate(self._words)) self._maxword = max(len(x) for x in self._words) if twitter_handles == 'realname': with open(os.path.join(DATA_PATH, 'realnames.json')) as f: self._realnames = json.load(f) if ignore_quotes: self._merging_matcher.add('QUOTE', None, [{'ORTH': '"'}, { 'OP': '*', 'IS_ASCII': True}, {'ORTH': '"'}]) def doublequote_check(text): return bool(QUOTES_RE.fullmatch(text)) doublequote_flag = self._nlp.vocab.add_flag(doublequote_check) self._matcher.add('DOUBLE_QUOTES', self._remove_token, [ {doublequote_flag: True}]) if self._stopwords: def stopword_check(text): return bool(text.lower() in self._stopwords) stopword_flag = self._nlp.vocab.add_flag(stopword_check) self._matcher.add('STOPWORD', self._remove_token, [{stopword_flag: True}]) if keep_untokenized is not None: if not isinstance(keep_untokenized, list): raise ValueError( "keep_untokenized has to be either None or a list") for i, phrase in enumerate(keep_untokenized): phrase_tokens = phrase.split(' ') rule = [] for token in phrase_tokens: rule.append({'LOWER': token.lower()}) self._merging_matcher.add('RULE_' + str(i), None, rule) if pos_emojis: if not isinstance(pos_emojis, list): pos_emojis = POS_EMOJIS pos_patterns = [[{'ORTH': emoji}] for emoji in pos_emojis] self._matcher.add('HAPPY', self._replace_token, *pos_patterns) self._replacements['HAPPY'] = 'POS_EMOJI' if neg_emojis: if not isinstance(neg_emojis, list): neg_emojis = NEG_EMOJIS neg_patterns = [[{'ORTH': emoji}] for emoji in neg_emojis] self._matcher.add('SAD', self._replace_token, *neg_patterns) self._replacements['SAD'] = 'NEG_EMOJI' if neutral_emojis: if not isinstance(neutral_emojis, list): neutral_emojis = NEUTRAL_EMOJIS neutral_patterns = [[{'ORTH': emoji}] for emoji in neutral_emojis] self._matcher.add('NEUTRAL', self._replace_token, *neutral_patterns) self._replacements['NEUTRAL'] = 'NEUTRAL_EMOJI' if isinstance(extra_patterns, list): self._flags = {} for name, re_pattern, replacement_token in extra_patterns: def flag(text): return bool(re_pattern.match(text)) self._flags[name] = self._nlp.vocab.add_flag(flag) self._matcher.add(name, self._replace_token, [{self._flags[name]: True}]) self._replacements[name] = replacement_token if stem and ('nltk' in sys.modules): if stem == 'stem': self._stemmer = PorterStemmer() elif stem == 'lemm': self._stemmer = WordNetLemmatizer() else: raise ValueError( 'Stemming method {} is not supported'.format(stem)) self._matcher.add('WORD_TO_STEM', self._stem_word, [{'IS_ALPHA': True}]) retokenize_flag = self._nlp.vocab.add_flag(retokenize_check) self._matcher.add('RETOKENIZE', self._retokenize, [{retokenize_flag: True, 'IS_PUNCT': False, 'LIKE_URL': False, 'LIKE_EMAIL': False, 'LIKE_NUM': False, hashtag_flag: False, twitter_handle_flag: False}]) self._nlp.add_pipe(self._merge_doc, name='merge_doc', last=True) self._nlp.add_pipe(self._match_doc, name='match_doc', last=True) self._nlp.add_pipe(self._postproc_doc, name='postproc_doc', last=True) @staticmethod def _lowercase(__, doc, i, matches): # Lowercase tokens __, start, end = matches[i] span = doc[start:end] for tok in span: tok._.transformed_text = tok._.transformed_text.lower() def _stem_word(self, __, doc, i, matches): # Stem tokens __, start, end = matches[i] span = doc[start:end] for tok in span: if self.params['stem'] == 'stem': tok._.transformed_text = self._stemmer.stem( tok._.transformed_text) elif self.params['stem'] == 'lemm': tok._.transformed_text = self._stemmer.lemmatize( tok._.transformed_text) def _normalize(self, __, doc, i, matches): # Normalize repeating symbols __, start, end = matches[i] span = doc[start:end] for tok in span: tok._.transformed_text = NORMALIZE_RE.sub(r"\1" * self.params['normalize'], tok._.transformed_text) def _process_url(self, __, doc, i, matches): # Process found URLs __, start, end = matches[i] span = doc[start:end] for tok in span: found_urls = URLS_RE.findall(tok.text) if found_urls: if found_urls[0] in self._domains: tok._.transformed_text = self._domains[found_urls[0]] elif self._urls == 'domain': tok._.transformed_text = tldextract.extract( found_urls[0]).domain elif self._urls != 'title': if self._urls == 'domain_unwrap': domain = unshorten_url( found_urls[0], None, self.params['print_url_warnings']) else: domain = unshorten_url( found_urls[0], URL_SHORTENERS, self.params['print_url_warnings']) self._domains[found_urls[0]] = domain tok._.transformed_text = domain elif self._urls == 'title': domain = unshorten_url(found_urls[0], URL_SHORTENERS) if domain != 'twitter': title = get_url_title( found_urls[0], self.params['print_url_warnings']) title = self.tokenize(URLS_RE.sub('', title)) else: title = '' tok._.transformed_text = title self._domains[found_urls[0]] = title def _replace_token(self, __, doc, i, matches): # Replace tokens with something else match_id, start, end = matches[i] span = doc[start:end] replacement_token = self._replacements[doc.vocab.strings[match_id]] for tok in span: tok._.transformed_text = replacement_token @staticmethod def _remove_token(__, doc, i, matches): # Remove tokens __, start, end = matches[i] span = doc[start:end] for tok in span: tok._.transformed_text = '' def _retokenize(self, __, doc, i, matches): # Retokenize __, start, end = matches[i] span = doc[start:end] for tok in span: text = tok.text text = re.sub(r'([#@])', r' \1', text) text = re.sub(r'\s{2,}', ' ', text).strip() tok._.transformed_text = self.tokenize(text) def _infer_spaces(self, text): # Infer location of spaces in hashtags text = text.lower() text = re.sub(r'[^\w\s]', '', text) def best_match(i): # Find the best match for the first i characters # assuming costs has been built for the first (i-1) characters candidates = enumerate(reversed(cost[max(0, i - self._maxword):i])) return min((c + self._wordcost.get(text[i - k - 1:i], 9e999), k + 1) for k, c in candidates) cost = [0] for i in range(1, len(text) + 1): cur_cost, k = best_match(i) cost.append(cur_cost) out = [] i = len(text) while i > 0: cur_cost, k = best_match(i) assert cur_cost == cost[i] out.append(text[i - k:i]) i -= k return list(reversed(out)) def _handles_postprocess(self, __, doc, i, matches): # Process twitter handles __, start, end = matches[i] span = doc[start:end] for tok in span: if self.params['twitter_handles'] == 'realname': if tok.text in self._realnames: tok._.transformed_text = self._realnames[tok.text] else: handle = get_twitter_realname(tok.text) realname = self.tokenize(TWITTER_HANDLES_RE.sub('', handle)) tok._.transformed_text = realname self._realnames[tok.text] = realname elif self.params['twitter_handles'] == 'split': poss = self._infer_spaces(tok._.transformed_text[1:]) if poss: tok._.transformed_text = poss else: tok._.transformed_text = self.params['twitter_handles'] def _hashtag_postprocess(self, __, doc, i, matches): # Process hashtags __, start, end = matches[i] span = doc[start:end] for tok in span: if self.params['hashtags'] == 'split': poss = self._infer_spaces(tok._.transformed_text[1:]) if poss: tok._.transformed_text = poss else: tok._.transformed_text = self.params['hashtags'] @staticmethod def _decontract(text): # Expand contractions for contraction, decontraction in DECONTRACTIONS.items(): text = re.sub(contraction, decontraction, text) return text def _preprocess_text(self, text): # Do some preprocessing text = re.sub("’", "'", text) if self.params['remove_nonunicode']: try: text = text.encode('utf-8').decode('unicode-escape') text = ''.join( filter(lambda x: x in string.printable, text)).strip() except UnicodeDecodeError: warnings.warn( '(UnicodeDecodeError while trying to remove non-unicode characters') if self.params['decontract']: text = self._decontract(text) text = html.unescape(text) if self.params['latin_chars_fix']: if EMOJIS_UTF_RE.findall(text): text = EMOJIS_UTF_NOSPACE_RE.sub(r' \1', text) for utf_code, emoji in EMOJIS_UTF.items(): text = EMOJIS_UTF_PATS[utf_code].sub(emoji, text) if EMOJIS_UNICODE_RE.findall(text): text = EMOJIS_UNICODE_NOSPACE_RE.sub(r'\1 \2', text) for utf_code, emoji in EMOJIS_UNICODE.items(): text = EMOJIS_UNICODE_PATS[utf_code].sub(emoji, text) if LATIN_CHARS_RE.findall(text): for _hex, _char in LATIN_CHARS.items(): text = LATIN_CHARS_PATS[_hex].sub(_char, text) if self.params['ignore_reddit_quotes']: text = REDDIT_QUOTES_RE.sub(text, ' ') text = text.replace('.@', '. @') text = re.sub(r'([*;,!?\(\)\[\]])', r' \1', text) text = re.sub(r'\s{2,}', ' ', text) return text.strip() def _merge_doc(self, doc): # Perform merging for certain types of tokens matches = self._merging_matcher(doc) spans = [] for __, start, end in matches: spans.append(doc[start:end]) for span in spans: span.merge() for tok in doc: tok._.transformed_text = tok.text return doc def _match_doc(self, doc): # Perform all additional processing self._matcher(doc) return doc def _postproc_doc(self, doc): # Perform postprocessing doc._.tokens = [] for tok in doc: if isinstance(tok._.transformed_text, list): doc._.tokens.extend(tok._.transformed_text) elif tok._.transformed_text.strip() != '': if self.params['whitespaces_to_underscores']: tok._.transformed_text = "_".join( tok._.transformed_text.split()) doc._.tokens.append(tok._.transformed_text.strip()) return doc
[docs] def tokenize(self, text): """ Tokenize document Parameters ---------- text : str Document to tokenize Returns ------- list List of tokens Examples -------- >>> from redditscore.tokenizer import CrazyTokenizer >>> tokenizer = CrazyTokenizer(splithashtags=True, hashtags=False) >>> tokenizer.tokenize("#makeamericagreatagain") ["make", "america", "great", "again"] """ if not isinstance(text, str): warnings.warn('Document {} is not a string'.format(text)) return [] text = self._preprocess_text(text) doc = self._nlp(text) tokens = doc._.tokens if self.params['ngrams'] > 1: if self.params['whitespaces_to_underscores']: tokens = word_ngrams( tokens, (1, self.params['ngrams']), separator='_') else: tokens = word_ngrams( tokens, (1, self.params['ngrams'])) return tokens