2017-09-23 3 views
1

데이터 프레임이 있습니다.Json Parsing이 Spark UDF 내에서 예기치 않은 출력을 throw합니다.

해당 데이터 프레임의 모든 열의 데이터 유형은 문자열입니다. 열 중 일부는 내가 혼자 jsonString을 분석하고 그 사이의 값을 새로운 열로 것을 추가 할

+--------+---------+--------------------------+ 
|event_id|event_key|    rights  | 
+--------+---------+--------------------------+ 
|  410|(default)|{"conditions":[{"devic...| 
+--------+---------+--------------------------+ 

을 jsonString 있습니다. 잭슨 파서를 사용하고 있습니다. 여기

{ 
"conditions": [ 
    { 
     "devices": [ 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "IOS", 
       "type": "MOBILE", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "ANDROID", 
       "type": "MOBILE", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "IOS", 
       "type": "TABLET", 
       "provider": "TELETV" 
      }, 
      { 
       "connection": [ 
        "BROADBAND", 
        "MOBILE" 
       ], 
       "platform": "ANDROID", 
       "type": "TABLET", 
       "provider": "TELETV" 
      } 
     ], 
     "endDateTime": "2017-01-09T22:59:59.000Z", 
     "inclusiveGeoTerritories": [ 
      "DE", 
      "IT", 
      "ZZ" 
     ], 
     "mediaType": "Linear", 
     "offers": [ 
      { 
       "endDateTime": "2017-01-09T22:59:59.000Z", 
       "isRestartable": true, 
       "isRecordable": true, 
       "isCUTVable": false, 
       "recordingMode": "UNIQUE", 
       "retentionCUTV": "P7DT2H", 
       "retentionNPVR": "P2Y6M5DT12H35M30S", 
       "offerId": "MOTOGP-RACE", 
       "offerType": "IPPV", 
       "startDateTime": "2017-01-09T17:00:00.000Z" 
      } 
     ], 
     "platformName": "USA", 
     "startDateTime": "2017-01-09T17:00:00.000Z", 
     "territory": "USA" 
    } 
] 
} 

가 지금은 기존 dataframe에 새 열을 생성 할 "권리"의 값입니다. 추가 할 새 열의 이름이 "제공자"입니다.

conditions -> devices -> provider 

데이터 프레임에서 매우 많은 행에 대해이 작업을 수행하려고합니다. 그러므로 내가 UDF를 생성하고 난 내가 JSON 문자열을 구문 분석 원이 UDF에 해당 UDF 내부 jsonString를 보유하고 문자열로 값을 반환하는 을 필요로 열을 전달하고

내 스파크 코드 :

import org.apache.spark.sql.functions.udf 
import org.apache.spark.sql.functions._ 
import org.json4s._ 
import org.json4s.jackson.JsonMethods 
import org.json4s.jackson.JsonMethods._ 


    // 
    some codes to derive base dataframe 
    // 

    val fetchProvider_udf = udf(fetchProvider _) 
    val result = df.withColumn("provider",fetchProvider_udf(col("rights"))) 
    result.select("event_id,"event_key","rights","provider").show(10) 


    def fetchProvider(jsonStr:String): String = { 

    val json = JsonMethods.parse(jsonStr) 

    val providerData = json \\ "conditions" \\"devices" \\ "provider" 

    compact(render(providerData)) 
    } 

네비게이션 키를 사용할 수없는 경우 어떻게 처리합니까? 그것은 예외를 던집니까? "조건"이 있고 "장치"는 있지만 json 문자열에는 "공급자"키가 없다고 말할 수 있습니다. 그러면 어떻게 처리할까요?

누군가가 나에게

예상 출력 도움이 될 : 당신은 당신이 다음을 사용한다 최초의 프로 바이더의 값을 추출하려면하지만 아래의 출력

+--------+---------+-----------------------+-------------------------------  ------------------------------------------------------+ 
|event_id|event_key|    rights  |              provider  | 
     +--------+---------+-----------------------+--------------------------  -----------------------------------------------------------+ 
|  410|(unknown)|{"conditions":[{"devic...| {"provider":"TELETV","provider":"TELETV","provider":"TELETV","provider":"TELETV"  } | 
    +--------+---------+-----------------------+-----------------------------  --------------------------------------------------------+ 
+1

으로는 어떤 이유로하지가 있습니까 spark의'get_json_object'를 사용하고 싶습니까? – Mariusz

+0

요구 사항은 모든 스칼라 파서를 사용하는 것입니다. –

답변

0

을 얻고있다

+--------+---------+-----------------------+-------------+ 
|event_id|event_key|    rights  |provider  | 
+--------+---------+-----------------------+-------------+ 
|  410|(unknown)|{"conditions":[{"devic...| TELETV | 
+--------+---------+-----------------------+-------------+ 

을 UDF 내부 코드 :

(json \\ "conditions" \\"devices")[0] \\ "provider" 

현재 코드는 모든 제공자 (Map)를 가져오고 UDF 결과로 문자열로 변환됩니다.

또한 UDF가 모든 예외를 발생시키지 않도록해야합니다 (전체 작업이 실패 할 수 있기 때문에). 가장 쉬운 방법은 반환하는 것입니다 널 (null) 다음 :

  • 당신이 조사하려는 경우 - 필터를 df.provider.isNull()
  • 으로 만 유효한 항목을 유지하려면 - 필터를 df.provider.isNullNull()
관련 문제