
I am querying data from a database, performing some transformations, and trying to save the new data to HDFS in Parquet format. I get an out-of-memory error when writing the Spark DataFrame to Parquet.

Since the database query returns a large number of rows, I process the data in batches and run the above pipeline on every incoming batch.

UPDATE 2: The batch processing logic is as follows:

import scala.collection.JavaConverters._ 

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.sql.SQLContext 
import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StructType, StructField, StringType} 

class Batch(rows: List[String], 
      sqlContext: SQLContext) { 

     // The actual schema has around 60 fields 
     // createDataFrame expects a StructType, not a bare Array[StructField] 
     val schema = StructType(Array("name", "age", "address").map(field => 
         StructField(field, StringType, true) 
        )) 

     val transformedRows = rows.map(row => { 

       // transformation logic (returns Array[Array[String]] type) 

      }).map(row => Row.fromSeq(row.toSeq)) 

     val dataframe = sqlContext.createDataFrame(transformedRows.asJava, schema) 

} 

val sparkConf = new SparkConf().setAppName("Spark App") 
val sparkContext = new SparkContext(sparkConf) 
val sqlContext = new SQLContext(sparkContext) 

// Code to query database 
// queryResponse is essentially an iterator that fetches the next batch on calling queryResponse.next 

var batch_num = 0 

while (queryResponse.hasNext) { 
    val batch = queryResponse.next 

    val batchToSave = new Batch(
          batch.toList.map(_.getDocument.toString), 
          sqlContext) 

    batchToSave.dataframe.write.parquet(batch_num + "_Parquet") 

    batch_num += 1 

} 

My Spark version is 1.6.1, and the spark-submit command is:

spark-submit target/scala-2.10/Spark\ Application-assembly-1.0.jar 

The problem is that after a certain number of batches, I get a java.lang.OutOfMemoryError.

The full stack trace is:

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space 
    at java.util.Arrays.copyOfRange(Arrays.java:2694) 
    at java.lang.String.<init>(String.java:203) 
    at java.lang.StringBuilder.toString(StringBuilder.java:405) 
    at scala.StringContext.standardInterpolator(StringContext.scala:125) 
    at scala.StringContext.s(StringContext.scala:90) 
    at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:70) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:52) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
    at app.Application$.main(App.scala:156) 
    at app.Application.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

I tried coalescing the data into a single partition, but it made no difference:

dataframe.coalesce(1).write.parquet(batch_num + "_Parquet") 

Any help would be appreciated.

UPDATE 1

Performing the coalesce as a transformation on the RDD still gives the error, but with the following stack trace; the problem appears to be in Parquet.

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1855) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1868) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1945) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55) 
    at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:256) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:139) 
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:334) 
    at app.Application$.main(App.scala:156) 
    at app.Application.main(App.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.lang.OutOfMemoryError: Java heap space 
    at org.apache.parquet.column.values.dictionary.IntList.initSlab(IntList.java:90) 
    at org.apache.parquet.column.values.dictionary.IntList.<init>(IntList.java:86) 
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:93) 
    at org.apache.parquet.column.values.dictionary.DictionaryValuesWriter$PlainBinaryDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:229) 
    at org.apache.parquet.column.ParquetProperties.dictionaryWriter(ParquetProperties.java:131) 
    at org.apache.parquet.column.ParquetProperties.dictWriterWithFallBack(ParquetProperties.java:178) 
    at org.apache.parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:203) 
    at org.apache.parquet.column.impl.ColumnWriterV1.<init>(ColumnWriterV1.java:83) 
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.newMemColumn(ColumnWriteStoreV1.java:68) 
    at org.apache.parquet.column.impl.ColumnWriteStoreV1.getColumnWriter(ColumnWriteStoreV1.java:56) 
    at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:183) 
    at org.apache.parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:375) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:109) 
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:99) 
    at org.apache.parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:100) 
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:303) 
    at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:262) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetRelation.scala:94) 
    at org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anon$3.newInstance(ParquetRelation.scala:286) 
    at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129) 
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:255) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:89) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

No no no no. Do not coalesce. That is the opposite of what you want to do. – GameOfThrows


[ "java.lang.OutOfMemoryError : Java 힙 공간"오류 (64MB 힙 크기)를 처리하는 방법] (http://stackoverflow.com/questions/37335/how-to-deal-with-java- lang-outofmemoryerrorror-java-heap-space-error-64mb-heap) –


Because you are coalescing the data, you will get an OOM whenever the data in any one partition is larger than the available memory. Coalescing to a single partition is usually undesirable anyway, since it throws away most of the scalability benefits. –
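
One way to act on this advice is to keep or raise the partition count instead of collapsing it, so that no single write task has to hold the whole batch; a minimal sketch, where the partition count of 8 is an arbitrary illustrative value:

    // Spread the write across several tasks instead of funnelling 
    // everything through one partition (8 is only an example value). 
    batchToSave.dataframe.repartition(8).write.parquet(batch_num + "_Parquet") 

DataFrame.repartition shuffles the rows evenly across the requested number of partitions, which keeps the per-task memory footprint small at the cost of an extra shuffle.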

Answers


I had the same problem today. My execution plan had become rather complex, and its toString produced 150 MB of text; combined with Scala's string interpolation, that was enough to run the driver out of memory.
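
As a rough way to check whether the plan text is the culprit, you can measure the size of the string that the stack trace points at (QueryExecution.toString); a minimal sketch, assuming dataframe is the DataFrame about to be written:

    // QueryExecution.toString is the call that blows up in the first 
    // stack trace above; its length shows how large the textual plan has grown. 
    val planText = dataframe.queryExecution.toString 
    println(s"Execution plan string: ${planText.length} characters") 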

You can increase the driver memory (I had to double mine, from 8 GB to 16 GB).
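
For example, with spark-submit the driver heap can be raised via the --driver-memory flag (the 16g below is only an illustrative value; without it, Spark 1.6 defaults to a 1g driver heap):

    spark-submit --driver-memory 16g target/scala-2.10/Spark\ Application-assembly-1.0.jar 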
