2017-01-19 6 views
1

다운 스트림 사업자 (SQLite는) 거래 전 항목 중 일부 스트림이 가정 :데이터베이스 rxJava

Observable<Item> stream = ... 

난 달성하기 위해 노력하고 무엇을?

  • 스트림에는 모든 조작이 있습니다. 트랜잭션을 시작하기 전에 모든 작업은 경우에 완료 될 한
  • 은 어떻게 든 db.beginTransaction()
  • 가 거래를 시작한 후 모든 사업자는
  • 거래 트랜잭션 내에서 실행되어야한다 스트림의 중간에 거래를 시작 트랜잭션 외부에서 실행해야합니다 성공적인 작업의 db.setTransactionSuccessful
  • 거래는 항상 두 조각 가지고 좋은 것 db.endTransaction
  • 종료해야 : 오픈 한 다운 스트림 작업의 모든 항목에 대한 거래;
    //some upstream operators 
    stream.doOnNext(i -> ...) 
        .map(i -> ...) 
        //somehow start transaction here 
        //operator inside transaction. All database changes will be reverted in case error 
        .doOnNext(i -> /*database ops*/) 
        .subscribe() 
    

    PS

스트림의 항목에 대한 개폐 트랜잭션 DB는 응용 프로그램이 지금은 해결책을 가지고

쓰기 SQLiteDatabase의 인스턴스를 범위입니다. 하지만 더 깨끗한 방법에 대한 제안이 있습니까?

+0

소스로부터 오는'Item' 각각에 대해 별도의 트랜잭션을 원하십니까? '데이터베이스 '란 무엇입니까? 이 특정 항목 또는 일부 전역 변수에 대한 연결입니까? 그것이 글로벌 일 경우 트랜잭션 또한 글로벌 일 것입니다.모든 항목이 처리 될 때까지 거래를 지속 하시겠습니까? –

+0

'database'는 쓰기 가능한'SQLiteDatabase'의 전역 인스턴스 (응용 프로그램 범위)입니다. 스트림의 모든 항목을 처리하기 위해 하나의 트랜잭션을 열고 싶습니다. 그러나 각 항목에 대한 새로운 트랜잭션도 흥미로운 접근 방식입니다. 그러한 거래가 세계적이라고 언급했는데, 제한적입니까? – Beloo

+1

글로벌 거래가 이상하게 보입니다. 이는 단일 트랜잭션이 동시에 프로그램에서 발생하는 모든 데이터베이스 활동을 포함하게된다는 것을 의미합니다. 결과적으로 정확하게 거래를 시작할 때 많은 문제가 발생하지 않아야합니다. 파이프 라인의 모든 작업은 각 항목에 대해 반복됩니다. 첫 번째 항목은 트랜잭션을 열고, 후속 항목에 대한 * 모든 * 작업은 해당 트랜잭션 내에있게됩니다. –

답변

2

1) 일반적

class ItemWithTransaction { 
    Item item; 
    Connection conn; // connection associated with this item 
    boolean rollback; 
} 

stream 
    .map(i -> new ItemWithTransaction(i, openTransaction())) 
    .map(i -> i.conn.executeSql(..., i.item.prop1)) 
    . ... 
    .map(i -> { 
     if (...) i.rollback = true; // error related to this item 
     return i; 
    }) 
    . ... 
    .subscribe(i -> { 
      ... 
      if (i.rollback) i.conn.rollback(); 
      else i.conn.commit(); 
      i.conn.close(); 
     }, 
     e -> rollbackAndCloseAllOpenConnections(), 
     () -> {...}) 

I는 않을 것 너무 많은 (제어되지 않은) 동시 데이터베이스 연결이 필요할 수 있으므로 두 번째 방법을 사용하십시오.

3) 필요한 모든 정보를 먼저 수집 한 다음 간단한 트랜잭션으로 한 번에 데이터베이스 업데이트를 수행하는 것이 좋습니다. 이것은 내가 어떻게 할 것인가입니다 :

stream 
    . ... // processing 
    .buffer(...) // collect updates all or in batches 
    .subscribe(Collection<ItemUpdate> batch -> { 
      database.beginTransaction(); 
      try { 
       ... // update multiple items 
       database.setTransactionSuccessful(); 
      } finally { 
       database.endTransaction(); 
      } 
     }, 
     e -> {...}, 
     () -> {...}); 
+0

라고 생각합니다. 첫 번째 접근법이 더 좋지만 구현시 데이터베이스 연결이'doOnSubscribe'에서 열리므로 모든 스트림 작업에 영향을 미칩니다 (업스트림 포함). 예를 들어'Retrofit'에서 스트림이 시작되면 관찰 할 수있는 시간보다 훨씬 더 오래 트랜잭션을 열게되므로 그 이유는'doOnNext'를 사용했기 때문입니다. – Beloo

+0

@Beloo 스트림에 하나 이상의 항목이있는 경우 어쨌든 업스트림에 영향을 미칩니다. 내 대답에 3 번으로 실제 솔루션을 추가했습니다. –

1

나는 스트림의 모든 항목에 하나 거래를 달성하기 위해 변압기를 만들었습니다

/** @return transformer which starts SQLite database transaction for each downstream operators, 
* closes transaction in {@link Observable#doOnTerminate}. So transaction will be closed either on successful completion or error of stream. 
* set previously opened SQLite database transaction to completed in {@link Observable#doOnCompleted} call */ 
public <T> Observable.Transformer<T, T> inTransaction() { 
    return observable -> observable 
      .doOnNext(o -> { 
       if (!database.inTransaction()) database.beginTransaction(); 
      }) 
      .doOnCompleted(() -> { 
       if (database.inTransaction()) database.setTransactionSuccessful(); 
      }) 
      .doOnTerminate(() -> { 
       if (database.inTransaction()) database.endTransaction(); 
      }); 

그리고 전화 : 내가 .doOnNext에서 트랜잭션을 시작하고 각 시간을 확인

stream 
    //start transaction here 
    .compose(inTransaction()) 
    .doOnNext(i -> /*database ops*/) 
    .subscribe() 

주 거래가 이미 시작된 경우 처음에는 전화를 걸 수없는 것 같습니다. 각 항목에 대한 별도의 트랜잭션이있을 때

stream 
    .doOnSubscribe(d -> database.beginTransaction()) 
    . ... 
    .subscribe(v -> {...}, 
     e -> database.endTransaction(), 
     () -> { database.setTransactionSuccessful(); database.endTransaction(); }) 

2) 케이스의 경우 : 모든 항목이 단일 트랜잭션에서 처리되는 경우에 대해

+0

정상적으로 작동합니까? 나는'ActiveAndroid'를 사용합니다. @Beloo –

+0

예, 예상대로 작동하는 것 같습니다. 녹색 DAO 데이터베이스를 가지고 생각하면 – Beloo