2017-12-13 1 views
0

Spark 1.6.2 코드를 Java의 Spark 2.2.0에 전달해야합니다.특정 코드 조각을 Spark 1.6.2에서 Spark 2.2.0으로 변환하는 방법은 무엇입니까?

DataFrame eventsRaw = sqlContext.sql("SELECT * FROM my_data"); 
Row[] rddRows = eventsRaw.collect(); 
for (int rowIdx = 0; rowIdx < rddRows.length; ++rowIdx) 
{ 
    Map<String, String> myProperties = new HashMap<>(); 
    myProperties.put("startdate", rddRows[rowIdx].get(1).toString()); 
    JEDIS.hmset("PK:" + rddRows[rowIdx].get(0).toString(), myProperties); // JEDIS is a Redis client for Java 
} 

는 지금까지 내가 이해, Java 용 스파크 2.2.0에는 DataFrame이 없습니다. Dataset 만 그러나 DataFrameDataset으로 대체하면 Row[] 대신 Object[]이 출력되고 eventsRaw.collect()이됩니다. 그런 다음 get(1)이 빨간색으로 표시되고 코드를 컴파일 할 수 없습니다.

어떻게 올바르게 할 수 있습니까?

답변

2

DataFrame (스칼라) Dataset<Row>입니다 :

SparkSession spark; 

... 

Dataset<Row> eventsRaw = spark.sql("SELECT * FROM my_data"); 

대신 collect 당신이 오히려 (게으른 싱글 연결을 사용) foreach를 사용해야합니다

eventsRaw.foreach(
    (ForeachFunction<Row>) row -> ... // replace ... with appropriate logic 
); 

또는 foreachPartition는 (각 파티션에 대한 연결을 초기화) :

eventsRaw.foreachPartition((ForeachPartitionFunction<Row)) rows -> { 
    ... // replace ... with appropriate logic 
}) 
+0

'collect' (컴파일)에서 작동합니다. foreach를 사용하면 무엇을 의미합니까? – Markus

+0

예,하지만'JEDIS'는 직렬화 할 수 없습니다. – Markus

관련 문제