저는 Spark 앱을 스칼라로 작성 중이며 더러운 입력 파일을 처리하려고합니다.Spark Scala scala.util.control.Exception지도에서 잡기 및 놓기 없음
// CSV file
val raw_data = sc.textFile(...)
val clean_data = raw_data.map(_.split(delimiter))
.map(r => (r(0), r(1).toDouble)
은 r (1)이 숫자가 아닐 때 NumberFormatException을 발생시킵니다. 이는 추한 입력 데이터의 적은 수의 행에서 발생합니다.
나는 마침내 내가 원하는 것을 달성하기 위해 추한 방법에 착륙 :이 두 가지 질문으로 나를 잎
import scala.util.control.Exception._
val clean_data = raw_data.map(_.split(delimiter))
.map(r => (r(0),
catching(classOf[NumberFormatException]).opt(r(1).toDouble))
.filter(r => r._2 != None)
.map(r => (r._1, r._2.get))
.
1) 잘못된 형식의 행을지도에 단순히 드롭하는 가장 좋은 방법은 무엇입니까?
2) None을 명시 적으로 필터링하지 않고 catch 옵션을 사용하여 생성 된 Option 유형을 처리 한 다음 non-None Option 값에 .get 함수를 매핑하고 적용하려면 어떻게해야합니까?
.flatMap (identity) 단계를 적용하여 Nones를 제거하려고 시도했지만 예상되는 : TraversableOnce [?] 예외가 발생했습니다.
좋아, 일부 해결 방법을 찾은 후에 (예 : Array (x, y)는 x와 y를 만들고 있었으므로 대신 (x, y)를 문자열로 보존해야했습니다.) 하지만 몇 가지 질문이 있습니다. 먼저, .toDouble을 두 번 호출해야한다는 것이 이상한 것처럼 보입니다. 한 번만 수행 할 수 있는지, 실제로 한 번 수행 할 것인지를 한 번 확인하십시오. 둘째,이 경우 드라이버에서 메모리로 RDD를 가져올 수 있기 때문에 .collect()가 제대로 작동합니다. 하지만 실제로 큰 데이터를 처리하고 .collect()를 사용할 수없는 경우 어떻게해야합니까? 도움을 주셔서 감사합니다! – Metropolis
re :'collect' 혼란 스럽습니다. RDD에는 두 가지'collect' 메소드가 있습니다. 매개 변수없는 버전'def collect() : Array [T]'는 계산을 트리거하고 모든 데이터를 드라이버에 전달합니다. 'def collect [U] (f : PartialFunction [T, U]) (암시 적 arg0 : ClassTag [U]) : RDD [U]'는 부분 함수를 취하는 스칼라'collect' 함수 결과로 다른 RDD를 생성하므로 다른 RDD 변환과 마찬가지로 여전히 병렬 적입니다. – maasg
rd : toDouble (x2) - 부분 함수 버전을 선호하기 때문에 사용하지만 실제로는 두 번 연산을 실행합니다. 다음은 대안입니다 :'data.flatMap (entry => Try (entry.toDouble) .toOption)' – maasg