2

저는 분류자를 훈련시키기 위해 자체 학습 접근법을 구현하려고합니다. 나는 불꽃 1.6.0을 사용하고있다. 문제는 RDD를 다른지도에 연결할 때 잘못 계산된다는 것입니다. 동일한 코드가 작은 데이터 세트에 대해서는 큰 효과를 발휘합니다. ------------------ 238.182Spark : 각 반복에서 RDD 항목이 누락되었습니다.

:

INITIAL 트레이닝 세트 사이즈 :

println("INITIAL TRAINING SET SIZE : " + trainingSetInitial.count()) 
for(counter <- 1 to 10){ 
    println("------------------- This is the_" + counter + " run -----------------") 
    println("TESTING SET SIZE : " + testing.count()) 

    val lowProbabilitiesSet = testing.flatMap { item => 
    if (model.predictProbabilities(item._2)(0) <= 0.75 && model.predictProbabilities(item._2)(1) <= 0.75) { 
     List(item._1) 
    } else { 
     None 
    }}.cache() 
    val highProbabilitiesSet = testing.flatMap { item => 
    if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75) { 
     List(item._1 +","+ model.predict(item._2).toDouble) 
    } else { 
     None 
    }}.cache() 
    println("LOW PROBAB SET : " + lowProbabilitiesSet.count()) 
    println("HIGH PROBAB SET : " + highProbabilitiesSet.count()) 

    trainingSetInitial = trainingSetInitial.union(highProbabilitiesSet.map(x => LabeledPoint(List(x)(0).split(",")(8).toString.toDouble, htf.transform(List(x)(0).toString.split(",")(7).split(" "))))) 
    model = NaiveBayes.train(trainingSetInitial, lambda = 1.0) 
    println("NEW TRAINING SET : " + trainingSetInitial.count()) 

    previousCount = lowProbabilitiesSet.count() 
    testing = lowProbabilitiesSet.map { line => 
    val parts = line.split(',') 
    val text = parts(7).split(' ') 
    (line, htf.transform(text)) 
    } 
    testing.checkpoint() 
} 
올바른 출력에서 ​​로그 인 3.158.722

LOW PROBAB의 SET : 22.996-이 the_1 실행 -----------------

TESTING SET 크기입니다

HIGH PROBAB의 SET : 3.135.726

새로운 훈련 SET가 : 3,373,908

-------------------이 the_2 실행 - ---------------

테스팅 SET 크기 : 22,996

LOW PROBAB의 SET : 566

HIGH PROBAB의 SET : 22,430

새로운 훈련 SET : ----- 31.990.660

:

초기 훈련 집합 크기 : 문제가 시작되면 여기 3,396,338

그리고는 (대형 데이터 세트 입력)입니다 423.173.780

: --------------이 the_1 실행 -----------------

TESTING SET 크기입니다

LOW PROBAB의 SET : 62.615.460

HIGH PROBAB의 SET : 360.558.320

새로운 훈련 SET : 395265857

----------------- 52673986

LOW PROBAB의 SET : 51460875

-이 the_2 실행 -----------------

TESTING SET 크기입니다 510,403,210 HIGH PROBAB 세트 : 1,213,111

NEW 트레이닝 세트 : 401,950,263

첫번째 반복에 'LOW PROBAB 세트'번째 반복에 대한 'TESTING의 SET'이어야한다. 어딘가에 어떻게 든 1 천만 항목이 사라집니다.또한 첫 번째 반복에서 'NEW TRAINING SET'은 'INITIAL TRAINING'과 'HIGH PROB SET'의 연결이어야합니다. 다시 숫자가 일치하지 않습니다.

코드를 실행하는 동안 오류가 발생하지 않았습니다. 나는 각각의 세트를 캐쉬하려했으나 각 반복의 끝에서 unpersist했다 (HIGH와 LOW 세트 만). 그러나 같은 결과. 나는 또한 체크 포인트를 시도했으나 작동하지 않았다. 왜 이런 일이 일어나는 걸까요?

편집

그냥 테스트를 위해 난 그냥 어떻게되는지 루프 내부의 새로운 모델을 만들지 않은 : 원래 모델에서,

for(counter <- 1 to 5){ 
    println("------------------- This is the_" + counter + " run !!! -----------------") 
    var updated_trainCnt = temp_train.count(); 
    var updated_testCnt = test_set.count(); 
    println("Updated Train SET SIZE: " + updated_trainCnt) 
    println("Updated Testing SET SIZE: " + updated_testCnt) 

    val highProbabilitiesSet = test_set.filter { item => 
    val output = model.predictProbabilities(item._2) 
    output(0) > 0.75 || output(1) > 0.75 
    }.map(item => (item._1 + "," + model.predict(item._2), item._2)).cache() 

    test_set = test_set.filter { item => 
    val output = model.predictProbabilities(item._2) 
    output(0) <= 0.75 && output(1) <= 0.75 
    }.map(item => (item._1, item._2)).cache() 
    var hiCnt = highProbabilitiesSet.count() 
    var lowCnt = test_set.count() 
    println("HIGH PROBAB SET : " + hiCnt) 
    println("LOW PROBAB SET : " + lowCnt) 
    var diff = updated_testCnt - hiCnt - lowCnt 
    if (diff!=0) println("ERROR: Test set not correctly split into high low" + diff) 
    temp_train= temp_train.union(highProbabilitiesSet.map(x => LabeledPoint(x._1.toString.split(",")(8).toDouble, x._2))).cache() 
    println("NEW TRAINING SET: " + temp_train.count()) 
//  model = NaiveBayes.train(temp_train, lambda = 1.0, modelType = "multinomial") 
    println("HIGH PROBAB SET : " + highProbabilitiesSet.count()) 
    println("LOW PROBAB SET : " + test_set.count()) 
    println("NEW TRAINING SET: " + temp_train.count()) 
} 

생성 된 번호를 확인 심지어 노조했다 RDD가 문제없이 수행되었습니다. 그러나 큰 문제는 남아 있습니다. 분류 모델이 각 루프 (또는 다른 RDD)의 끝에서 수정하지 않고도 교육 세트 (lowProbabilititesSet)를 어떻게 엉망으로 만들었습니까?

콘솔 로그 및 스파크 로그에는 오류나 실행자의 호감이 표시되지 않습니다. 분류 교육 과정에서 데이터가 어떻게 손상됩니까?

+0

최소한의 로직으로 코드를 단순화하십시오. 그러면 귀하와 Google 모두 문제를 발견하는 데 도움이됩니다. –

+0

나는 노동 조합이 문제라고 생각한다. 노동 조합을 시도해 보라. 노조가 두 세트간에 중복을 제거합니다. –

+0

데이터 프레임이 아닌 RDD를 처리하고 있습니다. –

답변

-2

지금 당장 문제가 표시되지 않습니다. 코드를 실제 문제로 최소화하십시오. 우선은 내가 filterflatMap 작업을 재 작성하는 것이 좋습니다 것입니다 :

val highProbabilitiesSet = testing.flatMap { item => 
    if (model.predictProbabilities(item._2)(0) > 0.75 || model.predictProbabilities(item._2)(1) > 0.75) { 
     List(item._1 +","+ model.predict(item._2).toDouble) 
    } else { 
    None 
    } 
}.cache() 

받는 사람 : 난 아직도 파악하지 않은 비록

val highProbabilitiesSet = testing.filter { item => 
    val output = model.predictProbabilities(item._2) 
    output(0) > 0.75 || output(1) > 0.75 
}.map(item => (item._1, model.predict(item._2).toDouble)).cache() 
+0

필자는 이것을 확인 하겠지만 map을 사용하여 flatMap을 작성하는 또 다른 방법이라고 생각합니다. 나는 그것이 뭔가 다른 것을 생산할 것이라고 생각하지 않는다. 곧 알려 드리겠습니다. –

+0

아무 것도 변경되지 않았다고 생각 했으므로 작은 데이터 세트에서는 괜찮 았지만 큰 데이터 세트에서 이상한 결과가 나타났습니다. –

0

이유는 RDDs를 플러시 해킹으로 이런 일이 를 HDFS에 보내고 반복적으로 클래스를 실행하고 매번 HDFS에서 데이터를 읽는 bash 스크립트를 작성했습니다. 루프 내부에서 분류자를 훈련 할 때 문제가 나타납니다.

관련 문제