2017-11-11 4 views
0

UDF에서 for 루프를 사용하여 열 하나씩 목록을 보내려고하지만 오류가 발생했습니다. 즉 데이터 프레임에서 col_name을 찾을 수 없습니다. 현재 목록에 list_col 우리는 두 개의 열이 있지만 그것은 변경 될 수 있습니다. 그래서 난 열의 모든 목록에 대해 작동하는 코드를 작성하고 싶습니다.이 코드에서 한 번에 열의 한 행을 연결하고 행 값은 구조체 형식 즉 목록 안에 나열하십시오. 모든 null에 대해 나는 공간을 주어야한다.Pyspark : UDF에서 동적 열 전달

list_col=['pcxreport','crosslinediscount'] 
    def struct_generater12(row): 
    list3 = [] 
    main_str = '' 
    if(row is None): 
     list3.append(' ') 
    else: 
     for i in row: 
      temp = '' 
      if(i is None): 
       temp+= ' ' 
      else: 
       for j in i: 
        if (j is None): 
         temp+= ' ' 
        else: 
         temp+= str(j) 
      list3.append(temp) 
    for k in list3: 
     main_str +=k 
    return main_str 


    A = udf(struct_generater12,returnType=StringType()) 
    # z = addlinterestdetail_FDF1.withColumn("Concated_pcxreport",A(addlinterestdetail_FDF1.pcxreport)) 
    for i in range(0,len(list_col)-1): 
     struct_col='Concate_' 
     struct_col+=list_col[i] 
     col_name=list_col[i] 
     z = addlinterestdetail_FDF1.withColumn(struct_col,A(addlinterestdetail_FDF1.col_name)) 
     struct_col='' 

    z.show() 

답변

1

addlinterestdetail_FDF1.col_name는 변수 col_name에 포함 된 문자열에 접근하지 않는, 열이 "col_name"라는 것을 의미한다.

컬럼에 UDF를 호출 할 수

  • 직접 문자열 이름 사용 : A(col_name)
  • 또는 pyspark의 SQL 함수 col 사용

    import pyspark.sql.functions as psf 
    z = addlinterestdetail_FDF1.withColumn(struct_col,A(psf.col(col_name))) 
    

을 당신이해야 UDF를 쓰는 대신 연결을 위해 pyspark SQL 함수를 사용하는 것을 고려하십시오. 우리는 중첩 된 열 이름과 사전 쓸 것이다

import json 
j = {'pcxreport':{'a': 'a', 'b': 'b'}, 'crosslinediscount':{'c': 'c', 'd': None, 'e': 'e'}} 
jsonRDD = sc.parallelize([json.dumps(j)]) 
df = spark.read.json(jsonRDD) 
df.printSchema() 
df.show() 

    root 
    |-- crosslinediscount: struct (nullable = true) 
    | |-- c: string (nullable = true) 
    | |-- d: string (nullable = true) 
    | |-- e: string (nullable = true) 
    |-- pcxreport: struct (nullable = true) 
    | |-- a: string (nullable = true) 
    | |-- b: string (nullable = true) 

    +-----------------+---------+ 
    |crosslinediscount|pcxreport| 
    +-----------------+---------+ 
    |  [c,null,e]| [a,b]| 
    +-----------------+---------+ 

: 첫 번째의 중첩 구조와 샘플 dataframe 만들 수 있도록,

list_col=['pcxreport','crosslinediscount'] 
list_subcols = dict() 
for c in list_col: 
    list_subcols[c] = df.select(c+'.*').columns 

지금 우리가 StructType을 "결합"할 수 ' 'None을 대체하고, 연결 :

import itertools 
import pyspark.sql.functions as psf 
df.select([c + '.*' for c in list_col])\ 
    .na.fill({c:' ' for c in list(itertools.chain.from_iterable(list_subcols.values()))})\ 
    .select([psf.concat(*sc).alias(c) for c, sc in list_subcols.items()])\ 
    .show() 

    +---------+-----------------+ 
    |pcxreport|crosslinediscount| 
    +---------+-----------------+ 
    |  ab|    c e| 
    +---------+-----------------+ 
+0

감사합니다. 저에게 도움이되었습니다. –

+0

@RahulKumarSingh [Accepting the 답변] (https://stackoverflow.com/help/someone-answers). – Prem

+0

목록에 내가 하나의 데이터 프레임에있는 모든 데이터 프레임을 병합해야 얼마나 많은 데이터 프레임이 있습니다. 목록의 길이가 고정되어 있지 않습니다 ................... 감사합니다 사전입니다 –