2017-10-19 2 views
0

저는 Pyspark와 협력 중이며 이전 열과 복잡한 계산 방법을 알아 내려고합니다. 일반적으로 이전 열의 계산에는 Windows와 mapwithPartition이라는 두 가지 방법이 있다고 생각합니다. 내 문제는 너무 복잡해서 창문으로 해결할 수 없다고 생각합니다. 그 결과를 열이 아닌 sepreate 행으로 원합니다. 그래서 mapwithpartition을 사용하려고합니다. 이 문법에 문제가 있습니다. 예를 들어 코드의 대략적인 초안이 있습니다.이전 행이있는 pyspark 데이터 프레임 복잡한 계산

그러나 prev_rows의 단일 데이터에 액세스 할 수 없습니다. prev_rows [0]은 itertools.chain과 같습니다. 어떻게 이것을 반복합니까? prev_rows [0]? 결국

  neighbor = sc.broadcast(df_sliced.where(df_sliced.id == neighbor_idx).collect()[0][:-1]).value 
      current = df_sliced.where(df_sliced.id == i) 
      def oversample_dt(dataframe): 
       for row in dataframe: 
        new_row = [] 
        for entry, neigh in zip(row, neighbor): 
         if isinstance(entry, str): 
          if scale < 0.5: 
           new_row.append(entry) 
          else: 
           new_row.append(neigh) 
         else: 
          if isinstance(entry, int): 
           new_row.append(int(entry + (neigh - entry) * scale)) 
          else: 
           new_row.append(entry + (neigh - entry) * scale) 
        yield new_row 
      sttt = time.time() 
      sample = current.rdd.mapPartitions(oversample_dt).toDF(schema) 

편집, 지금이 같은 일을 결국,하지만 난 정말 첫 번째 행에서 수집 사용하지 않습니다. 누군가이 문제를 해결하는 방법을 알고 있다면/pyspark를 사용할 때의 문제를 지적하십시오.

EDIT2 --Suppose 앨리스, 그리고 이웃 Alice_2

scale = 0.4 
+---+-------+--------+ 
|age| name | height | 
+---+-------+--------+ 
| 10| Alice | 170 | 
| 11|Alice_2| 175 | 
+---+-------+--------+ 

그런 다음, 나는 행을

+---+-------+----------------------------------+ 
|age  | name   | height  | 
+---+-------+---------------------------------+ 
| 10+1*0.4 | Alice_2   | 170 + 5*0.4 | 
+---+-------+---------------------------------+ 
+0

@LA와의 토론 혼란 스럽네. 예제 입력 행과 예상 결과를 제공 할 수 있습니까? –

+0

@ TwUxTLi51Nus 질문에 임시 코드를 추가했습니다. –

+0

이것은 나를 위해 명확하지 않습니다, 미안 해요. Q1 : 원칙적으로'pyspark.sql.DataFrame' 또는'pyspark.RDD'를 사용 하시겠습니까? 'DataFrame'은 stronly 형식이므로'isinstance (entry, str)'는 의미가 없습니다. 전체 열이'str'이거나 그렇지 않습니다. 'for' 루프 내의 논리는 새로운 열 대신에 새로운 행을 얻기위한'join'과 함께'Window'와 함께 사용하는 것이 완벽하게 잘 보입니다. 그러나 샘플 입력 행과 예상 출력을 게시 한 경우 도움이 훨씬 쉬울 것입니다 (실제 데이터를 게시 할 필요가 없습니다.) –

답변

0

왜 사용하지 dataframes 줄까?

같은 윈도우 함수를 사용하여 이전 값으로 dataframe에 열을 추가

from pyspark.sql import SparkSession, functions 
from pyspark.sql.window import Window 

spark_session = SparkSession.builder.getOrCreate() 

df = spark_session.createDataFrame([{'name': 'Alice', 'age': 1}, {'name': 'Alice_2', 'age': 2}]) 

df.show() 

+ --- + ------- +
| 나이 | 이름 |
+ --- + ------- +
| 1 | 앨리스 |
| 2 | Alice_2 |
+ --- + ------- +

window = Window.partitionBy().orderBy('age') 
df = df.withColumn("age-1", functions.lag(df.age).over(window)) 

df.show() 

당신은 모든 컬럼에 대해이 기능을 사용할 수 있습니다
+ --- + ------- + ----- +
| 연령 | 이름 | 나이 1 |
+ --- + ------- + ----- +
| 1 | 앨리스 | null |
| 2 | Alice_2 | 1 |
+ --- + ------- + ----- +

는 당신의 수학

을 그리고 당신은 RDD를 사용하려는 경우, 그럼 그냥 사용 df.rdd

+0

으로 레코드를 만들고 싶습니다. 창을 사용하고 그런 열을 추가하면 계산을 할 때마다 데이터를 수집해야하므로 전반적인 작업이 너무 느려집니다. –

+0

그래서 파이썬을 사용하고 싶습니다. 스파크가 미적분을하지 않기를 바랍니다. 그렇습니까? 데이터 프레임의 toPandas 메소드 사용은 어떻습니까? –

+0

아니요 spark를 사용하고 싶습니다. –

관련 문제