
Here is a simple mapPartitions job I'm running. I start the shell with:

$ IPYTHON=1 pyspark --master local[2] 

and here is the data file, ids.txt:

1,2,3,4,5,1 
6,4,8,3,2 
9,9,9,9,9,9 
100000000,1 
10,10,10 
1,2,4,2 

To run it, I start the shell and then paste my code in with %cpaste (is there a better way to do this?).

If I simply take() and look at the values, I get sensible output:

In[2]: filteredLinesData.take(6) 
Out[2]: 
[u'1,2,3,4,5,1', 
u'6,4,8,3,2', 
u'9,9,9,9,9,9', 
u'100000000,1', 
u'10,10,10', 
u'1,2,4,2'] 

But when I actually try to run the mapPartitions() job I've set up, it fails:

In [3]: executed = answers.collect() 
14/09/12 11:18:22 INFO SparkContext: Starting job: collect at <ipython-input-3-6461aec48699>:1 
14/09/12 11:18:22 INFO DAGScheduler: Got job 2 (collect at <ipython-input-3-6461aec48699>:1) with 2 output partitions (allowLocal=false) 
14/09/12 11:18:22 INFO DAGScheduler: Final stage: Stage 2(collect at <ipython-input-3-6461aec48699>:1) 
14/09/12 11:18:22 INFO DAGScheduler: Parents of final stage: List() 
14/09/12 11:18:22 INFO DAGScheduler: Missing parents: List() 
14/09/12 11:18:22 INFO DAGScheduler: Submitting Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37), which has no missing parents 
14/09/12 11:18:22 INFO DAGScheduler: Submitting 2 missing tasks from Stage 2 (PythonRDD[3] at RDD at PythonRDD.scala:37) 
14/09/12 11:18:22 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks 
14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL) 
14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:0 as 3112 bytes in 1 ms 
14/09/12 11:18:22 INFO TaskSetManager: Starting task 2.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL) 
14/09/12 11:18:22 INFO TaskSetManager: Serialized task 2.0:1 as 3112 bytes in 0 ms 
14/09/12 11:18:22 INFO Executor: Running task ID 0 
14/09/12 11:18:22 INFO Executor: Running task ID 1 
14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally 
14/09/12 11:18:22 INFO BlockManager: Found block broadcast_0 locally 
14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_1 not found, computing it 
14/09/12 11:18:22 INFO CacheManager: Partition rdd_1_0 not found, computing it 
14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:31+32 
14/09/12 11:18:22 INFO HadoopRDD: Input split: file:/Users/will/Code/spark/sumlines/ids.txt:0+31 
14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(288) called with curMem=133256, maxMem=308910489 
14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_1 stored as values to memory (estimated size 288.0 B, free 294.5 MB) 
14/09/12 11:18:22 INFO MemoryStore: ensureFreeSpace(304) called with curMem=133544, maxMem=308910489 
14/09/12 11:18:22 INFO MemoryStore: Block rdd_1_0 stored as values to memory (estimated size 304.0 B, free 294.5 MB) 
14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_0 in memory on 18.111.61.9:58306 (size: 304.0 B, free: 294.6 MB) 
14/09/12 11:18:22 INFO BlockManagerInfo: Added rdd_1_1 in memory on 18.111.61.9:58306 (size: 288.0 B, free: 294.6 MB) 
14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_0 
14/09/12 11:18:22 INFO BlockManagerMaster: Updated info of block rdd_1_1 
0 => 1,2,3,4,5,1 
1 => 6,4,8,3,2 
2 => 9,9,9,9,9,9 
0 => 100000000,1 
1 => PySpark worker failed with exception:10,10,10 

2 => 1,2,4,2 
PySpark worker failed with exception: 
Traceback (most recent call last): 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream 
    for obj in iterator: 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched 
    for item in iterator: 
TypeError: 'NoneType' object is not iterable 

Traceback (most recent call last): 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream 
    for obj in iterator: 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched 
    for item in iterator: 
TypeError: 'NoneType' object is not iterable 

14/09/12 11:18:22 ERROR Executor: Exception in task ID 1 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream 
    for obj in iterator: 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched 
    for item in iterator: 
TypeError: 'NoneType' object is not iterable 

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) 
    at org.apache.spark.scheduler.Task.run(Task.scala:51) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
14/09/12 11:18:22 ERROR Executor: Exception in task ID 0 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream 
    for obj in iterator: 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched 
    for item in iterator: 
TypeError: 'NoneType' object is not iterable 

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) 
    at org.apache.spark.scheduler.Task.run(Task.scala:51) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
14/09/12 11:18:22 WARN TaskSetManager: Lost TID 0 (task 2.0:0) 
14/09/12 11:18:22 WARN TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException 
org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream 
    for obj in iterator: 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched 
    for item in iterator: 
TypeError: 'NoneType' object is not iterable 

    at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115) 
    at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145) 
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) 
    at org.apache.spark.scheduler.Task.run(Task.scala:51) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
14/09/12 11:18:22 ERROR TaskSetManager: Task 2.0:0 failed 1 times; aborting job 
14/09/12 11:18:22 INFO TaskSetManager: Loss was due to org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream 
    for obj in iterator: 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched 
    for item in iterator: 
TypeError: 'NoneType' object is not iterable 
[duplicate 1] 
14/09/12 11:18:22 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 
14/09/12 11:18:22 INFO DAGScheduler: Failed to run collect at <ipython-input-3-6461aec48699>:1 
14/09/12 11:18:22 INFO TaskSchedulerImpl: Cancelling stage 2 
--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-3-6461aec48699> in <module>() 
----> 1 executed = answers.collect() 

/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/rdd.pyc in collect(self) 
    581   """ 
    582   with _JavaStackTrace(self.context) as st: 
--> 583   bytesInJava = self._jrdd.collect().iterator() 
    584   return list(self._collect_iterator_through_file(bytesInJava)) 
    585 

/usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    535   answer = self.gateway_client.send_command(command) 
    536   return_value = get_return_value(answer, self.gateway_client, 
--> 537     self.target_id, self.name) 
    538 
    539   for temp_arg in temp_args: 

/usr/bin/spark-1.0.0-bin-hadoop2/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    298     raise Py4JJavaError(
    299      'An error occurred while calling {0}{1}{2}.\n'. 
--> 300      format(target_id, '.', name), value) 
    301    else: 
    302     raise Py4JError(

Py4JJavaError: An error occurred while calling o38.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2.0:0 failed 1 times, most recent failure: Exception failure in TID 0 on host localhost: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/worker.py", line 77, in main 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 191, in dump_stream 
    self.serializer.dump_stream(self._batched(iterator), stream) 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 123, in dump_stream 
    for obj in iterator: 
    File "/usr/bin/spark-1.0.0-bin-hadoop2/python/pyspark/serializers.py", line 180, in _batched 
    for item in iterator: 
TypeError: 'NoneType' object is not iterable 

     org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:115) 
     org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:145) 
     org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:78) 
     org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) 
     org.apache.spark.rdd.RDD.iterator(RDD.scala:229) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111) 
     org.apache.spark.scheduler.Task.run(Task.scala:51) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

It is clearly doing the right thing with the input, but at the end Spark apparently tries to feed something into a function, and that raises the error.

Any idea what I'm doing wrong? I'm completely new to Spark.

Answers

3

A few problems:

First, to run a script from within ipython, you can use execfile() or %run.
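
For instance (a minimal sketch; my_job.py is a hypothetical script that only defines its functions and transformations and reuses the shell's existing sc instead of creating its own SparkContext):

In [1]: execfile("my_job.py")   # Python 2 built-in; executes in the current namespace
In [1]: %run -i my_job.py       # -i shares the shell's namespace, so sc stays visible to the script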

Second, mapPartitions doesn't take a number-of-partitions parameter; perhaps it did at some point? You can set the number of partitions explicitly with parallelize. If you run it that way, you will get the output you expect, but also an error of the form:

TypeError: 'NoneType' object is not iterable 

That is because mapPartitions is a transformation; it expects a function that takes a partition of an RDD and returns a new partition of an RDD. You are printing something as a side effect, but not returning a new RDD partition. In other words, you are looking for an action, not a transformation. foreach operates on each element; foreachPartition operates partition by partition, but expects a generator that returns None:

# map function: prints each line in the partition as a side effect
def echo(lines):
    if lines:
        for i, line in enumerate(lines):
            print i, "=>", line
    yield None

# load the data
idsFile = "ids.txt"  # Should be some file on your system
linesData = sc.textFile(idsFile).cache()

# clean it
cleanLinesData = linesData.map(lambda line: line.strip())
filteredLinesData = cleanLinesData.filter(lambda line: True if line else False)

# run the task; foreachPartition is an action, so it executes immediately and returns None
answers = filteredLinesData.foreachPartition(echo)
+1

It looks like the 'mapPartitions' link goes to an _older_ version of the Spark docs; the latest docs can be found at http://spark.apache.org/docs/latest/api/python/pyspark.rdd.RDD-class.html#mapPartitions –

+1

Good catch, I'll fix up the link. –

+0

Actually `In [1]: %run SimpleApp.py` doesn't work, because I started the shell with `IPYTHON=1 pyspark --master local[2]`, which already creates the `sc` object: `ValueError: Cannot run multiple SparkContexts at once; existing SparkContext`. What is the correct way to run a Spark app? – lollercoaster

4

The problem here is that mapPartitions accepts a function that returns an iterable object, such as a list or a generator. Your echo function implicitly returns None, which is why PySpark complains that a 'NoneType' object is not iterable.

You could use this function with foreachPartition, as Jonathan suggested. I believe that will print the output you want when running PySpark in local mode, but it may not be what you want when deploying to a cluster: the output of the print statements will be written to the workers' logs and won't show up on the driver.

Instead, consider modifying echo to yield (i, "=>", line); the function's return type is then a generator.
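
A minimal sketch of that fix (assuming the same filteredLinesData RDD built in the first answer; the tuple is just the values the question was printing):

def echo(lines):
    # mapPartitions wants an iterable back, so yield one tuple per input line
    for i, line in enumerate(lines):
        yield (i, "=>", line)

answers = filteredLinesData.mapPartitions(echo)  # transformation: builds a new RDD
print answers.collect()                          # action: brings the tuples back to the driver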

+0

Got it. But then how do I set the number of partitions? I think Spark defaults to 64 MB-sized chunks, but I'd like to do it myself. Do I use [parallelize](http://spark.apache.org/docs/latest/programming-guide.html#parallelized-collections)? That seems to work only on Python lists (or Scala arrays), not on RDDs. – lollercoaster

+2

The number of partitions gets set in a few ways. When you create an RDD from Python objects in the driver, you can specify the number of partitions explicitly. If you load a file, e.g. with `sc.textFile`, the number of partitions is determined by the amount of input data (for example, HDFS chunking into 64 MB blocks). RDD transformations like map(), filter(), mapPartitions(), etc. preserve the number of partitions of their parent RDD; only operations like reduceByKey(), groupByKey(), repartition(), coalesce(), etc. change the number of partitions. –
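
For illustration, a rough sketch of those options (the file name and the partition counts are only examples, and the second argument to sc.textFile is a minimum-partitions hint rather than an exact count):

rdd1 = sc.parallelize(range(1000), 8)        # explicit: ask for 8 partitions
rdd2 = sc.textFile("ids.txt", 4)             # at least 4 input splits for the file
rdd3 = rdd2.map(lambda line: line.strip())   # map()/filter()/mapPartitions() keep the parent's count
rdd4 = rdd3.repartition(2)                   # repartition()/coalesce() change it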
