7

블루투스 저에너지와 관련된 특정 요구 사항이있는 안드로이드 애플리케이션을 만들고 있습니다.지속적인 연결을 유지하는 RxAndroidBle + 쓰기/알림 처리

쓰기 전용 특성을 작성하고 별도의 통지 특성으로 응답을 수신해야하므로 많은 활동에서이를 수행해야합니다. 첫 번째 특성에 대한 요청을 보내고 두 번째 특성에 대한 응답을 기다린 다음 다른 요청으로 진행하는 Rx 방법이 있습니까?

또한 RxAndroidBle의 인스턴스를 공유하기 위해 Observables를 노출시키는 BleManager Singleton을 생각 했으므로 발표자에서 쉽게 구독 할 수 있습니다. 난 그냥 각 활동에 대한 연결 논리를 복사하고 (이상적으로) 지속적인 연결이 필요하지 않도록하고 싶습니다. 이 방법을 사용하면 connectionObservable 만 노출하고 구독 할 수 있으므로 쉽게 쓰기 요청을 보내고 알림을받을 수 있지만 더 좋은 방법은 있습니다.

이것은 내가 지금 무엇을 가지고 :

@Singleton 
public class BleManager { 

    private PublishSubject<Void> disconnectTriggerSubject = PublishSubject.create(); 
    private Observable<RxBleConnection> connectionObservable; 
    private boolean isConnected; 

    private final UUID CTRL_FROM_BRIDGE_UUID = UUID.fromString("someUUID"); 
    private final UUID BLE_WRITE_CHARACTERISTIC_UUID = UUID.fromString("someOtherUUID"); 

    private final RxBleClient bleClient; 
    private String mMacAddress; 
    private final Context context; 
    private RxBleDevice bleDevice; 

    @Inject 
    public BleManager(Context context, RxBleClient client) { 
    Timber.d("Constructing BleManager and injecting members"); 
    this.context = context; 
    this.bleClient = client; 
    } 

    public void setMacAddress(String mMacAddress) { 
    this.mMacAddress = mMacAddress; 

    // Set the associated device on MacAddress change 
    bleDevice = bleClient.getBleDevice(this.mMacAddress); 
    } 

    public String getMacAddress() { 
    return mMacAddress; 
    } 

    public RxBleDevice getBleDevice() { 
    Preconditions.checkNotNull(mMacAddress); 
    return bleClient.getBleDevice(mMacAddress); 
    } 

    public Observable<RxBleScanResult> getScanSubscription() { 
    Preconditions.checkNotNull(context); 
    Preconditions.checkNotNull(bleClient); 

    return bleClient.scanBleDevices().distinct(); 
    } 

    public Observable<RxBleConnection> getConnectionSubscription() { 
    Preconditions.checkNotNull(context); 
    Preconditions.checkNotNull(bleDevice); 

    if (connectionObservable == null) { 
     connectionObservable = bleDevice.establishConnection(context, false) 
             .takeUntil(disconnectTriggerSubject) 
             .observeOn(AndroidSchedulers.mainThread()) 
             .doOnUnsubscribe(this::clearSubscription) 
             .compose(new ConnectionSharingAdapter()); 
    } 

    return connectionObservable; 
    } 

    public Observable<byte[]> setupListeners() { 
    return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.setupNotification(CTRL_FROM_BRIDGE_UUID)) 
           .doOnNext(notificationObservable -> Timber.d("Notification Setup")) 
           .flatMap(notificationObservable -> notificationObservable) 
           .observeOn(AndroidSchedulers.mainThread()); 
    } 

    private void triggerDisconnect() { 
    disconnectTriggerSubject.onNext(null); 
    } 


    public Observable<byte[]> writeBytes(byte[] bytes) { 
    return connectionObservable.flatMap(rxBleConnection -> rxBleConnection.writeCharacteristic(
     BLE_WRITE_CHARACTERISTIC_UUID, 
     bytes)).observeOn(AndroidSchedulers.mainThread()); 
    } 

    private boolean isConnected() { 
    return bleDevice.getConnectionState() == RxBleConnection.RxBleConnectionState.CONNECTED; 
    } 

    /** 
    * Will update the UI with the current state of the Ble Connection 
    */ 
    private void registerConnectionStateChange() { 
    bleDevice.observeConnectionStateChanges().observeOn(AndroidSchedulers.mainThread()).subscribe(connectionState -> { 
     isConnected = connectionState.equals(RxBleConnection.RxBleConnectionState.CONNECTED); 
    }); 
    } 

    private void clearSubscription() { 
    connectionObservable = null; 
    } 

} 

답변

4

나는 당신의 사용 사례에 대해 조금 생각했다. 동일한 연결을 공유하면 응용 프로그램에 상태를 약간의 상태 처리가 필요하기 때문에 상태를 가져 오는 것이므로 순수하게 반응 할 수 없다는 (또는 적어도 어떻게되는지는 모릅니다).

나는 직렬화 된 BLE 장치에 연결을 설정하고 쓰기 알림 전송을 수행하는 데 중점을 두었습니다.

private PublishSubject<Pair<byte[], Integer>> inputSubject = PublishSubject.create(); 

private PublishSubject<Pair<byte[], Integer>> outputSubject = PublishSubject.create(); 

private Subscription connectionSubscription; 

private volatile int uniqueId = 0; // used to identify the transmission we're interested in in case more than one will be started at the same time 

public void connect() { 
    Observable<RxBleConnection> connectionObservable = // your establishing of the connection wether it will be through scan or RxBleDevice.establishConnection() 
    final UUID notificationUuid = // your notification characteristic UUID 
    final UUID writeUuid = // your write-only characteristic UUID 

    connectionSubscription = connectionObservable 
      .flatMap(
        rxBleConnection -> rxBleConnection.setupNotification(notificationUuid), // subscribing for notifications 
        (rxBleConnection, notificationObservable) -> // connection is established and notification prepared 
          inputSubject // waiting for the data-packet to transmit 
            .onBackpressureBuffer() 
            .flatMap(bytesAndFilter -> { 
               return Observable.combineLatest(// subscribe at the same time to 
                 notificationObservable.take(1), // getting the next notification bytes 
                 rxBleConnection.writeCharacteristic(writeUuid, bytesAndFilter.first), // transmitting the data bytes to the BLE device 
                 (responseBytes, writtenBytes) -> responseBytes // interested only in the response bytes 
               ) 
                 .doOnNext(responseBytes -> outputSubject.onNext(new Pair<>(responseBytes, bytesAndFilter.second))); // pass the bytes to the receiver with the identifier 
              }, 
              1 // serializing communication as only one Observable will be processed at the same time 
            ) 
      ) 
      .flatMap(observable -> observable) 
      .subscribe(
        response -> { /* ignored here - used only as side effect with outputSubject */ }, 
        throwable -> outputSubject.onError(throwable) 
      ); 
} 

public void disconnect() { 
    if (connectionSubscription != null && !connectionSubscription.isUnsubscribed()) { 
     connectionSubscription.unsubscribe(); 
     connectionSubscription = null; 
    } 
} 

public Observable<byte[]> writeData(byte[] data) { 
    return Observable.defer(() -> { 
       final int uniqueId = this.uniqueId++; // creating new uniqueId for identifying the response 
       inputSubject.onNext(new Pair<>(data, uniqueId)); // passing the data with the id to be processed by the connection flow in connect() 
       return outputSubject 
         .filter(responseIdPair -> responseIdPair.second == uniqueId) 
         .first() 
         .map(responseIdPair -> responseIdPair.first); 
      } 
    ); 
} 

내가 전체 흐름을 한 곳에서 설명하고 이해하기 때문에 쉽게 좋은 생각 접근 방식이다. 상태 저장 (쓰기 요청 및 응답 대기) 중 일부는 직렬화되며 disconnect() 호출까지 연결을 유지할 가능성이 있습니다.

단점은 연결이 설정되기 전에 전송이 다른 흐름의 부작용에 의존하고 writeData()을 호출하고 통지 설정이 반환 된 관찰 가능을 결코 완료하지 않는다는 것입니다. 그러나이 시나리오에 대한 처리를 추가하는 것은 문제가되지 않습니다 상태 확인.

최고 감사합니다.

+0

대단합니다. 해결책을 찾아보고 어떻게 진행되는지 알려 드리겠습니다. 내가 Rx 세계에 새로운 사람이고, 아직도 배우는 것에 따라 그것은 정말로 인정 받는다! –

+3

제안 된 솔루션을 사용해 보셨습니까? 의도 한대로 작동 했습니까? –

관련 문제