2015-01-22 5 views
4

고유 한 정수 ID에 문자열 ID를 변환, 각 사용자와 제품 ID는 문자열입니다 : ~ 280 만 개 제품 300 만명의 사용자와스파크 나는이처럼 보이는 데이터 세트를

userA, productX 
userA, productX 
userB, productY 

; 약 21 억 사용자 제품 협회.

나의 최종 목표는이 데이터 세트에서 Spark 협업 필터링 (ALS)을 실행하는 것입니다. 사용자와 제품에 대해 int 키를 사용하기 때문에 첫 번째 단계는 각 사용자와 제품에 고유 한 int를 할당하고 사용자와 제품이 int로 표시되도록 위의 데이터 집합을 변환하는 것입니다.

가 여기에 지금까지 시도한 작업은 다음과 같습니다

val rawInputData = sc.textFile(params.inputPath) 
    .filter { line => !(line contains "\\N") } 
    .map { line => 
     val parts = line.split("\t") 
     (parts(0), parts(1)) // user, product 
    } 

// find all unique users and assign them IDs 
val idx1map = rawInputData.map(_._1).distinct().zipWithUniqueId().cache() 

// find all unique products and assign IDs 
val idx2map = rawInputData.map(_._2).distinct().zipWithUniqueId().cache() 

idx1map.map{ case (id, idx) => id + "\t" + idx.toString 
}.saveAsTextFile(params.idx1Out) 
idx2map.map{ case (id, idx) => id + "\t" + idx.toString 
}.saveAsTextFile(params.idx2Out) 

// join with user ID map: 
// convert from (userStr, productStr) to (productStr, userIntId) 
val rev = rawInputData.cogroup(idx1map).flatMap{ 
    case (id1, (id2s, idx1s)) => 
    val idx1 = idx1s.head 
    id2s.map { (_, idx1) 
    } 
} 

// join with product ID map: 
// convert from (productStr, userIntId) to (userIntId, productIntId) 
val converted = rev.cogroup(idx2map).flatMap{ 
    case (id2, (idx1s, idx2s)) => 
    val idx2 = idx2s.head 
    idx1s.map{ (_, idx2) 
    } 
} 

// save output 
val convertedInts = converted.map{ 
    case (a,b) => a.toInt.toString + "\t" + b.toInt.toString 
} 
convertedInts.saveAsTextFile(params.outputPath) 

내 클러스터에서이 작업을 실행 (5기가바이트 RAM 각각 40 개 집행) 할 때, 그것은 idx1map을 생산할 수 그리고 idx2map 잘 파일, 메모리 부족 오류로 실패하고 cogroup 후에 첫 번째 flatMap에서 실패를 가져옵니다. 전에 Spark을 많이 사용하지 않았으므로 이것을 수행하는 더 좋은 방법이 있는지 궁금합니다. 나는이 일에서 어떤 단계가 비쌀지에 관해 좋은 생각이 없다. 확실하게 코 그룹은 네트워크를 통해 전체 데이터 세트를 셔플 링해야합니다. 하지만이게 무슨 뜻입니까?

FetchFailed(BlockManagerId(25, ip-***.ec2.internal, 48690), shuffleId=2, mapId=87, reduceId=25) 

난 그냥 해시 기능을 사용하지 않는 이유는 내가 결국 (10 억 개 제품, 10 억 사용자, 350 억 개 협회의 순서에) 훨씬 더 큰 데이터 세트에서이 작업을 실행하고 싶습니다이다 , Int 키 충돌 수는 상당히 커집니다. 해당 척도의 데이터 세트에서 ALS를 실행해도 실행 가능합니까?

답변

2

본질적으로 모든 사용자 목록을 수집하고있는 것처럼 보입니다. 사용자를 다시 분류하는 것 같습니다. cogroup 대신에 join을 사용해보십시오. 여러분이 원하는 것보다 더 많이하는 것처럼 보입니다. 예 :

import org.apache.spark.SparkContext._ 
// Create some fake data 
val data = sc.parallelize(Seq(("userA", "productA"),("userA", "productB"),("userB", "productB"))) 
val userId = sc.parallelize(Seq(("userA",1),("userB",2))) 
val productId = sc.parallelize(Seq(("productA",1),("productB",2))) 

// Replace userName with ID's 
val userReplaced = data.join(userId).map{case (_,(prod,user)) => (prod,user)} 
// Replace product names with ID's 
val bothReplaced = userReplaced.join(productId).map{case (_,(user,prod)) => (user,prod)} 

// Check results: 
bothReplaced.collect()) // Array((1,1), (1,2), (2,2)) 

성능 향상에 대한 의견을 적어주십시오.

(어떤 의미인지 모르겠다. FetchFailed(...))

관련 문제