2016-10-16 1 views
1

내가의 startDate와 endDate가간에 파일의 목록을 얻을 이러한 폴더에서 파일을 읽을하려고를 사용하여 동적 S3 파일 경로를 생성하는 방법 BucketName을/년/월/일/파일SPARK :</p> <p>내 파일 구조는 다음과 같습니다 예를 들면 다음과 같습니다 : 날짜 DIFF

s3://testBucket/2016/10/16/part00000 

이 파일들은 모두 jsons입니다. 문제는 별표 날짜와 종료일 사이의 모든 경로를로드해야합니다.

시작일 (10/16/2016) 및 종료일 (09/16/2016)으로 2011 년 9 월 16 일부터 읽기를 원합니다. (포함) .... .... .... 10/16/2016 (포함)

import org.joda.time.Days 
    import org.joda.time.DurationFieldType 
    import org.joda.time.LocalDate 
    import org.joda.time.format.DateTimeFormat 
    import org.joda.time.format.DateTimeFormatter 

    val s3Bucket: String = "S3://myTestBucket/" 

    val startTimestamp: String = "2016-09-16T00:00:00Z" 
    val endTimestamp: String = "2016-10-16T00:00:00Z" 

    val dtf: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MMM-dd") 
    val startDate: LocalDate = dtf.parseLocalDate(startTimestamp) 

    val endDate: LocalDate = dtf.parseLocalDate(endTimestamp) 


    val days: Int = Days.daysBetween(startDate, endDate).getDays 

    System.out.print(days) 

    val dates = new ListBuffer[String]() 
    var i: Int = 0 
    while (i < days) { 
     { 
     val d: LocalDate = startDate.withFieldAdded(DurationFieldType.days, i) 
     val tempDate: String = s3Bucket + d.getYear + "/" + d.getMonthOfYear + "/" + d.getDayOfMonth + "/" + "*" 
     dates += tempDate 
     } 
     { 
     i += 1; 
     } 
    } 
    val dateList = dates.toList 
    val files = dateList.mkString(", ") 
    sqlContext.read.json(files) 

이 방법이 좋습니까? 이 작업을 수행하는 다른 효율적인 방법이 있습니까? 주고받는 솔루션을 기반으로

이 오류 얻을 :

org.apache.spark.SparkException: Job 2 cancelled because SparkContext was shut down 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804) 
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79) 
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658) 
    at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84) 
    at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581) 
    at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1731) 
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229) 
    at org.apache.spark.SparkContext.stop(SparkContext.scala:1730) 
    at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1929) 
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:927) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111) 
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:926) 
    at org.apache.spark.sql.sources.HadoopFsRelation$.listLeafFilesInParallel(interfaces.scala:904) 
    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:445) 
    at org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:477) 
    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:489) 
    at org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:487) 
    at org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:494) 
    at org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4.apply(JSONRelation.scala:110) 
    at org.apache.spark.sql.execution.datasources.json.JSONRelation$$anonfun$4.apply(JSONRelation.scala:109) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema$lzycompute(JSONRelation.scala:109) 
    at org.apache.spark.sql.execution.datasources.json.JSONRelation.dataSchema(JSONRelation.scala:108) 
    at org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:636) 
    at org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:635) 
    at org.apache.spark.sql.execution.datasources.LogicalRelation.<init>(LogicalRelation.scala:37) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125) 
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:136) 
    at org.apache.spark.sql.DataFrameReader.json(DataFrameReader.scala:263) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:58) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:63) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:65) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:67) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:69) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:71) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:73) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:75) 
    at $iwC$$iwC$$iwC.<init>(<console>:77) 
    at $iwC$$iwC.<init>(<console>:79) 
    at $iwC.<init>(<console>:81) 
    at <init>(<console>:83) 
    at .<init>(<console>:87) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:483) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
    at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664) 
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629) 
    at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622) 
    at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57) 
    at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) 
    at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276) 
    at org.apache.zeppelin.scheduler.Job.run(Job.java:170) 
    at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

답변

2

내가 훨씬 더 효율적으로이 될 수 있다고 생각하지 않습니다, 그러나 그것은 확실히 관용적 (while를 사용 var) 아니다 및 수 더 짧고 간결하게 만들 수 :

val s3Bucket: String = "S3://myTestBucket/" 
val startDate: LocalDate = new LocalDate(2016, 9, 16) 
val endDate: LocalDate = new LocalDate(2016, 10, 16) 

val days: Int = Days.daysBetween(startDate, endDate).getDays 

val pathDTF = DateTimeFormat.forPattern("yyyy/MM/dd") 

val files: Seq[String] = (0 to days) 
    .map(startDate.plusDays) 
    .map(d => s"$s3Bucket${pathDTF.print(d)}/*") 

val result = sqlContext.read.json(files: _*) 

편집 : 감사 @Newbie에 대한 알았어 - 참으로 하나가 read.json(...)에 파일의 목록을 통과 할 수 없으므로 마지막 줄이 있어야한다 :

val result = sqlContext.read.json(sc.textFile(files.mkString(","))) 
+0

내가 여기 명시 적 파일 검사를 추가하거나 불꽃이 경우에 그 처리됩니다 필요 수행 S3 키 아무튼 경우 다음 폴더로 건너 뛰고 싶은 파일이 없습니다. – Newbie

+0

스파크 컨텍스트가 종료되고 있습니다. 파일 검사의 becuz입니까? – Newbie

+0

초보자, 예외가 근본 원인인지 증상인지 잘 모르겠습니다. 다른 스택 추적이 있습니까? – rado

관련 문제