2016-10-21 2 views
1

DSME 스파크 작업 서버을 사용하고 있습니다. 달성하고자하는 작업은 다음과 같습니다.Spark Job Server의 자바 프로그램이 scala.MatchError를 던졌습니다.

Java에서 생성 한 스파크 작업은 cassandra db에서 일부 데이터를 가져오고 DSE Analytics 클러스터에 배포됩니다.

코드를 다음과 같이

package com.symantec.nsp.analytics; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo; 

import java.io.Serializable; 
import java.util.List; 
import java.util.UUID; 

import org.apache.commons.lang.StringUtils; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaSparkContext; 

import spark.jobserver.JavaSparkJob; 
import spark.jobserver.SparkJobInvalid; 
import spark.jobserver.SparkJobValid$; 
import spark.jobserver.SparkJobValidation; 

import com.symantec.nsp.analytics.model.Bucket; 
import com.typesafe.config.Config; 

public class JavaSparkJobBasicQuery extends JavaSparkJob { 

    public String runJob(JavaSparkContext jsc, Config config) { 
     try { 
      List<UUID> bucketRecords = javaFunctions(jsc).cassandraTable("nsp_storage", "bucket", mapRowTo(Bucket.class)) 
        .select("id", "deleted").filter(s -> s.getDeleted()).map(s -> s.getId()).collect(); 

      System.out.println(">>>>>>>> Total Buckets getting scanned by Spark :" + bucketRecords.size()); 
      return bucketRecords.toString(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return null; 
     } 
    } 

    public SparkJobValidation validate(SparkContext sc, Config config) { 
     return null; 
    } 

    public String invalidate(JavaSparkContext jsc, Config config) { 
     return null; 
    } 
} 

문제 :

이 코드 나는 아래의 문제가 무엇입니까 운동하는 동안 :

"status": "ERROR", 
    "result": 
    "message": "null", 
    "errorClass": "scala.MatchError", 
    "stack": ["spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:244)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)", "java.lang.Thread.run(Thread.java:745)"] 

누군가가 문제를 해결 할 수 있습니다. 참고 : /tmp 폴더를 여러 번 청소하려고했습니다. 이 문제를 해결할 수 없습니다. 제가 사용하는 DSE 버전은 4.8.10입니다.

답변

0

예외에서 null을 반환하지 않아도되는지 잘 모르겠습니다. 나는 그것을 전파하도록 남겨 둘 것이다.

0

null 문을 제거해 보았습니다. 여전히 문제는 지속됩니다. 나는 카산드라 테이블을 스캔하는 실제 Java 스파크 작업 샘플을 보지 못했습니다. 어떤 사람이이 작업의 구조 (runJob 무시하기)와 그 물건이 맞는지 확인할 수 있습니까? 어떤 경우에는 cassandra db 테이블 스캔을 다루는 Java spark 작업 샘플이 있으면 도움이 될 것입니다.

관련 문제