This question is directed toward anyone familiar with py4j who can help resolve a pickling error. I am trying to add a method to the PySpark PythonMLLibAPI that accepts an RDD of a namedtuple, does some work, and returns a result in the form of an RDD.
This method is modeled after the PythonMLLibAPI.trainALSModel() method. The analogous existing relevant portions are:

    def trainALSModel(
        ratingsJRDD: JavaRDD[Rating],
        .. )
The existing Python Rating class used to model the new code is:

    class Rating(namedtuple("Rating", ["user", "product", "rating"])):
        def __reduce__(self):
            return Rating, (int(self.user), int(self.product), float(self.rating))
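For context, the `__reduce__` override is what tells pickle how to rebuild the object: it returns a callable plus an argument tuple, and unpickling simply calls `Rating(user, product, rating)`. A minimal stand-alone sketch of that round trip (Python 3 here, so `int` covers what `long` did in Python 2):

```python
import pickle
from collections import namedtuple

class Rating(namedtuple("Rating", ["user", "product", "rating"])):
    def __reduce__(self):
        # Unpickling will invoke Rating(user, product, rating)
        return Rating, (int(self.user), int(self.product), float(self.rating))

r = Rating(1, 2, 3.5)
restored = pickle.loads(pickle.dumps(r))
print(restored)  # Rating(user=1, product=2, rating=3.5)
```

This is the same mechanism PySpark relies on when it serializes rows to ship them to the JVM side.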
Here is my attempt. The relevant classes:
New Python class pyspark.mllib.clustering.MatrixEntry:

    from collections import namedtuple

    class MatrixEntry(namedtuple("MatrixEntry", ["x", "y", "weight"])):
        def __reduce__(self):
            return MatrixEntry, (long(self.x), long(self.y), float(self.weight))
New method foobarRdd in PythonMLLibAPI:

    def foobarRdd(
        data: JavaRDD[MatrixEntry]): RDD[FooBarResult] = {
      val rdd = data.rdd.map { d => FooBarResult(d.i, d.j, d.value, d.i * 100 + d.j * 10 + d.value) }
      rdd
    }
Now let's try it out:

    from pyspark.mllib.clustering import MatrixEntry

    def convert_to_MatrixEntry(tuple):
        return MatrixEntry(*tuple)

    from pyspark.mllib.clustering import *
    pic = PowerIterationClusteringModel(2)
    tups = [(1,2,3),(4,5,6),(12,13,14),(15,7,8),(16,17,16.5)]
    trdd = sc.parallelize(map(convert_to_MatrixEntry, tups))

    # print out the RDD on the Python side for validation
    print "%s" % (repr(trdd.collect()))

    from pyspark.mllib.common import callMLlibFunc
    pic = callMLlibFunc("foobar", trdd)
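As a quick local sanity check of the same mapping with Spark taken out of the picture (a sketch only: `sc.parallelize` is replaced by a plain list, and Python 3's `int` stands in for `long`):

```python
from collections import namedtuple

class MatrixEntry(namedtuple("MatrixEntry", ["x", "y", "weight"])):
    def __reduce__(self):
        # Python 3: int subsumes the old long type
        return MatrixEntry, (int(self.x), int(self.y), float(self.weight))

def convert_to_MatrixEntry(t):
    return MatrixEntry(*t)

tups = [(1, 2, 3), (4, 5, 6), (12, 13, 14), (15, 7, 8), (16, 17, 16.5)]
entries = list(map(convert_to_MatrixEntry, tups))
print(entries[0])  # MatrixEntry(x=1, y=2, weight=3)
```

This confirms the Python-side conversion itself is sound; the failure below happens only once the data crosses into the JVM.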
Relevant portions of the results:

    [(1,2)=3.0, (4,5)=6.0, (12,13)=14.0, (15,7)=8.0, (16,17)=16.5]
which shows the input RDD arrived 'whole'. However, the pickling is unhappy:
    5/04/27 21:15:44 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 14)
    net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.mllib.clustering.MatrixEntry)
        at net.razorvine.pickle.objects.ClassDictConstructor.construct(ClassDictConstructor.java:23)
        at net.razorvine.pickle.Unpickler.load_reduce(Unpickler.java:617)
        at net.razorvine.pickle.Unpickler.dispatch(Unpickler.java:170)
        at net.razorvine.pickle.Unpickler.load(Unpickler.java:84)
        at net.razorvine.pickle.Unpickler.loads(Unpickler.java:97)
        at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1167)
        at org.apache.spark.mllib.api.python.SerDe$$anonfun$pythonToJava$1$$anonfun$apply$1.apply(PythonMLLibAPI.scala:1166)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at scala.collection.AbstractIterator.to(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
        at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:819)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1523)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:212)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:724)
Below is a visual of the Python invocation stack trace:
I had the same error using MLlib, and it turned out that I had returned the wrong datatype in one of my functions. It now works after a simple cast on the returned value. This might not be the answer you're seeking, but it is at least a hint about the direction to follow.
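Building on that hint: the "expected zero arguments for construction of ClassDict" message generally means the JVM-side unpickler (Pyrolite) received a custom Python class it does not know how to construct. One workaround, a sketch under that assumption rather than a confirmed fix for this exact code, is to ship plain tuples across the boundary and keep the namedtuple purely on the Python side:

```python
from collections import namedtuple

MatrixEntry = namedtuple("MatrixEntry", ["x", "y", "weight"])

def to_plain(entry):
    # Plain tuples of ints/floats pickle to types the
    # JVM-side unpickler already understands
    return (int(entry.x), int(entry.y), float(entry.weight))

entries = [MatrixEntry(1, 2, 3), MatrixEntry(16, 17, 16.5)]
plain = [to_plain(e) for e in entries]
print(plain)  # [(1, 2, 3.0), (16, 17, 16.5)]
```

In Spark, `sc.parallelize(plain)` would then carry only primitive-typed tuples, which can be deserialized on the Scala side without registering a custom constructor for the class.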