2017-03-09 5 views
0

pyspark dfdata에 두 개의 데이터 프레임이 있습니다. 스키마는 같은 두 데이터 프레임의 열 이름 비교 pyspark

>>> df.printSchema() 
root 
|-- id: integer (nullable = false) 
|-- name: string (nullable = true) 
|-- address: string (nullable = true) 
|-- nation: string (nullable = true) 
|-- Date: timestamp (nullable = false) 
|-- ZipCode: integer (nullable = true) 
|-- car: string (nullable = true) 
|-- van: string (nullable = true) 

>>> data.printSchema() 
root 
|-- id: integer (nullable = true) 
|-- name: string (nullable = true) 
|-- address: string (nullable = true) 
|-- nation: string (nullable = true) 
|-- date: string (nullable = true) 
|-- zipcode: integer (nullable = true) 

가 지금은 스키마를 모두 비교하여 내 data 데이터 프레임에 열 자동차와 밴을 추가 할 수 아래.

열이 동일하면 두 데이터 프레임을 비교하고 싶지만 열이 다른 경우 열이없는 데이터 프레임에 열을 추가하십시오.

어떻게 우리가 pyspark에서이를 달성 할 수 있습니까? 열이 상기 데이터 프레임에 추가되면

는 FYI I 스파크 1.6

를 사용하고있다. 새로 추가 된 데이터 프레임의 해당 열 값은 null이어야합니다.

여기 예를 들어 데이터 데이터 프레임의 열 자동차와 밴이 null 값을 포함해야하지만, DF 데이터 프레임에서 같은 열이 경우 발생하는 원래 값

을 가져야한다, 그래서 우리는 data 데이터 프레임에 열을 추가 스키마가 아니라 StructFields의 목록으로 구성된 StructType, 우리는 비교하고 누락 된 열을 찾기 위해, 필드 목록을 검색 할 수 있습니다으로 2 개 이상의 새로운 열이,

답변

0

를 추가 할있다

df_schema = df.schema.fields 
data_schema = data.schema.fields 
df_names = [x.name.lower() for x in df_scehma] 
data_names = [x.name.lower() for x in data_schema] 
if df_schema <> data_schema: 
    col_diff = set(df_names)^set(data_names)  
    col_list = [(x[0].name,x[0].dataType) for x in map(None,df_schema,data_schema) if ((x[0] is not None and x[0].name.lower() in col_diff) or x[1].name.lower() in col_diff)] 
    for i in col_list: 
     if i[0] in df_names: 
      data = data.withColumn("%s"%i[0],lit(None).cast(i[1])) 
     else: 
      df = df.withColumn("%s"%i[0],lit(None).cast(i[1])) 
else: 
    print "Nothing to do" 

null 값이 없지만 스키마 차이가 Null 허용 열인 경우 열을 추가하기 위해 언급 했으므로 해당 검사를 사용하지 않았습니다. 당신이 그것을 필요로하는 경우에 당신이 여러 테이블에이 작업을 수행 할 경우,

col_list = [(x[0].name,x[0].dataType) for x in map(None,df_schema,data_schema) if (x[0].name.lower() in col_diff or x[1].name.lower() in col_diff) and not x.nullable] 

이 StructType 및 StructFields에 대한 자세한 설명서를 확인하십시오, 다음과 같이 널 (NULL)에 대한 https://spark.apache.org/docs/1.6.2/api/python/pyspark.sql.html#pyspark.sql.types.StructType

+0

위의 대답에는 차이로 2 개의 열만 있습니다. 2 개 이상의 열이있는 경우는 어떻게됩니까? 어떻게 우리가 그들을 동적으로 전달할 수 있습니까? – User12345

+0

위 코드는 두 스키마의 차이를 동적으로 취합니다. 몇 개의 칼럼이 차이점으로 존재 하는가? 모든 것이 고려됩니다. – Suresh

+0

@Suresh 초기 데이터 프레임이 하이브 테이블에서 왔다고 가정합니다. 어떻게 테이블을 변경할 수 있습니까? 데이터 프레임에 열을 추가하는 대신 Null 값을 사용하여 기존 하이브 테이블에 추가 할 수 있습니까? –

0

을 체크를 추가, 그것은 수 있습니다 코드를 좀 더 일반화 할만한 가치가 있습니다. 이 코드는 일치하지 않는 원본 열의 첫 번째 null이 아닌 값을 사용하여 대상 테이블에 새 열을 만듭니다.

from pyspark.sql.functions import lit, first 

def first_non_null(f,t): # find the first non-null value of a column 
    return f.select(first(f[t], ignorenulls=True)).first()[0] 

def match_type(f1,f2,miss): # add missing column to the target table 
    for i in miss: 
     try: 
      f1 = f1.withColumn(i, lit(first_non_null(f2,i))) 
     except: 
      pass 
     try: 
      f2 = f2.withColumn(i, lit(first_non_null(f1,i))) 
     except: 
      pass 
    return f1, f2 

def column_sync_up(d1,d2): # test if the matching requirement is met 
    missing = list(set(d1.columns)^set(d2.columns)) 
    if len(missing)>0: 
     return match_type(d1,d2,missing) 
    else: 
     print "Columns Match!" 

df1, df2 = column_sync_up(df1,df2) # reuse as necessary 
관련 문제