고유 한 정수 ID에 문자열 ID를 변환, 각 사용자와 제품 ID는 문자열입니다 : ~ 280 만 개 제품 300 만명의 사용자와스파크 나는이처럼 보이는 데이터 세트를
userA, productX
userA, productX
userB, productY
; 약 21 억 사용자 제품 협회.
나의 최종 목표는이 데이터 세트에서 Spark 협업 필터링 (ALS)을 실행하는 것입니다. 사용자와 제품에 대해 int 키를 사용하기 때문에 첫 번째 단계는 각 사용자와 제품에 고유 한 int를 할당하고 사용자와 제품이 int로 표시되도록 위의 데이터 집합을 변환하는 것입니다.
가 여기에 지금까지 시도한 작업은 다음과 같습니다
val rawInputData = sc.textFile(params.inputPath)
.filter { line => !(line contains "\\N") }
.map { line =>
val parts = line.split("\t")
(parts(0), parts(1)) // user, product
}
// find all unique users and assign them IDs
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache()
// find all unique products and assign IDs
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache()
idx1map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx1Out)
idx2map.map{ case (id, idx) => id + "\t" + idx.toString
}.saveAsTextFile(params.idx2Out)
// join with user ID map:
// convert from (userStr, productStr) to (productStr, userIntId)
val rev = rawInputData.cogroup(idx1map).flatMap{
case (id1, (id2s, idx1s)) =>
val idx1 = idx1s.head
id2s.map { (_, idx1)
}
}
// join with product ID map:
// convert from (productStr, userIntId) to (userIntId, productIntId)
val converted = rev.cogroup(idx2map).flatMap{
case (id2, (idx1s, idx2s)) =>
val idx2 = idx2s.head
idx1s.map{ (_, idx2)
}
}
// save output
val convertedInts = converted.map{
case (a,b) => a.toInt.toString + "\t" + b.toInt.toString
}
convertedInts.saveAsTextFile(params.outputPath)
내 클러스터에서이 작업을 실행 (5기가바이트 RAM 각각 40 개 집행) 할 때, 그것은 idx1map을 생산할 수 그리고 idx2map 잘 파일, 메모리 부족 오류로 실패하고 cogroup 후에 첫 번째 flatMap에서 실패를 가져옵니다. 전에 Spark을 많이 사용하지 않았으므로 이것을 수행하는 더 좋은 방법이 있는지 궁금합니다. 나는이 일에서 어떤 단계가 비쌀지에 관해 좋은 생각이 없다. 확실하게 코 그룹은 네트워크를 통해 전체 데이터 세트를 셔플 링해야합니다. 하지만이게 무슨 뜻입니까?
FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25)
난 그냥 해시 기능을 사용하지 않는 이유는 내가 결국 (10 억 개 제품, 10 억 사용자, 350 억 개 협회의 순서에) 훨씬 더 큰 데이터 세트에서이 작업을 실행하고 싶습니다이다 , Int 키 충돌 수는 상당히 커집니다. 해당 척도의 데이터 세트에서 ALS를 실행해도 실행 가능합니까?