2017-03-13 1 views
1

두 개의 RDD가 있습니다.파이썬 조건을 사용하여 RDD에 가입하십시오.

[Row(unic_key=1608422, idx=18, s_date='2016-12-31', s_time='15:00:07', c_ip='119.228.181.78', c_session='3hyj0tb434o23uxegpnmvzr0', origine_file='inFile', process_date='2017-03-13'), 
Row(unic_key=1608423, idx=19, s_date='2016-12-31', s_time='15:00:08', c_ip='119.228.181.78', c_session='3hyj0tb434o23uxegpnmvzr0', origine_file='inFile', process_date='2017-03-13'), 
] 

그리고 IP의 위치 정보 또 다른 RDD : 첫 번째는 정보 관련 IP 주소를 (COL에 따라 c_ip 참조)이 포함되어 있습니다.

network,geoname_id,registered_country_geoname_id,represented_country_geoname_id,is_anonymous_proxy,is_satellite_provider,postal_code,latitude,longitude,accuracy_radius 
1.0.0.0/24,2077456,2077456,,0,0,,-33.4940,143.2104,1000 
1.0.1.0/24,1810821,1814991,,0,0,,26.0614,119.3061,50 
1.0.2.0/23,1810821,1814991,,0,0,,26.0614,119.3061,50 
1.0.4.0/22,2077456,2077456,,0,0,,-33.4940,143.2104,1000 

이 두 가지를 일치시키고 싶습니다.하지만 문제는 두 RDD의 열간에 엄격한 동일성이 없습니다.

나는 Python3 패키지 IPADDRESS를 사용하고이 같은 체크하고 싶으면 : 가입에서 모든 라인을 제외하지 않는 조인 (왼쪽 바깥을 수행하는 파이썬 함수를 사용하는

> import ipaddress 
> ipaddress.IPv4Address('1.0.0.5') in ipaddress.ip_network('1.0.0.0/24') 
True 

이 가능 내 첫 번째 RDD)? 어떻게해야합니까?

+0

사용중인 Spark의 버전 ? 1.x 또는 2.x? – Jaco

+0

나는 불꽃 1.6을 사용한다. – Steven

답변

1

Apache Spark 1.6을 사용할 때 조인에서 술어로 UDF 함수를 계속 사용할 수 있습니다. 테스트 데이터를 생성 한 후 :

def ip_range(ip, network_range): 
    return ipaddress.IPv4Address(unicode(ip)) in ipaddress.ip_network(unicode(network_range)) 

pred = udf(lambda ip, network_range:ipaddress.IPv4Address(unicode(ip)) in ipaddress.ip_network(unicode(network_range)), BooleanType()) 

을 그리고 조인 다음 경우에 당신은 UDF를 사용할 수 있습니다 :

import ipaddress 
from pyspark.sql.functions import udf 
from pyspark.sql.types import StringType, StructField, StructType, BooleanType, ArrayType, IntegerType 

sessions = sc.parallelize([(1608422,'119.228.181.78'),(1608423, '119.228.181.78')]).toDF(['unic_key','c_ip']) 

geo_ip = sc.parallelize([('1.0.0.0/24',2077456,2077456), 
         ('1.0.1.0/24',1810821,1814991), 
         ('1.0.2.0/23',1810821,1814991), 
         ('1.0.4.0/22',2077456,2077456)]).toDF(['network','geoname_id','registered_country_geoname_id']) 

다음과 같이 당신은 UDF 술어를 만들 수 있습니다

sessions.join(geo_ip).where(pred(sessions.c_ip, geo_ip.network)) 

불행하게도이 현재 Spark 2.x에서 작동하지 않습니다. https://issues.apache.org/jira/browse/SPARK-19728

관련 문제