pyspark - Add date field to RDD in Spark


I have a pretty simple RDD called stjoin, over which I map a simple function that pulls the day out of a string representing the date-time.

The code passes lazy evaluation, but if I run the last line (stjoinday.take(5)), I get an error.

def parsedate(x):
    try:
        dt = dateutil.parser.parse(x[1]).date()
    except:
        dt = dateutil.parser.parse("01 jan 1900 00:00:00").date()
    x.append(dt)
    return x

stjoinday = stjoin.map(lambda line: parsedate(line))
#stjoinday.take(5)

What is the problem here?

The long error traceback is below:

15/04/27 22:14:02 ERROR Executor: Exception in task 0.0 in stage 6.0 (TID 8)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1147, in takeUpToNumLeft
    yield next(iterator)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/test3.py", line 72, in parsedate
    dt=dateutil.parser.parse("01 jan 1900 00:00:00").date()
AttributeError: 'module' object has no attribute 'parser'

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    at org.apache.spark.scheduler.Task.run(Task.scala:54)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/04/27 22:14:02 ERROR TaskSetManager: Task 0 in stage 6.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/test3.py", line 79, in <module>
    stjoinday.take(5)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1152, in take
    res = self.context.runJob(self, takeUpToNumLeft, p, True)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/context.py", line 770, in runJob
    = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 8, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/worker.py", line 79, in main
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 196, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 127, in dump_stream
    for obj in iterator:
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/serializers.py", line 185, in _batched
    for item in iterator:
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/python/pyspark/rdd.py", line 1147, in takeUpToNumLeft
    yield next(iterator)
  File "/home/terrapin/spark_hadoop/spark-1.1.1-bin-cdh4/test3.py", line 72, in parsedate
    dt=dateutil.parser.parse("01 jan 1900 00:00:00").date()
AttributeError: 'module' object has no attribute 'parser'

        org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:124)
        org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:154)
        org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:87)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

It looks like dateutil is not a standard Python package, so you need to distribute it to every worker node. Can you post what happens when you import dateutil after starting a Python shell? You may be missing an entry in PYTHONPATH.
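As a rough sketch of that check, and of one way to ship the package with the job (the archive name dateutil.zip, the paths, and the app name below are hypothetical):

# On a worker node, in a plain Python shell:
#   >>> import dateutil
#   >>> import dateutil.parser    # the parser submodule is not loaded by "import dateutil" alone
#   >>> dateutil.parser.parse("01 jan 1900 00:00:00")
#   datetime.datetime(1900, 1, 1, 0, 0)
#
# If the package is missing on the workers, one option is to ship it with the job:
#   spark-submit --py-files dateutil.zip test3.py
# or to add it from the driver before defining the RDD:
from pyspark import SparkContext

sc = SparkContext(appName="stjoin-dates")   # hypothetical app name
sc.addPyFile("/path/to/dateutil.zip")       # makes the package importable on every executor

As a side note, the AttributeError in the traceback can also appear when dateutil is installed but the script only runs "import dateutil", since the parser submodule is not imported automatically; adding an explicit "import dateutil.parser" in test3.py would rule that out.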