Faire progresser TestScheduler de RxJava pour un appel de blocage pour retarder ()

voix
0

En utilisant RxJava2 J'ai une classe qui fait le pont réactif au monde non réactif. Dans cette classe, il y a une méthode surchargée, qui retourne une valeur après un traitement potentiellement longue . Ce traitement est intégré au monde réactif à l' aide Single.create(emitter -> ...).

Le temps de traitement est crucial pour moi, car il y a une différence en ce qui concerne la logique métier, si un second abonné apparaît alors que le traitement est en cours d'exécution ou après avoir terminé (ou annulé).

Maintenant , je suis en testant cette classe - donc, j'ai besoin de temps de traitement pour être seulement virtuelle . RxJava de TestSchedulerà la rescousse!

Pour créer une application de test pour cette classe de base de pont, qui émule le retard, je besoin d'un blocage (!) Faire appel au TestScheduler, où l'avancement dans le temps est déclenché sur un thread séparé. (Sinon, l'avancement dans le temps ne serait jamais déclenché.)

Mon problème est que je dois le faire avancer dans le temps pour retarder au moins jusqu'à ce que le « calcul coûteux » bloque - de autre temps se changer avant le délai opérateur obtient actif et , par conséquent, il attend jusqu'à ce qu'une autre promotion qui ne se produira jamais.

Je peux résoudre ce problème en appelant Thread.sleep(100)avant que l'appel pour faire avancer dans le temps - mais qui me semble un peu laid ... Combien de temps attendre? Pour plusieurs tests, que le temps ajoute, mais à défaut en raison de problèmes de synchronisation ... ugh.

Toute idée comment tester cette situation d'une manière plus propre , espérons? Merci!

import io.reactivex.Completable;
import io.reactivex.Single;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.schedulers.TestScheduler;

import java.util.concurrent.TimeUnit;

public class TestSchedulerBlocking {

    public static void main(String[] args) throws Exception {

        TestScheduler testScheduler = new TestScheduler();

        // the task - in reality, this is a task with a overridden method returning some value, where I need to delay the return in a test-implementation.
        Single<String> single = Single.create(emitter -> {

            System.out.println(emitting on  + Thread.currentThread() +  ...);

            // This is the core of my problem: I need some way to test a (synchronous) delay before the next line executes
            // (e.g. method returns, or, in another case, an exception is thrown).
            // Thread.sleep(<delay>) would be straight forward, but I want to use TestScheduler's magic time-advancing in my tests...
            // Using the usual non-blocking methods of RX, everything works fine for testing using the testScheduler, but here it doesn't help.
            // Here I need to synchronize it somehow, that the advancing in time is done *after* this call is blocking.
            // I tried a CountDownLatch in doOnSubscribe, but apparently, that's executed too early for me - I'd need doAfterSubscribe or so...
            // Final word on architecture: I'm dealing with the bridging of the reactive to the non-reactive world, so I can't just change the architecture.
            Completable.complete()
                       .delay(3, TimeUnit.SECONDS, testScheduler)
                       .doOnSubscribe(__ -> System.out.println(onSubcribe: completable))
                       .blockingAwait();

            System.out.println(<< blocking released >>);

            // this has to be delayed! Might be, for example, a return or an exception in real world.
            emitter.onSuccess(FooBar!);

        });


        System.out.println(running the task ...);
        TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println(onSubcribe: single))
                                                  .subscribeOn(Schedulers.newThread())
                                                  .test();

        // Without this sleep, the advancing in testScheduler's time is done *before* the doIt() is executed,
        // and therefore the blockingAwait() blocks until infinity...
        System.out.println(---SLEEPING---);
        Thread.sleep(100);

        // virtually advance the blocking delay 
        System.out.println(---advancing time---);
        testScheduler.advanceTimeBy(3, TimeUnit.SECONDS);

        // wait for the task to complete
        System.out.println(await...);
        testObserver.await();

        // assertions on task results, etc.
        System.out.println(finished. now do some testing... values (expected [FooBar!]):  + testObserver.values());

    }

}

[Note: Je pose cette question d'une manière plus complexe et bavard hier - mais comme il faisait trop chaud dans mon bureau et il y avait plusieurs défauts et erreurs. Par conséquent, je l'ai supprimé et maintenant il est plus condensé au problème réel.]

Créé 19/09/2018 à 13:31
source utilisateur
Dans d'autres langues...                            


1 réponses

voix
0

Si je comprends bien votre problème, vous avez un service de longue durée et que vous voulez être en mesure de tester les abonnements aux services qui seront touchés par votre service de longue durée. Je l'ai abordé ce de la manière suivante.

Définissez votre service que vous avez, en utilisant un TestSchedulerpour contrôler son avancement dans le temps. Définissez ensuite vos stimuli de test en utilisant le même programmateur de test.

Cette émule l'entité de blocage. Ne pas utiliser « bloc ».

Single<String> single = Observable.timer( 3, TimeUnit.SECONDS, scheduler )
                          .doOnNext( _l -> {} )
                          .map( _l -> "FooBar!" )
                          .take( 1 )
                          .toSingle();

// set up asynchronous operations
Observable.timer( 1_500, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction1() );
Observable.timer( 1_900, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction2() );
Observable.timer( 3_100, TimeUnit.MILLISECONDS, scheduler )
  .subscribe( _l -> performTestAction3() );
System.out.println("running the task ...");
TestObserver<String> testObserver = single.doOnSubscribe(__ -> System.out.println("onSubcribe: single"))
                                            .subscribeOn(Schedulers.newThread())
                                            .test();

// virtually advance the blocking delay 
System.out.println("---advancing time---");
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);

Vous devriez voir que performTestAction1()se produit, performTestAction2()puis les singlefinitions, puis performTestAction3(). Tout cela est entraîné par le planificateur de test.

Créé 20/09/2018 à 15:00
source utilisateur

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more