2016-07-01 3 views
1

흥미롭게도 내가 스트리밍하고있는 일련의 사전에서 k, v 쌍의 수를 확인하려면 수표로 쓰고 싶지만 할 수있는 것 같지 않습니다. 이.Spark 스트리밍 및 스트리밍 된 사전 작업 수행

lines = ssc.socketTextStream("127.0.0.1", 5006) 
json_format = lines.flatMap(lambda recieved: json.loads(recieved)) 
dict_format = json_format.flatMap(lambda x : len(x)).reduce(lambda a, b: a+b) 

예를 들어 나는 다음과 같은 오류가 발생합니다 :

File "/home/xx/spark-1.6.1/python/pyspark/rdd.py", line 1776, in combineLocally 
    merger.mergeValues(iterator) 
    File "/home/xx/spark-1.6.1/python/pyspark/shuffle.py", line 236, in mergeValues 
    for k, v in iterator: 
TypeError: 'int' object is not iterable 

나는 우리가 사전 일련의가 있다고 가정 할 수 있습니다 -이 고장이 json.loads()에없는하지만이 간단한 길이를 취할 수없는 것.

답변

0

Spark은 flatMap에 제공된 함수가 소스 RDD/DStream에서 처리하는 각 요소에 대해 탐색 가능한/반복 가능한 결과 (예 : 목록)를 반환 할 것으로 기대합니다. TypeError: 'int' object is not iterable 오류는 아마도 Spark가 전달하는 람다 중 하나에서 반환 된 반복 불가능한 값을 반복하려고 시도하기 때문에 발생합니다. flatMap. len(...)는 항상 int를 반환하기 때문에

두 번째 flatMap 호출 (json_format.flatMap)는 확실히 문제이며, 그래서 가장 가능성있는 원인은 여기에있다. int (즉, 길이)에 일대일 변환을 수행하는 것이 목적이므로, flatMapmap으로 대체하여 문제를 해결할 수 있습니다.

첫 번째flatMap 호출이 유효한 지 여부는 입력에 따라 다릅니다. 소스 파일의 각 행이 JSON 배열로 파싱되는 문자열이라는 것이 확실하다면 예상대로 작동해야합니다. 그러나 파일의 모든 행에 대한 JSON 구문 분석에서 배열보다 기타이 생성되는 경우 구문 분석 함수는 반복 불가능한 결과를 flatMap으로 보내면보고있는 것과 유사한 오류로 작업이 실패합니다 현재 :

>>> type(json.loads('{"asdf": "qwerty"}')) 
<class 'dict'> 
>>> type(json.loads('[{"asdf": "qwerty"}, [1,2,3]]')) 
<class 'list'> 
>>> type(json.loads('3')) 
<class 'int'> 
관련 문제