#!/usr/bin/python3
"""The point of this script is to implement a simple multivariate binomial
(Bernoulli) naive Bayes and use it to classify documents from the
reuters21578 dataset.

In a multivariate binomial naive Bayes we represent a document with N random
variables, where N is the size of the vocabulary. Each random variable can
assume only two possible values: 1 if the word is in the document, and 0
otherwise.

Requires the env var PYTHONPATH to be set to the root directory containing
the code for WMR.
"""
import os
import math

# -- my libs
import myutil
import vector_space_model.vsm as vsm
from vector_space_model.vsm import Document

CLASSIFIER_FILENAME = "./bayes_binomial_reuters21578.pickle"


class BinomialBayesClassifier(object):

    def __init__(self):
        self.prior = None
        self.cond = None
        self.dictionary = None

    def train(self, train_docs):
        prior = {}
        cond = {}
        dictionary = set()

        for i, doc in enumerate(train_docs):
            print(f"Processing doc: {i}")
            token_seen = set()

            # -- count docs in each class
            for c in doc.topics:
                prior[c] = prior.get(c, 0) + 1

            for token in doc.get_tokens():
                # -- update dictionary
                dictionary.add(token)
                # -- count each (class, term) pair at most once per doc,
                # -- since the model only tracks presence/absence
                if token not in token_seen:
                    token_seen.add(token)
                    for c in doc.topics:
                        cond[(c, token)] = cond.get((c, token), 0) + 1

        # -- estimate probabilities
        tot_docs = len(train_docs)

        # ---- cond, with add-one (Laplace) smoothing; note that prior[c]
        # ---- still holds the raw doc count of class c at this point
        for c in prior:
            for t in dictionary:
                cond[(c, t)] = (cond.get((c, t), 0) + 1) / (prior[c] + 2)

        # ---- prior
        for c in prior:
            prior[c] = prior[c] / tot_docs

        # -- save parameters
        self.prior = prior
        self.cond = cond
        self.dictionary = dictionary

    def classify(self, doc):
        # -- contains posterior log-probabilities
        post = {}
        tokens = set(doc.get_tokens())

        # -- compute post probabilities; everything is summed in log space,
        # -- including the prior, to avoid floating point underflow
        for c in self.prior:
            post[c] = math.log(self.prior[c], 2)
            for t in self.dictionary:
                if t in tokens:
                    post[c] += math.log(self.cond[(c, t)], 2)
                else:
                    post[c] += math.log(1 - self.cond[(c, t)], 2)

        # -- compute MAP class
        # --
        # -- C^* = argmax_{c \in C} { \log{P(c)} + \sum_{i = 1}^N \log{P(x_i | c)} }
        classes = list(self.prior.keys())
        map_class = classes[0]
        for c in classes[1:]:
            if post[c] > post[map_class]:
                map_class = c
        return map_class

    def test(self, docs):
        """Test the model against a set of documents.

        A prediction counts as a hit if it matches any of the document's
        topics, since reuters21578 documents can carry multiple topics.
        """
        hits = 0
        for doc in docs:
            res = self.classify(doc)
            if res in doc.topics:
                print(f"HIT :) res={res}, topics={doc.topics}")
                hits += 1
            else:
                print(f"NO HIT :( res={res}, topics={doc.topics}")
        return hits / len(docs)

    def summary(self):
        print(self.prior)
        print(self.cond)
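
# -- Minimal sanity check (illustrative sketch only, not part of the
# -- pipeline). The real documents come from vsm.load_dataset(); the
# -- classifier only relies on two members of them, `topics` and
# -- `get_tokens()`, so a tiny stand-in class with the same interface is
# -- enough to exercise train()/classify() on a toy two-class corpus.
# -- Invoke by hand from a REPL if needed.
def _toy_example():
    class ToyDoc:
        def __init__(self, topics, tokens):
            self.topics = topics
            self._tokens = tokens

        def get_tokens(self):
            return self._tokens

    train = [
        ToyDoc(["grain"], ["wheat", "corn", "harvest"]),
        ToyDoc(["grain"], ["wheat", "export", "tonnes"]),
        ToyDoc(["money-fx"], ["dollar", "rate", "bank"]),
        ToyDoc(["money-fx"], ["rate", "currency", "bank"]),
    ]
    b = BinomialBayesClassifier()
    b.train(train)
    # -- a document about wheat exports should be assigned to "grain"
    print(b.classify(ToyDoc([], ["wheat", "tonnes", "export"])))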

# -----------------

def most_common_category(docs):
    """Computes the most common category of the given dataset.

    This can be used as a baseline measure to compare the results of the
    Bayes classifier.
    """
    # -- count for each class how many docs are in that class
    c_freqs = {}
    for doc in docs:
        for topic in doc.topics:
            c_freqs[topic] = c_freqs.get(topic, 0) + 1

    classes = list(c_freqs.keys())
    max_c = classes[0]
    for c in classes[1:]:
        if c_freqs[c] > c_freqs[max_c]:
            max_c = c
    return max_c, c_freqs[max_c], len(classes)

# ----------------------------------
if __name__ == "__main__":
    docs = vsm.load_dataset()[:5000]
    train_docs, test_docs = myutil.split_dataset(docs, 0.7)

    if not os.path.exists(CLASSIFIER_FILENAME):
        # -- train and save
        b = BinomialBayesClassifier()
        b.train(train_docs)
        myutil.store_pickle(b, CLASSIFIER_FILENAME)

    # -- use it
    b = myutil.load_pickle(CLASSIFIER_FILENAME)
    accuracy = b.test(test_docs)
    print(f"accuracy: {accuracy:.3f}")
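
    # -- Baseline sketch (an assumption on how to wire up the helper above):
    # -- always predict the most common training category. A classifier that
    # -- cannot beat this accuracy has not learned anything useful.
    max_c, max_count, n_classes = most_common_category(train_docs)
    baseline = sum(1 for d in test_docs if max_c in d.topics) / len(test_docs)
    print(f"baseline (always predict '{max_c}'): {baseline:.3f} "
          f"({max_count} train docs, {n_classes} classes)")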