내 dStream.foreachRDD 메서드 내에 처리 블록이 있고 그 처리에는 spark sql을 사용하여 mysql에 대한 지속성이 포함됩니다. 그 게시, 나는 다른 스키마/테이블에서 최신 처리 오프셋을 유지하고있다. 전체 블록을 트랜잭션 (스칼라)로 만들고 싶습니다. 그것을 성취하는 방법? 코드에서 발췌 한 내용은 다음과 같습니다.트랜잭션 블록 | Spark SQL, rdd
.foreachRDD (rdd => {{... ................. ..................
df.write.mode ("append") .jdbc (url + rawstore_schema + "? rewriteBatchedStatements = true", tablesToFetch (index) , connectionProperties)
.................... metricsStatement.executeUpdate ("metrics.txn_offsets (topic, part, off, date_updated) 값에 삽입하십시오. ..........................
}
쓰기 작업 (처리 된 데이터와 오프셋 된 데이터)이 서로 다른 두 데이터베이스/연결에서 수행되므로 트랜잭션을 만드는 방법은 무엇입니까?
감사합니다.