2017-10-06 3 views
0

아무도이 PySpark 문제를 도와주지 않겠습니까? 나는 이것에 일을 보냈다. 그냥 여러 번 인쇄 할 때 왜 내 스키마의 길이가 바뀌는 지 알 수 없습니다. Spark 버전은 2.2이고 Jupyter Notebook을 사용하여 20 노드의 클러스터에서 코드를 실행합니다. 여기 플랫 맵 이후에 스키마 길이가 변경되고 pyspark에 여러 번 인쇄 될 때

import myReader 

    # read data from binary files 
    data=sc.binaryFiles('Data/20171006') 

    # binary file reader convert binary file to a tuple of schema and data array 
    # the 1st item in the tuple is the schema of type StructType 
    # the 2nd item in the tuple is a numpy 2D array 
    tableWithSchemaRDD = data.map(myReader.convert) 

    # print out the length of the schema to check its length 
    # since the schema is the same for all items in the RDD, I only check the first one 
    print "1st print: ", len(tableWithSchemaRDD.first()[0]) 

    # extract the data array from RDD 
    tableRDD = tableWithSchemaRDD.map(lambda x:x[1]) 

    # print out the length of the schema to check its length again 
    print "2nd print: ", len(tableWithSchemaRDD.first()[0]) 

    # flatmap so each item in the rdd is a row instead of 2D array 
    # and sort all the rows by the last item in each row, which is a timestamp 
    rowRDD = tableRDD.flatMap(lambda y:[z for z in y]).sortBy(lambda x:x[-1]) 

    # print out the length of the schema multiple times 
    print "multiple print: " 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 
    print len(tableWithSchemaRDD.first()[0]) 

이 출력됩니다 : 여기

내 코드 당신이 처음 2 print는 RDD의 정확한 길이를 인쇄 볼 수 있듯이

1st print: 73 
    2nd print: 73 
    multiple print: 
    3961 
    3961 
    3961 
    3961 
    73 
    73 

하지만 이후 flatMap 변환, 훨씬 더 큰 숫자로 증가했다. 그런 다음 잠시 후 (예 : 4 print) 올바른 길이로 되돌아 왔습니다. 때로는 잘못된 숫자가 3961이 아니라 72의 배수 중 다른 숫자에 1을 더한 것입니다. 내 스키마의 첫 번째 72 StructField는 데이터의 이름이므로 73 번째 StructField는 타임 스탬프이므로 숫자 3961은 72 * 51 + 1입니다. 또한 11737, 23401, 9793, 2017 등의 숫자를 보았습니다.

코드에서 설명하는 코드는 myReader입니다. 그것을 모듈로 가져 오기. 나는이 문제를 보지 못했다. 내 모듈을 노드에 배포하는 데 sc.addPyFile()을 사용했습니다.

모든 의견 및 제안을 부탁드립니다. 고맙습니다!

답변

0

이 질문이 지금 언급되었으므로 결과에 예상치 못한 결과가 없습니다. 종류의 당신 때문에 :

.sortBy(lambda x: x[-1]) 

불꽃이 관계의 경우, 데이터를 셔플하고있다는 first 하나로서 임의의 요소를 선택할 수 있습니다. 결과 값은 실행마다 바뀔 수 있습니다.

+0

답변 해 주셔서 감사합니다. 왜 당신은 거기에 유대 관계가 있다고 생각하는지 물어봐도 될까요? 모든 타임 스탬프는 고유하며'sortBy'에 사용됩니다. – dangwh

관련 문제