2016-10-02 1 views
1

두 개의 데이터 세트가 있고 각 데이터 세트에는 두 개의 요소가 있습니다. 다음은 예제입니다.키로 스칼라 스파크로 두 데이터 세트를 결합하는 방법

데이터 1 : (이름, 동물)

('abc,def', 'monkey(1)') 
('df,gh', 'zebra') 
... 

데이터 2 : (이름, 과일)

('a,efg', 'apple') 
('abc,def', 'banana(1)') 
... 

결과 예상 : (이름, 동물, 과일)

('abc,def', 'monkey(1)', 'banana(1)') 
... 

I 첫 번째 열 'name'을 사용하여이 두 데이터 집합에 참여하려고합니다. 나는 이것을 몇 시간 동안 해보려고했지만, 알아 내지 못했습니다. 누구든지 나를 도울 수 있습니까?

val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]") 
val sc = new SparkContext(sparkConf) 
val text1 = sc.textFile(args(0)) 
val text2 = sc.textFile(args(1)) 

val joined = text1.join(text2) 

위의 코드가 작동하지 않습니다!

+0

을 확인 결과하자? – maasg

+0

어떤 종류의 오류가 발생합니까? 그것은 무엇을 말하는가? – maasg

+0

@maasg ''기호 결합을 해결할 수 없습니다 '라는 메시지가 나타납니다. – tobby

답변

1

join는 쌍 RDDs에 정의 된 스칼라 쉘, 즉 타입 RDDs이다로부터 결과 RDD[(K,V)]. 첫 번째 단계는 입력 데이터를 올바른 유형으로 변환하는 것입니다.

우리는 첫번째 (Key, Value)의 쌍으로 유형 String의 원래 데이터를 변환해야합니다

val parse:String => (String, String) = s => { 
    val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r 
    s match { 
    case regex(k,v) => (k,v) 
    case _ => ("","") 
    } 
} 

그런 다음

(키가 쉼표를 포함하고 있기 때문에 우리는 간단한 split(",") 표현을 사용할 수 없습니다) 이 함수를 사용하여 텍스트 입력 데이터를 구문 분석합니다.

val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')") 
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')") 

val rdd1 = sparkContext.parallelize(s1) 
val rdd2 = sparkContext.parallelize(s2) 

val kvRdd1 = rdd1.map(parse) 
val kvRdd2 = rdd2.map(parse) 

마지막으로 join 방법은 두 RDDs

val joined = kvRdd1.join(kvRdd2) 

을 가입 // 당신`(키, 값)`튜플에 입력 된 텍스트를 분할하는 경우의이

joined.collect 

// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1)))) 
+0

정말 고마워요! – tobby

+0

질문이 하나 더 있습니다. 데이터에서 작은 따옴표를 유지하려면 어떻게해야합니까? – tobby

+1

@tobby 따옴표를 유지하려면 정규식을 변경하십시오. – maasg

0

먼저 데이터 세트에 pairRDD를 작성해야하며 조인 변환을 적용해야합니다. 데이터 세트가 정확하지 않습니다.

아래의 예를 고려하십시오.

**Dataset1** 

a 1 
b 2 
c 3 

**Dataset2** 

a 8 
b 4 

하여 코드 스칼라 이하 같아야 여기

val pairRDD1 = sc.textFile("/path_to_yourfile/first.txt").map(line => (line.split(" ")(0),line.split(" ")(1))) 

    val pairRDD2 = sc.textFile("/path_to_yourfile/second.txt").map(line => (line.split(" ")(0),line.split(" ")(1))) 

    val joinRDD = pairRDD1.join(pairRDD2) 

    joinRDD.collect 

res10: Array[(String, (String, String))] = Array((a,(1,8)), (b,(2,4))) 
관련 문제