2016-09-13 2 views
1

내 dStream.foreachRDD 메서드 내에 처리 블록이 있고 그 처리에는 spark sql을 사용하여 mysql에 대한 지속성이 포함됩니다. 그 게시, 나는 다른 스키마/테이블에서 최신 처리 오프셋을 유지하고있다. 전체 블록을 트랜잭션 (스칼라)로 만들고 싶습니다. 그것을 성취하는 방법? 코드에서 발췌 한 내용은 다음과 같습니다.트랜잭션 블록 | Spark SQL, rdd

.foreachRDD (rdd => {{... ................. ..................

df.write.mode ("append") .jdbc (url + rawstore_schema + "? rewriteBatchedStatements = true", tablesToFetch (index) , connectionProperties)

.................... metricsStatement.executeUpdate ("metrics.txn_offsets (topic, part, off, date_updated) 값에 삽입하십시오. ..........................

}

쓰기 작업 (처리 된 데이터와 오프셋 된 데이터)이 서로 다른 두 데이터베이스/연결에서 수행되므로 트랜잭션을 만드는 방법은 무엇입니까?

감사합니다.

답변

1

나는 동일한 질문을했습니다. Spark 코드 (최대 v2.1)를 살펴보면 트랜잭션 관리를 지정할 수있는 옵션이 없습니다.

내 대답은 여기에 있습니다 : https://stackoverflow.com/a/42964361/47551

관련 문제