2016-08-01 3 views
0

아파치 스파크를 시작합니다. json 로그를 병합 된 메트릭으로 변환해야한다는 요구 사항이 있으며 간단한 CSV로 간주 될 수도 있습니다.apache 스파크의 JSON 로그에서 집계 메트릭 만들기

예 :

"orderId":1, 
    "orderData": { 
    "customerId": 123, 
    "orders": [ 
    { 
     "itemCount": 2, 
     "items": [ 
     { 
      "quantity": 1, 
      "price": 315 
     }, 
     { 
      "quantity": 2, 
      "price": 300 
     }, 

     ] 
    } 
    ] 
} 

이것은 하나의 JSON 로그로 간주 될 수있다, 나는

orderId,customerId,totalValue,units 
    1 , 123 , 915 , 3 

내가 sparkSQL 문서를 통과하고 같은 개별 값 잡아하는 데 사용할 수 있습니다 "를 선택,이 점을 변환 할 orderId, orderData.customerId에서 주문 "하지만 모든 가격과 단위의 합계를 얻는 방법을 모르겠습니다.

아파치 스파크를 사용하여이를 수행하는 가장 좋은 방법은 무엇입니까?

+0

캔트 JSON을. ("/ path/to/file"). toDF(); df.registerTempTable ("df"); df.printSchema(); 그 후 SQL을 통해 집계를 수행합니까? –

+0

SQL을 통해 개별 요소를 보류 할 수 있지만 orders.items에 대해서는 확신 할 수 없습니다. 어떻게 집계를 실행할 수 있습니까? 나는 그것이 json 값으로 만 올 것이라고 생각한다. 내가 뭔가를 놓치고 있다면 나를 바로 잡아라. – fireants

+0

[this] (http://xinhstechblog.blogspot.in/2015/06/reading-json-data-in-spark-dataframes.html) 및 [중첩 된 json] (http : // xinhstechblog)을 통해 살펴볼 수 있습니다. .blogspot.in/2016/05/reading-json-nested-array-in-spark.html) –

답변

1

시도 : 위의 자바 솔루션을 찾고있는 사람들을 위해

>>> from pyspark.sql.functions import * 
>>> doc = {"orderData": {"orders": [{"items": [{"quantity": 1, "price": 315}, {"quantity": 2, "price": 300}], "itemCount": 2}], "customerId": 123}, "orderId": 1} 
>>> df = sqlContext.read.json(sc.parallelize([doc])) 
>>> df.select("orderId", "orderData.customerId", explode("orderData.orders").alias("order")) \ 
... .withColumn("item", explode("order.items")) \ 
... .groupBy("orderId", "customerId") \ 
... .agg(sum("item.quantity"), sum(col("item.quantity") * col("item.price"))) 
+0

작동 논리를 보내 주셔서 감사합니다. 자바로 매핑하고 다른 사람들을 위해 게시하려고 시도합니다. – fireants

0

는 따르십시오 : 우리가 DataFrame DF = sqlContext.read()처럼 할

SparkSession spark = SparkSession 
      .builder() 
      .config(conf) 
      .getOrCreate(); 

    SQLContext sqlContext = new SQLContext(spark); 

    Dataset<Row> orders = sqlContext.read().json("order.json"); 
    Dataset<Row> newOrders = orders.select(
      col("orderId"), 
      col("orderData.customerId"), 
      explode(col("orderData.orders")).alias("order")) 
      .withColumn("item",explode(col("order.items"))) 
      .groupBy(col("orderId"),col("customerId")) 
      .agg(sum(col("item.quantity")),sum(col("item.price"))); 
    newOrders.show(); 
관련 문제