저는 Pyspark와 협력 중이며 이전 열과 복잡한 계산 방법을 알아 내려고합니다. 일반적으로 이전 열의 계산에는 Windows와 mapwithPartition이라는 두 가지 방법이 있다고 생각합니다. 내 문제는 너무 복잡해서 창문으로 해결할 수 없다고 생각합니다. 그 결과를 열이 아닌 sepreate 행으로 원합니다. 그래서 mapwithpartition을 사용하려고합니다. 이 문법에 문제가 있습니다. 예를 들어 코드의 대략적인 초안이 있습니다.이전 행이있는 pyspark 데이터 프레임 복잡한 계산
그러나 prev_rows의 단일 데이터에 액세스 할 수 없습니다. prev_rows [0]은 itertools.chain과 같습니다. 어떻게 이것을 반복합니까? prev_rows [0]? 결국
neighbor = sc.broadcast(df_sliced.where(df_sliced.id == neighbor_idx).collect()[0][:-1]).value
current = df_sliced.where(df_sliced.id == i)
def oversample_dt(dataframe):
for row in dataframe:
new_row = []
for entry, neigh in zip(row, neighbor):
if isinstance(entry, str):
if scale < 0.5:
new_row.append(entry)
else:
new_row.append(neigh)
else:
if isinstance(entry, int):
new_row.append(int(entry + (neigh - entry) * scale))
else:
new_row.append(entry + (neigh - entry) * scale)
yield new_row
sttt = time.time()
sample = current.rdd.mapPartitions(oversample_dt).toDF(schema)
편집, 지금이 같은 일을 결국,하지만 난 정말 첫 번째 행에서 수집 사용하지 않습니다. 누군가이 문제를 해결하는 방법을 알고 있다면/pyspark를 사용할 때의 문제를 지적하십시오.
EDIT2 --Suppose 앨리스, 그리고 이웃 Alice_2
scale = 0.4
+---+-------+--------+
|age| name | height |
+---+-------+--------+
| 10| Alice | 170 |
| 11|Alice_2| 175 |
+---+-------+--------+
그런 다음, 나는 행을
+---+-------+----------------------------------+
|age | name | height |
+---+-------+---------------------------------+
| 10+1*0.4 | Alice_2 | 170 + 5*0.4 |
+---+-------+---------------------------------+
@LA와의 토론 혼란 스럽네. 예제 입력 행과 예상 결과를 제공 할 수 있습니까? –
@ TwUxTLi51Nus 질문에 임시 코드를 추가했습니다. –
이것은 나를 위해 명확하지 않습니다, 미안 해요. Q1 : 원칙적으로'pyspark.sql.DataFrame' 또는'pyspark.RDD'를 사용 하시겠습니까? 'DataFrame'은 stronly 형식이므로'isinstance (entry, str)'는 의미가 없습니다. 전체 열이'str'이거나 그렇지 않습니다. 'for' 루프 내의 논리는 새로운 열 대신에 새로운 행을 얻기위한'join'과 함께'Window'와 함께 사용하는 것이 완벽하게 잘 보입니다. 그러나 샘플 입력 행과 예상 출력을 게시 한 경우 도움이 훨씬 쉬울 것입니다 (실제 데이터를 게시 할 필요가 없습니다.) –