Recommendation system approaches with PySpark

I am currently exploring how to build recommendation systems using PySpark. Spark is a distributed, in-memory data processing engine, so the techniques below can scale beyond a single machine.

Different techniques for building a recommendation system:

Similarity Approach
  • Cosine Similarity
    • Compute the cosine of the angle between two feature vectors (e.g. the number of times each song was listened to by two users)
  • Jaccard Similarity / Index
    • Compute the size of the intersection divided by the size of the union of two sets (see the sketch after this list)
Collaborative Filtering Approach
  • Matrix Factorization
    • Find two dense matrices (which represent latent features) that, when multiplied together, approximately reconstruct the original sparse matrix
  • K Nearest Neighbors
    • Find the k nearest vector points and use them to classify the new observation
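
As a quick illustration of the Jaccard index mentioned above, here is a minimal pure-Python sketch (the song sets are made up for the example):

def jaccard_similarity(a, b):
    """Jaccard index: size of the intersection over size of the union."""
    a, b = set(a), set(b)
    if not a and not b:
        return 0.0
    return len(a & b) / len(a | b)

# Hypothetical listening histories for two users
user1_songs = {"Clair de Lune", "Gymnopedie No. 1", "Nocturne Op. 9 No. 2"}
user2_songs = {"Clair de Lune", "Nocturne Op. 9 No. 2", "Arabesque No. 1"}

print(jaccard_similarity(user1_songs, user2_songs))  # 0.5 -> 2 shared songs out of 4 total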

Setup Spark:

from pyspark.sql import SparkSession, Row
from pyspark.ml.feature import HashingTF, IDF
from sklearn.metrics.pairwise import cosine_similarity

# toDF() needs an active SparkSession, so create one rather than a bare SparkContext
spark = SparkSession.builder.appName("recommender").getOrCreate()
sc = spark.sparkContext


Cosine Similarity:

"""
Cosine Similarity:
Group the songs by users and then run tf-idf then compute the cosine similarity between users
"""

# Each line is user<TAB>song; keep the first two columns and group songs by user
music_data = sc.textFile("incidences_piano.tsv").map(lambda line: line.split('\t')[0:2]).groupByKey()
music_data = music_data.map(lambda line: Row(user=line[0], songs=list(line[1]))).toDF()

# Perform TF-IDF: HashingTF maps each song to a fixed-size feature index via the
# hashing trick, then IDF down-weights songs that appear across many users
hashingTF = HashingTF(inputCol="songs", outputCol="rawFeatures")
tf = hashingTF.transform(music_data)
tf.cache()  # tf is used twice: to fit the IDF model and to transform
idf = IDF(inputCol="rawFeatures", outputCol="features").fit(tf)
tfidf = idf.transform(tf)

# Compute cosine similarity between two of the first five users.
# take(5) is called once, and the Spark vectors are converted to the
# 2-D arrays that scikit-learn expects.
rows = tfidf.select("user", "features").take(5)
vector1 = rows[0][1].toArray().reshape(1, -1)
vector2 = rows[3][1].toArray().reshape(1, -1)
cosine_similarity(vector1, vector2)
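
For intuition, the same number can be computed directly from the definition, cos(theta) = a·b / (|a| |b|). A minimal numpy sketch, reusing the rows list from the snippet above:

import numpy as np

def cosine(a, b):
    # dot product of a and b divided by the product of their norms
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

a = rows[0][1].toArray()
b = rows[3][1].toArray()
print(cosine(a, b))  # should match the scikit-learn result above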

Matrix Factorization:

"""
Matrix Factorization:

Run matrix factorization to predict what songs a user will like
"""

import hashlib
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

def hash_string(s):
    """ALS needs integer item IDs, so hash each song title to an int."""
    return int(hashlib.sha256(s.encode('utf-8')).hexdigest(), 16) % 10**8

# Load and parse the data: each (user, song) listen becomes a rating of 1
ratings = sc.textFile("incidences_piano.tsv") \
    .map(lambda line: line.split('\t')[0:2]) \
    .map(lambda line: Rating(int(line[0]), hash_string(line[1]), 1))

# Build the recommendation model using Alternating Least Squares
rank = 10           # number of latent features
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
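
Once trained, the model can also produce top-N recommendations directly via recommendProducts. A short sketch; the user ID 42 is hypothetical, and since every rating here is a 1, ALS.trainImplicit (which treats the values as confidence-weighted implicit feedback) is arguably the better fit:

# Top 10 recommended song hashes for a (hypothetical) user ID 42
for rec in model.recommendProducts(42, 10):
    print(rec.product, rec.rating)

# Alternative: train on the 1s as implicit feedback rather than explicit ratings
implicit_model = ALS.trainImplicit(ratings, rank, numIterations, alpha=0.01)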
