2017-02-18 2 views
0

두 개의 .txt 데이터 파일이 있습니다. 첫 번째 열은 두 개의 열 (영화, 영화)을 포함하고 두 번째 열은 두 개의 열 (영화, 뷰어)을 포함합니다 (아래 예 참조). 내가 뭘하고 싶은지는 이며 최대 시청자 수와 함께 cinema_1에 표시된 영화를 찾으십시오. 위의 예에서두 개의 데이터 프레임을 결합하여 값을 합산하고 최대 값을 얻으십시오.

+----------+---------+ 
| movie | cinema | 
+----------+---------+ 
| movie_1 | cinema_2 | 
| movie_2 | cinema_3 | 
| movie_4 | cinema_1 | 
| movie_3 | cinema_1 | 
+------+-------------+ 

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_1 | 10 | 
| movie_2 | 98 | 
| movie_4 | 100 | 
| movie_3 | 19 | 
| movie_1 | 340 | 
| movie_3 | 31 | 
+------+-------------+ 

는 두 후보 movie_3movie_4 (cinema_1에 도시)이며, (movie_3 50 (19 + 31) 뷰를 갖는다) 정답 100 전망 movie_4이다.

1 단계 : 내가 지금까지했던 어떤

데이터

val moviesCinemas = sparkSession.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .option("mode", "DROPMALFORMED") 
     .load("moviesCinemas.txt"); 

    val moviesViewers = sparkSession.read 
     .format("com.databricks.spark.csv") 
     .option("header", "true") 
     .option("mode", "DROPMALFORMED") 
     .load("moviesViewers.txt"); 

2 단계하기 : 영화는 최고의 cinema_1

val cinema1Movies = moviesCinemas.filter(col("cinema").like("cinema_1")) 

에 표시하기 ~까지 :

+----------+---------+ 
| movie | cinema | 
+----------+---------+ 
| movie_4 | cinema_1 | 
| movie_3 | cinema_1 | 
+------+-------------+ 

3 단계 : 이제 두 개의 영화를 보려면 최대 시청자 수 (데이터 프레임 moviesViewers)를 요약하고 최대 수를보고해야합니다. 이것은 내가 실제로 붙어있는 곳입니다.

은 내가 cinema1MoviesmoviesViewers dataframes 가입을 시도했습니다

다음과 같은 결과 제공
val joinMoviesViewers = moviesViewers.join(cinema1Movies, Seq("movie")) 

: 나는 각 movieviewers을 요약하는 방법을 아주 잘 모르겠습니다 이제

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_4 | 100 | 
| movie_3 | 19 | 
| movie_3 | 31 | 
+------+-------------+ 

을 이런 식으로 뭔가를 얻으려면 (그리고 최대 시청자와 함께 영화를 얻으십시오) :

+----------+---------+ 
| movie | viewers | 
+----------+---------+ 
| movie_4 | 100 | 
| movie_3 | 50 | 
+------+-------------+ 

답변

1

시작 :

이 또한 작동
val aggJoin = joinMoviesViewers.groupBy("movie").agg(sum($"viewers").as("viewers")) 
// aggJoin: org.apache.spark.sql.DataFrame = [movie: string, viewers: bigint] 

val maxViewers = aggJoin.agg(max($"viewers")).first().getLong(0) 
// maxViewers: Long = 100 

// depending on what data type you have for viewers, you might use getDouble here 
// val maxViewers = aggJoin.agg(max($"viewers")).first().getDouble(0) 

aggJoin.filter($"viewers" === maxViewers).show 
+-------+-------+ 
| movie|viewers| 
+-------+-------+ 
|movie_4| 100| 
+-------+-------+ 
+0

그것은 작동하지만'.getLong (0)'대신'.getDouble (0)'을 추가하십시오. 그렇지 않으면 예외가 발생합니다. 답변을 수락하려면 답변을 수정하십시오. 고마워. –

1

다음은 결과를 유도하는 API 방식입니다.

import org.apache.spark.sql.functions._ 

val result = moviesCinemas 
    .filter($"cinema" === "cinema_1") 
    .join(moviesViewers, "movie") 
    .select(moviesCinemas("movie"),moviesViewers("viewers")) 
    .groupBy($"movie") 
    .agg(sum($"viewers").as("sum_cnt")) 
    .orderBy($"sum_cnt".desc) 

    result.first 
    res34: org.apache.spark.sql.Row = [movie_4,100] 

아래에서 동일한 결과를 얻으려면 spark sql을 사용합니다.

결합 된 데이터 프레임에서
moviesCinemas.registerTempTable("movies_cinemas") 
moviesViewers.registerTempTable("movies_viewers") 

val spark = SparkSession.builder. 
    master("local") // set your master here 
    .appName("spark session example") 
    .getOrCreate() 

val result = spark.sql( 
""" 
SELECT 
t0.movie, 
sum(viewers) as total_viewers 
FROM 
movies_cinemas t0 JOIN movies_viewers t1 
on t0.movie = t1.movie 
WHERE t0.cinema = "cinema_1" 
GROUP BY t0.movie 
ORDER BY total_viewers desc 
""" 
) 

result.first 

res6: org.apache.spark.sql.Row = [movie_4,100] 
+0

! 고마워. –

관련 문제