2016-08-17 3 views
0

다른 RDD 시리즈가 들어있는 JavaRDD를 만들려고합니다.다른 RDD의 Java Spark RDD?

RDDMachine.foreach (machine -> startDetectionNow()) 내부에서 컴퓨터가 ES에 대한 쿼리를 시작하고 다른 RDD를 얻습니다. 나는 이것을 모두 (1200hits) 모으고 Lists로 숨긴다. 기계가이 목록으로 작업을 시작한 후

첫째로 : 가능하지 않습니까? 그렇지 않다면 어떤 방법으로 다른 것을하려고 할 수 있습니까?

는 제가 수행하려고 무엇을 보여주지 :

 SparkConf conf = new SparkConf().setAppName("Algo").setMaster("local"); 
    conf.set("es.index.auto.create", "true"); 
    conf.set("es.nodes", "IP_ES"); 
    conf.set("es.port", "9200"); 
    sparkContext = new JavaSparkContext(conf); 

    MyAlgoConfig config_algo = new MyAlgoConfig(Detection.byPrevisionMerge); 

    Machine m1 = new Machine("AL-27", "IP1", config_algo); 
    Machine m2 = new Machine("AL-20", "IP2", config_algo); 
    Machine m3 = new Machine("AL-24", "IP3", config_algo); 
    Machine m4 = new Machine("AL-21", "IP4", config_algo); 

    ArrayList<Machine> Machines = new ArrayList(); 
    Machines.add(m1); 
    Machines.add(m2); 
    Machines.add(m3); 
    Machines.add(m4); 

    JavaRDD<Machine> machineRDD = sparkContext.parallelize(Machines); 

    machineRDD.foreach(machine -> machine.startDetectNow()); 

나는 Elasticsearch에있는 데이터로부터 배워야 각 시스템 내 알고리즘을 시작하려고합니다.


public boolean startDetectNow() 


    // MEGA Requete ELK 
    JavaRDD dataForLearn = Elastic.loadElasticsearch(
      Algo.sparkContext 
      , "logstash-*/Collector" 
      , Elastic.req_AvgOfCall(
        getIP() 
        , "hour" 
        , "2016-04-16T00:00:00" 
        , "2016-06-10T00:00:00")); 

    JavaRDD<Hit> RDD_hits = Elastic.mapToHit(dataForLearn); 
    List<Hit> hits = Elastic.RddToListHits(RDD_hits); 

그래서 나는 모든 "기계"에 쿼리의 모든 데이터를 얻을하려고합니다. 제 질문은 : 스파크로 할 수 있습니까? 아니면 다른 방법일까요? 스파크에서 시작할 때; 코드가 두 번째 RDD 주위에있을 때 잠금과 같은 솔기가 있습니다.

그리고 오류 메시지는 다음과 같습니다

16/08/17 0시 17분 13초 정보의 SparkContext : 시작 작업 : 16/08/17 0시 17분 13초 Elastic.java:94 에서 수집 INFO DAGScheduler : 1 개의 출력 파티션을 가진 작업 1 (Elastic.java:94에서 수집)을 얻었습니다 16/08/17 00:17:13 INFO DAGScheduler : 최종 단계 : ResultStage 1 (Elastic.java:94에서 수집) 16/08/17 00:17:13 정보 DAGScheduler : 최종 단계의 부모 : 목록() 16/08/17 00:17:13 정보 DAGScheduler : 누락 된 부모 : 목록() 16/08/17 00:17:13 INFO DAGScheduler : ResultStage 1 제출 (Elastic.java:106의 MapPartitionsRDD [4]), 부모가없는 경우 16/08/17 00:17:13 정보 MemoryStore : 브로드 캐스트 1을 메모리에 값으로 저장 차단 (예상 크기 4.3KB, 무료 7.7KB) 16/08/17 00:17:13 정보 MemoryStore : BlockServerInfo : localhost : 46356의 메모리에 broadcast_1_piece0을 추가했습니다 (크기 : 2.5KB, 무료 : 511.1MB). block_1_piece0을 메모리에 바이트로 저장합니다 (예상 크기 2.5KB, 무료 10.2KB). 16/08/17 00:17:13 16/08/17 00:17:13 INFO SparkContext : DAGScheduler.scala에서 브로드 캐스트 한 방송 1 작성 : 1006 16/08/17 00:17:13 INFO DAGScheduler : ResultStage 1에서 누락 된 작업 1 개 제출 (MapPartitionsRDD [4 ] Map at Elastic.java:106) 16/08/17 00:17:13 INFO TaskSchedulerImpl : 1 개의 작업으로 작업 세트 1.0 추가 ^ C16/08/17 00:17:22 INFO SparkContext : stop() 호출 종료 호에서 확인 16/08/17 00:17:22 정보 SparkUI : 중지 된 Spark 웹 UI http://192.168.10.23:4040 16/08/17 00:17:22 정보 DAGScheduler : ResultStage 0 (GuardConnect.java:60의 foreach)이 10,292 초 동안 실패했습니다. 16/08/17 00:17:22 정보 DAGScheduler : 작업 0 실패 : foreach가 GuardConnect.java:60에서 10,470974 초 걸렸습니다 스레드 "main"에서 예외가 발생했습니다. org.apache.spark.SparkException : 작업 0이 취소되었습니다. SparkContext가 종료되었습니다. org.apache.spark.scheduler.DAGScheduler $$ anonfun $ cleanUpAfterSchedulerStop $ 1.apply (DAGScheduler.scala : 806) at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ cleanUpAfterSchedulerStop $ 1.apply DAGScheduler.scala : 804) at scala.collection.mutable.HashSet.foreach (HashSet.scala : 79) at org.apache.spark.scheduler.DAGScheduler.cl eanUpAfterSchedulerStop (DAGScheduler.scala : 804) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.이동 중지 (DAGScheduler.scala : 1658) 무료 org.apache.spark.util.EventLoop.stop (EventLoop.scala : 84)에서 : 에서 org.apache.spark.scheduler.DAGScheduler.stop (1581 DAGScheduler.scala)에서 org.apache.spark.SparkContext $$ anonfun $ $ 9.apply $ MCV의 $ 특검팀 (SparkContext.scala : 1740)를 중지 조직에서 org.apache.spark.util.Utils $ .tryLogNonFatalError (1229 Utils.scala) 에서 을 .apache.spark.SparkContext.stop (SparkContext.scala : 1739) org.apache.spark.SparkContext $$ anonfun $ 3.apply $ MCV $ SP (SparkContext.scala : 596)에서 org.apache.spark.util에서 .SparkShutdownHook.run (ShutdownHookManager.scala : 267) org.apache.spark.util.SparkShutdownHookManager $$ anonfun runAll $ $ 1 $$ anonfun에서 $ $ $ MCV SP 1.apply $ $ $ MCV의 SP 적용 (ShutdownHookManager.scala 239 org.apache에서) . spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun $ apply_schema $ MCV $ SP $ 1.apply (ShutdownHookManager.scala 239) org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun에서 $ apply_schema $ MCV $ SP $ 1.apply (ShutdownHookManager.scala 239) org.apache.spark.util.Utils $ .logUncaughtExceptions에서 (1765 Utils.scala) org.apache.spark.util.SparkShutdownHookManager에서 $$ anonfun $ runAll $ 1.apply $ MCV $ SP (ShutdownHookManager.scala : 239) org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.apply에서 (ShutdownHookManager.scala : 239) org.apache.spark에서 .util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.apply (ShutdownHookManager.scala 239) scala.util.Try $ .apply에서 (Try.scala 161) org.apache.spark.util.SparkShutdownHookManager.runAll에서 (ShutdownHookManager.scala : 2 39) org.apache.spark.util.SparkShutdownHookManager $$ 익명의 $의 2.run에서 (ShutdownHookManager.scala : 218) 조직에서 org.apache.hadoop.util.ShutdownHookManager $ 1.run (ShutdownHookManager.java:54) 에서 org.apache.spark.SparkContext.runJob (SparkContext.scala : 1,832)에 : .apache.spark.scheduler.DAGScheduler.runJob (620 DAGScheduler.scala) org.apache.spark.SparkContext.runJob에서 (SparkContext.scala 1,845) org.apache.spark.SparkContext.runJob (SparkContext.scala에서 : org.apache.spark.SparkContext.runJob (SparkContext.scala에서 1,858) 1929)에서 org.apache.spark.rdd.RDD $$ anonfun $의 foreach $ 1.apply (RDD.scala 912) org.apache.spark.rdd.RDD $$ anonfun $의 foreach $ 1.apply에서 (RDD.scala : 910)에 org.apach e.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala 150)에서 org.apache.spark.rdd.RDD org.apache.spark.rdd.RDDOperationScope $ .withScope (111 RDDOperationScope.scala)에서 . org.apache.spark.rdd.RDD.foreach (RDD.scala : 910)에서 org.apache.spark.api.java.JavaRDDLike $의 class.foreach에서 (JavaRDDLike.scala : 범위 (316 RDD.scala)와 : sun.reflect.NativeMethodAccessorImpl에서 com.seigneurin.spark.GuardConnect.main (GuardConnect.java:60)에서 46) : org.apache.spark.api.java.AbstractJavaRDDLike.foreach (JavaRDDLike.scala 332) . sun.reflect.DelegatingMethodAccessorImpl.invoke에서 sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) 에서 invoke0 (기본 방법) (DelegatingMethodAccess orImpl.java:43) java.lang.reflect.Method.invoke (Method.java:498에서 )에서 org.apache.spark.deploy.SparkSubmit $ .ORG 아파치 $ $ $는 $ SparkSubmit $$ runMain를 배포 킥 (SparkSubmit.scala : 731) org.apache.spark.deploy.SparkSubmit $ .doRunMain $ 1 (SparkSubmit.scala에서 : $ org.apache.spark.deploy.SparkSubmit .submit에서의 181) (SparkSubmit.scala 206) org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala)에서 08/16/17 0시 17분 22초 ERROR : org.apache.spark.deploy.SparkSubmit $의 .main (121 SparkSubmit.scala)에서 LiveListenerBus는 : SparkListenerBus는 alreadyloggedin 중지! 삭제 이벤트 SparkListenerStageCompleted ([email protected]) 16/08/17 0시 17분 22초 정보 DAGScheduler : 결과 다케 (1) (탄성에 수집합니다.java : 94)가 9,301 초 동안 실패했습니다. 16/08/17 00:17:22 ERROR LiveListenerBus : SparkListenerBus가 이미 중지되었습니다! 이벤트 삭제 SparkListenerStageCompleted ([email protected]) 16/08/17 00:17:22 ERROR LiveListenerBus : SparkListenerBus가 이미 중지되었습니다! SparkListenerJobEnd (0,1471385842813, JobFailed (org.apache.spark.SparkException : SparkContext가 종료되어 작업 0이 취소됨) 16/08/17 00:17:22 INFO DAGScheduler : 작업 1이 실패했습니다. Elastic에서 수집합니다. java : 94, 9,317650 s 16/08/17 00:17:22 오류 실행자 : 0.0 단계 (TID 0)의 작업 0.0 예외 org.apache.spark.SparkException : SparkContext가 종료되어 작업 1이 취소됨 다운 at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ cleanUpAfterSchedulerStop $ 1.apply (DAGScheduler.scala : 806) at org.apache.spark.scheduler.DAGScheduler $$ anonfun $ cleanUpAfterSchedulerStop $ 1.apply (DAGScheduler.scala : 804) at scala.collection.mutable.HashSet.foreach (HashSet.scala : 79) at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop (DAGScheduler.s org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop (DAGScheduler.scala : 1658) at org.apache.spark.util.EventLoop.stop (EventLoop.scala : 84) at org.apache. spark.scheduler.DAGScheduler.stop (DAGScheduler.scala : 1581) 에서 org.apache.spark.SparkContext $$ anonfun $ stop $ 9.apply $ mcV $ sp (SparkContext.scala : 1740) at org.apache.spark. util.Utils $ .tryLogNonFatalError (Utils.scala : 1229) at org.apache.spark.SparkContext.stop (SparkContext.scala : 1739) at org.apache.spark.SparkContext $$ anonfun $ 3.apply $ mcV $ sp (SparkContext.scala : 596) at org.apache.spark.util.SparkShutdownHook.run (ShutdownHookManager.scala : 267) at org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun $ appl y $ mcV $ sp $ 1.apply $ mcV $ sp (ShutdownHookManager.scala : 239) (org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun $ 적용 $ mcV $ sp $ 1.apply (ShutdownHookManager .scala : 239) at org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1 $$ anonfun $ apply $ mcV $ sp $ 1.apply (ShutdownHookManager.scala : 239) at org.apache.spark.util .Utils $ .logUncaughtExceptions (Utils.scala : 1765) org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.apply $ mcV $ sp (ShutdownHookManager.scala : 239) at org.apache.spark. util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.apply (ShutdownHookManager.scala : 239) 을 org.apache.spark.util.SparkShutdownHookManager $$ anonfun $ runAll $ 1.apply (ShutdownHookManager.scala : 239) (scala.util)에 있습니다. Try $ .apply (Try.scala : 161) at org.apache.spark.util.SparkShutdownHookManager.runAll (ShutdownHookManager.scala : 239) at org.apache.spark.util.SparkShutdownHookManager $$ anon $ 2.run (ShutdownHookManager.scala : 218) at org.apache. hadoop.util.ShutdownHookManager $ 1.run (ShutdownHookManager.java:54) at org.apache.spark.scheduler.DAGScheduler.runJob (DAGScheduler.scala : 620) at org.apache.spark.SparkContext.runJob (SparkContext.scala : 1,832) org.apache.spark.SparkContext.runJob (SparkContext.scala에서 : org.apache.spark.SparkContext.runJob (SparkContext.scala에서 1,845) : 1,858)에 org.apache.spark.SparkContext.runJob (SparkContext.scala : 1929) at org.apache.spark.rdd.RDD $$ anonfun $ collect $ 1.apply (RDD.scala : 927) at org.apac hel.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala : 150) at org.apache.spark.rdd.RDDOperationScope $ .withScope (RDDOperationScope.scala : 111) at org.apache.spark.rdd.RDD. withScope (RDD.scala : 316) at org.apache.spark.rdd.RDD.collect (RDD.scala : 926) at org.apache.spark.api.java.JavaRDDLike $ class.collect (JavaRDDLike.scala : 339) at org.apache.spark.api.java.AbstractJavaRDDLike.collect (JavaRDDLike.scala : 46) at com.seigneurin.spark.Elastic.RddToListHits (Elastic.java:94) com.seigneurin에서 com.seigneurin.spark.OXO.startDetectNow (OXO.java:148)에서 com.seigneurin.spark.OXO.prepareDataAndLearn (OXO.java:126) 에서 . spark.GuardConnect.lambda main $ 1282d8df $ 1 (GuardConnect.java:60) at org.apache.spark.api.java.JavaRDDLike $$ anonfun $ foreach $ 1.apply (JavaRDDLike.scala : 332) at org.apache .spark.api.java.JavaRDDLike $$ anonfun $ foreach $ 1.apply (JavaRDDLike.scala : 332) at scala.collection.Iterator $ class.foreach (Iterator.scala : 727) at org.apache.spark.InterruptibleIterator .foreach (InterruptibleIterator.scala : 28) at org.apache.spark.rdd.RDD $$ anonfun $ foreach $ 1 $$ anonfun $ apply 32.apply (RDD.scala : 912) at org.apache.spark.rdd .RDD $$ anonfun $ foreach $ 1 $$ anonf $ 32.apply (RDD.scala : 912) org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala : 1858) at org.apache.spark.SparkContext $$ anonfun $ runJob $ 5.apply (SparkContext.scala : 1858) at org.apache.spark.scheduler.ResultTask.runTask (ResultTask.scala : 66) at org.apache.spark.scheduler.Task.run (Task.scala : 89) at org.apache.spark.executor.Executor $ TaskRunner.run (Executor.scala : 214) at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) at java.lang.Thread.run (Thread.java:745) 16/08/17 00:17:22 ERROR LiveListenerBus : SparkListenerBus가 이미 중지되었습니다! SparkListenerJobEnd (1,1471385842814, JobFailed (org.apache.spark.SparkException : SparkContext가 종료되어 작업 1이 취소되었습니다.) 16/08/17 00:17:22 INFO MapOutputTrackerMasterEndpoint : MapOutputTrackerMasterEndpoint가 중지되었습니다. 16/08/17 00:17:22 정보 MemoryStore : MemoryStore가 지워짐 16/08/17 00:17:22 정보 BlockManager : BlockManager가 중지됨 16/08/17 00:17:22 정보 BlockManagerMaster : BlockManagerMaster가 중지됨 16/08/17 00:17:22 정보 OutputCommitCoordinator $ OutputCommitCoordinatorEndpoint : OutputCommitCoordinator가 중지되었습니다! 16/08/17 00:17:22 INFO RemoteActorRefProvider $ RemotingTerminator : 원격 데몬 종료 중. 16/08/17 00:17:22 정보 RemoteActorRefProvider $ RemotingTerminator : 원격 데몬 종료; 원격 전송을 플러시하는 것으로 진행합니다. 16/08/17 00:17:22 정보 TaskSetManager : 1.0 단계 (TID 1, 로컬 호스트, 파티션 0, ANY, 6751 바이트)에서 작업 0.0 시작 16/08/17 00:17:22 오류받은 편지함 : 오류 무시 java.util.concurrent.RejectedExecutionException : 작업 [email protected][email protected]에서 거부 됨 [풀 크기 = 0, 활성 스레드 = 0, 대기중인 작업 = 0, 완료된 작업 = 1] at java.util.concurrent.ThreadPoolExecutor $ AbortPolicy.rejectedExecution (ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject (ThreadPoolExecutor.java:823) at java.util .concurrent.ThreadPoolExecutor.execute (ThreadPoolExecutor.java:1369) at org.apache.spark.executor.Executor.launchTask (Executor.scala : 128) at org.apache.spark.scheduler.local.LocalEndpoint $$ anonfun $ reviveOffers $ 1.apply (LocalBackend.scala : 86) at org.apache.spark.scheduler.local.LocalEndpoint $$ anonfun $ reviveOffers $ 1.apply ((LocalBackend.scala : 84) at org.apache.spark.scheduler.local.LocalEndpoint.scheduleer.ffl (LocalBackend.scala : 84) apache.spark.scheduler.local.LocalEndpoint $$ anonfun $ 수신 $ 1.applyOrElse (LocalBackend.scala : 69) at org.apache.spark.rpc.netty.Inbox $$ anonfun $ process $ 1.apply $ mcV $ sp (Inbox.scala : 116) at org.apache.spark.rpc.netty.Inbox.safelyCall (Inbox.scala : 204) at org.apache.spark.rpc.netty.Inbox.process (Inbox.scala : 100) at org.apache.spark.rpc.netty.Dispatcher $ Me ssageLoop.실행 (Dispatcher.scala : 215) at java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) at java. lang.Thread.run (Thread.java:745) 16/08/17 00:17:22 정보 SparkContext : SparkContext를 성공적으로 중지했습니다. 16/08/17 00:17:22 정보 ShutdownHookManager : 종료 훅을 호출했습니다. 16/08/17 00:17:22 정보 ShutdownHookManager : 디렉터리 삭제/tmp/spark-8bf65e78-a916-4cc0-b4d1-1b0ec9a07157 16/08/17 00:17:22 INFO RemoteActorRefProvider $ RemotingTerminator : 원격 종료. 16/08/17 0시 17분 22초 정보 ShutdownHookManager : 삭제/tmp 디렉토리/스파크 8bf65e78-a916-4cc0-b4d1-1b0ec9a07157/아파치 - 6d3aeb80-808c-4749-8f8b-ac9341f990a7

감사하는 경우 너는 나에게 약간의 충고를 줄 수있다.

+0

도움이 필요하면 내부 예외가 필요합니다. 이 모든 것은 'foreach'에 문제가 있다는 것을 말해줍니다. –

+0

RDD 직후 잠시 동안 (1) 윙윙 거리기 때문에 어쩌면 윙윙 거릴까요? RDD로 작업을 스레딩 할 수 있다고 생각했습니다. –

+0

오류의 전체 메시지를 추가합니다. RDD의 RDD는 실제로 이해가되지 않지만, 그렇습니다. 컴파일러를 트릭하는 방법이 있습니다. – GameOfThrows

답변

0

RDD 유형, 즉 RDD 유형은 RDD 내에 만들 수 없습니다. 이것이 첫 번째 규칙입니다. 이것은 RDD가 데이터를 가리키는 추상화이기 때문입니다.

+0

괜찮습니다.) –