#!/usr/bin/python3
"""Multinomial naive Bayes classifier for the reuters21578 dataset.

The point of this script is to implement a simple multivariate multinomial
naive Bayes and use it to classify documents from the reuters21578 dataset.

In a multivariate multinomial naive Bayes we represent a document with M
random variables, where M is the number of positions in the document. Each
random variable represents a single word in the document, and can therefore
assume N different values (the size of the vocabulary).

Requires the env var PYTHONPATH to be set to the root directory containing
the code for WMR.
"""

import os
import math
import random

# -- mylibs
import myutil
import vector_space_model.vsm as vsm
from vector_space_model.vsm import Document

CLASSIFIER_FILENAME = "./bayes_multinomial_reuters21578.pickle"


class MultinomialBayesClassifier(object):
    """Multinomial naive Bayes text classifier with add-one smoothing.

    Documents are duck-typed: the classifier only reads ``doc.topics`` (an
    iterable of class labels) and calls ``doc.get_tokens()`` (an iterable of
    hashable tokens).

    Attributes set by :meth:`train`:
        prior: dict mapping class -> fraction of training docs in that class.
        cond: dict mapping (class, token) -> smoothed P(token | class).
        dictionary: set of every token seen during training.
    """

    def __init__(self):
        self.prior = None
        self.cond = None
        self.dictionary = None

    def train(self, train_docs):
        """Estimate the priors and the smoothed conditional probabilities.

        Args:
            train_docs: sized collection of labelled documents.
        """
        prior = {}
        cond = {}
        tc = {}  # -- tc[c] := number of total terms in class c
        dictionary = set()

        for i, doc in enumerate(train_docs):
            print(f"Processing doc: {i}")

            # -- count docs in each class
            for c in doc.topics:
                prior[c] = prior.get(c, 0) + 1
                # -- only initialise the term count; resetting it here for
                # -- every document would throw away all previous counts
                tc.setdefault(c, 0)

            for token in doc.get_tokens():
                # -- update dictionary
                dictionary.add(token)

                # -- update term in class count
                for c in doc.topics:
                    cond[(c, token)] = cond.get((c, token), 0) + 1
                    tc[c] = tc.get(c, 0) + 1

        # -- estimate probabilities
        tot_docs = len(train_docs)
        vocabulary_size = len(dictionary)

        # ---- prior: P(c) = (docs labelled c) / (total docs)
        for c in prior:
            prior[c] = prior[c] / tot_docs

        # ---- cond
        for c in prior:
            denominator = tc.get(c, 0) + vocabulary_size
            for t in dictionary:
                # -- add basic (add-one) smoothing
                cond[(c, t)] = (cond.get((c, t), 0) + 1) / denominator

        # -- save parameters
        self.prior = prior
        self.cond = cond
        self.dictionary = dictionary

    def classify(self, doc):
        """Return the maximum a posteriori class for ``doc``.

        Tokens that were never seen during training are ignored, since no
        conditional probability was estimated for them.

        Raises:
            ValueError: if the classifier has not been trained, or was
                trained on documents without any topic.
        """
        if not self.prior:
            raise ValueError("classifier has no classes: train it first")

        # -- materialise the known tokens once instead of once per class
        tokens = [t for t in doc.get_tokens() if t in self.dictionary]

        # -- compute post probabilities (in log space to avoid underflow)
        post = {}
        for c, p_c in self.prior.items():
            score = math.log2(p_c)
            for t in tokens:
                score += math.log2(self.cond[(c, t)])
            post[c] = score

        # -- compute MAP class
        # --
        # -- C^* = argmax_{c \in C} { \log{P(c)} + \sum_{i = 1}^n \log{P(x_i | c)} }
        # --
        # -- max() keeps the first class (in dict order) among equal scores
        return max(post, key=post.get)

    def test(self, docs):
        """Test the model against a set of documents.

        A prediction counts as a hit when the predicted class is one of the
        document's topics.

        Returns:
            The fraction of hits, or 0.0 when ``docs`` is empty.
        """
        if not docs:
            return 0.0

        hits = 0
        for doc in docs:
            res = self.classify(doc)
            if res in doc.topics:
                print(f"HIT :) res={res}, topics={doc.topics}")
                hits += 1
            else:
                print(f"NO HIT :( res={res}, topics={doc.topics}")

        return hits / len(docs)


# ----------------------------------

if __name__ == "__main__":
    docs = vsm.load_dataset()[:5000]
    train_docs, test_docs = myutil.split_dataset(docs, 0.7)

    # NOTE: a model cached by an older version of this script was trained
    # with wrong term counts; delete the pickle file to force a retrain.
    if not os.path.exists(CLASSIFIER_FILENAME):
        # -- train and save
        b = MultinomialBayesClassifier()
        b.train(train_docs)
        myutil.store_pickle(b, CLASSIFIER_FILENAME)

    # -- use it
    b = myutil.load_pickle(CLASSIFIER_FILENAME)
    accuracy = b.test(test_docs)
    print(f"Accuracy: {accuracy}")