Recommendation system approaches with PySpark
I am currently exploring how to build recommendation systems with PySpark. Spark is an in-memory processing engine for distributed computing, with support for near-real-time workloads.
Different techniques for building a recommendation system:
Similarity Approach
- Cosine Similarity
  - Calculate the cosine of the angle between two feature vectors (such as the number of times each of two users listened to each song)
- Jaccard Similarity / Index
  - Compute the size of the intersection divided by the size of the union of two sets (see the Wikipedia article on the Jaccard index)
- Matrix Factorization
  - Find two dense matrices (which represent latent features) that, when multiplied together, approximately recreate the original sparse matrix
- K nearest neighbors
  - Find the k nearest vector points and use them to classify the new observation
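To make the definitions in the list above concrete, here is a minimal sketch of cosine similarity, Jaccard similarity, and a nearest-neighbor lookup in plain Python, without Spark. The listen counts, user names, and the helper names `cosine_sim`, `jaccard_sim`, and `k_nearest` are all made up for illustration.

```python
import math


def cosine_sim(a, b):
    """Cosine of the angle between two equal-length numeric vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # convention chosen here for all-zero vectors
    return dot / (norm_a * norm_b)


def jaccard_sim(s, t):
    """Size of the intersection divided by the size of the union."""
    s, t = set(s), set(t)
    if not s and not t:
        return 1.0  # convention chosen here for two empty sets
    return len(s & t) / len(s | t)


def k_nearest(query, vectors, k):
    """Names of the k vectors most similar to `query` by cosine similarity."""
    ranked = sorted(
        vectors,
        key=lambda name: cosine_sim(query, vectors[name]),
        reverse=True,
    )
    return ranked[:k]


# Hypothetical listen counts per user over the same four songs
listens = {
    "alice": [5, 0, 3, 0],
    "bob": [4, 0, 4, 1],
    "carol": [0, 6, 0, 2],
}

print(cosine_sim(listens["alice"], listens["bob"]))
print(jaccard_sim({"song_a", "song_c"}, {"song_a", "song_c", "song_d"}))
print(k_nearest([5, 0, 2, 0], listens, k=2))
```

Cosine similarity uses the counts themselves, while Jaccard only looks at which songs appear at all, so the two can rank the same pair of users differently.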
Setup Spark:
import pyspark
from pyspark.context import SparkContext
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.sql.types import *
from pyspark.sql import Row, SparkSession
from sklearn.metrics.pairwise import cosine_similarity
from pyspark.mllib.linalg import Vectors

sc = SparkContext.getOrCreate()
# rdd.toDF() is only available once a SparkSession exists
spark = SparkSession.builder.getOrCreate()
Cosine Similarity:
"""
Cosine Similarity:
Group the songs by user, run TF-IDF, then compute the cosine similarity between users
"""
# Split on the tab delimiter and keep the first two columns (user, song)
music_data = (
    sc.textFile("incidences_piano.tsv")
    .map(lambda line: tuple(line.split('\t')[0:2]))
    .groupByKey()
)
music_data = music_data.map(lambda line: Row(user=line[0], songs=list(line[1]))).toDF()

# Perform TF-IDF
hashingTF = HashingTF(inputCol="songs", outputCol="rawFeatures")
tf = hashingTF.transform(music_data)
tf.cache()
idf = IDF(inputCol="rawFeatures", outputCol="features").fit(tf)
tfidf = idf.transform(tf)

# Compute cosine similarity between the first and fourth users
rows = tfidf.select("user", "features").take(5)
vector1 = rows[0][1]
vector2 = rows[3][1]
# scikit-learn expects 2-D arrays, so convert each Spark vector to a NumPy row
cosine_similarity([vector1.toArray()], [vector2.toArray()])
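The TF-IDF step above can be illustrated by hand on a tiny example. The sketch below assumes the smoothed weighting log((m + 1) / (df + 1)), which is my understanding of what `pyspark.ml.feature.IDF` computes, where m is the number of users and df is the number of users who listened to a song. Unlike `HashingTF`, it keys the counts by song name rather than by hashed bucket, and the song names and the helper `tf_idf` are hypothetical.

```python
import math
from collections import Counter


def tf_idf(docs):
    """Weight each term count by log((m + 1) / (df + 1))."""
    m = len(docs)
    # Document frequency: in how many docs each term appears at least once
    df = Counter(term for doc in docs for term in set(doc))
    weighted = []
    for doc in docs:
        tf = Counter(doc)  # raw term counts within this doc
        weighted.append(
            {term: count * math.log((m + 1) / (df[term] + 1))
             for term, count in tf.items()}
        )
    return weighted


# Hypothetical songs listened to by three users
songs_by_user = [
    ["song_a", "song_b"],
    ["song_a", "song_c"],
    ["song_a", "song_b", "song_b"],
]

weights = tf_idf(songs_by_user)
for user_weights in weights:
    print(user_weights)
```

A song that every user has listened to receives a weight of zero under this formula, so it does not contribute to the cosine similarity between users.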
Matrix Factorization:
"""
Matrix Factorization:
Run matrix factorization to predict what songs a user will like
"""
import hashlib
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

def hash_string(s):
    # ALS needs integer product IDs, so hash each song name to a stable integer
    return int(hashlib.sha256(s.encode('utf-8')).hexdigest(), 16) % 10**8

# Load and parse the data: each (user, song) pair becomes a rating of 1
ratings = (
    sc.textFile("incidences_piano.tsv")
    .map(lambda line: line.split('\t')[0:2])
    .map(lambda line: Rating(int(line[0]), hash_string(line[1]), 1))
)

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(ratings, rank, numIterations)

# Evaluate the model on training data
testdata = ratings.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = ratings.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")
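To show what "two dense matrices that recreate the sparse matrix" means without a Spark cluster, here is a small standard-library sketch. Note that it swaps in plain stochastic gradient descent rather than the alternating least squares that Spark's ALS uses, simply because it fits in a few lines. The ratings, the ids, the hyperparameters, and the helper names `factorize`, `predict`, and `mse` are all assumptions made for illustration.

```python
import random


def factorize(ratings, n_users, n_items, rank=2, steps=2000,
              lr=0.01, reg=0.02, seed=0):
    """Fit latent factors P (users) and Q (items) to observed ratings via SGD."""
    rng = random.Random(seed)
    P = [[rng.uniform(0, 0.5) for _ in range(rank)] for _ in range(n_users)]
    Q = [[rng.uniform(0, 0.5) for _ in range(rank)] for _ in range(n_items)]
    for _ in range(steps):
        for u, i, r in ratings:
            err = r - sum(P[u][f] * Q[i][f] for f in range(rank))
            for f in range(rank):
                pu, qi = P[u][f], Q[i][f]
                # Gradient step on squared error with L2 regularization
                P[u][f] += lr * (err * qi - reg * pu)
                Q[i][f] += lr * (err * pu - reg * qi)
    return P, Q


def predict(P, Q, u, i):
    """Predicted rating: dot product of the user's and item's latent vectors."""
    return sum(p * q for p, q in zip(P[u], Q[i]))


def mse(ratings, P, Q):
    """Mean squared error over the observed ratings only."""
    return sum((r - predict(P, Q, u, i)) ** 2 for u, i, r in ratings) / len(ratings)


# Hypothetical (user, item, rating) triples; the unlisted pairs are unobserved
observed = [
    (0, 0, 5.0), (0, 1, 3.0),
    (1, 0, 4.0), (1, 2, 1.0),
    (2, 1, 1.0), (2, 2, 5.0),
]

P0, Q0 = factorize(observed, 3, 3, steps=0)  # same seed, so the untrained start
P, Q = factorize(observed, 3, 3)

print("MSE before training:", mse(observed, P0, Q0))
print("MSE after training:", mse(observed, P, Q))
print("Prediction for unobserved pair (user 0, item 2):", predict(P, Q, 0, 2))
```

The last line is the point of the technique: once the factors are fitted to the observed entries, their product also yields a score for pairs that were never observed, which is what a recommender ranks on.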