Testing RxJS

March 19th, 2016

When you hear for a new technology, all you want to do is just to use it. This is how my experience with RxJS started a while ago. It is for sure cool, but also a new way of thinking. Just like anything else new, you are going sooner or later to hit a wall. After grasping myself to find which sequence of operators whould do the trick, I tried to just test the thing.

Async testing

Well, no matter how close should streams be to normal iterables, they are in nature async. You have a stream and at a moment you subscribe to it. This "one moment" plays a crucial role to your testing code. Most of the times you would like to test the behaviour right before or/and right after a specific event. Being carefoul, will make you pass the first test successfully, but as soon as you might need a little bit more, you will pretty soon find yourself strungling with the setup of you test just to have something like the following at the very end of it.

someStream.subscribe((val) => {
    expect(val).toBeDefined();
    //any other expectation
});

Marble tests

If you have that problem, let me tell you a well hidden secret: marble tests. After finding that our, you are going to have yet another problem and that is the lack of documentation on how to use them. That means more baby steps, more source code reading(thanks god, the library is written in typescript) and more experimentation. Given that, I thought it would be better to also write down experiments.

After some reading and searching, I found out that it is all about the TestScheduler class that is provided with the framework. Still not that clear on how to use it.

Stream creation

While in the docu, you have the hot and cold functions, there is no info on where to find them. It looks like they are actually part of the TestScheduler's api:

  • createHotObservable<T>(marbles: string, values?: any, error?: any): Subject<T> creates a hot observable
  • createColdObservable<T>(marbles: string, values?: any, error?: any): Observable<T> creates a cold observable

My first attempt was to create a simple hot or cold observable and play with it:

/*eslint-env jasmine*/

import Rx from 'rxjs';

fdescribe('TestScheduler', () => {
  
  describe('createHotObservable()', () => {
    it('should create a hot observable emmiting the values given as marble strings', () => {
      //given
      const scheduler = new Rx.TestScheduler(null);
      const source = scheduler.createHotObservable('--a---b-cd-|');
      const expected = ['ab', 'cd'];
  
      //when
      source
        .bufferCount(2)
        .map((array) => array.join(''))
        .subscribe((val) => {
          expect(val).toEqual(expected.shift());
        });
        
      scheduler.flush();
      expect(expected.length).toBe(0);
    });
  });
  
  describe('createColdObservable()', () => {
    it('should create a cold observable emmiting the values given as marble strings', () => {
      //given
      const scheduler = new Rx.TestScheduler(null);
      const source = scheduler.createColdObservable('--a---b|');
      const expected = ['a', 'b'];
  
      //when
      source
        .subscribe((val) => {
          expect(val).toEqual(expected.shift());
        });
        
      scheduler.flush();
      expect(expected.length).toBe(0);
    });
  });
});

Being happy about the first achievement, let's try to get one level up and subscribe to an observable after a period of time. Speaking in marbles, let's say that I subscribe to the following sequence: --a-^-b-c-|. That means that subscription should be notified only for b and c:

it('should schedule the subscription on the right time', () => {
    //given
    const scheduler = new Rx.TestScheduler(null);
    const source = scheduler.createHotObservable('--a-^-b-c|');
    const results = [];

    //when
    source.subscribe((val) => {
        results.push(val);
    }, null, () => {
        results.push('done');
    });

    scheduler.flush();

    //then
    expect(results).toEqual(['b', 'c', 'done']);
});

Guess what. This does not work. TestScheduler initialization alone can not cope with time. After some searching arround I found out how to let the scheduler schedule stuff and that is the schedule function. This accepts a closure where your time based part should be placed. Let's see how to achieve that:

/*eslint-env jasmine*/

import Rx from 'rxjs';

describe('TestScheduler', () => {
  describe('schedule()', () => {
    it('should schedule the subscription on the right time', () => {
      //given
      const scheduler = new Rx.TestScheduler(null);
      const source = scheduler.createHotObservable('--a-^--b-c|');
      const results = [];

      //when
      scheduler.schedule(() => {
        source.subscribe((val) => {
          results.push(val);
        }, null, () => {
          results.push('done');  
        });
      });
              
      scheduler.flush();
      expect(results).toEqual(['b', 'c', 'done']);
    });
    
    it('should allow scheduling of non ending streams', () => {
      //given
      const scheduler = new Rx.TestScheduler(null);
      const source = scheduler.createHotObservable('--a-^--b-c');
      const results = [];

      //when
      scheduler.schedule(() => {
        source.subscribe((val) => {
          results.push(val);
        }, null, () => {
          results.push('done');  
        });
      });
              
      scheduler.flush();
      expect(results).toEqual(['b', 'c']);
    });
  });
});

Simplifying the expectations

Although straight forward, it is a bit of code that you have to write in order to test a sequence of events and we haven't a way of testing the actual time (frame) that an event took place. In the docu, you see the existence of expectObservable function, but still no info on where to find it. This time I went straight to TestScheduler and it looked like it had what I needed:

  • expectObservable(observable: Observable<any>, unsubscriptionMarbles: string = null): ({ toBe: observableToBeFn })

With that we can write expectations with marble strings just like we create the hot/cold observables, making testing much more easier.

It is though a little bit trickier to use, as we should first instanciate our scheduler with an assertion function. This will be used in order to assert the sequence of the events when flushing the scheduler. That means that it should make our test fail. So either throw an exception or just use your testing framework:

function assertEquals (actual, expected) {
    //we will use jasmine's api for the assertion:
    expect(actual).toEqual(expected);
}

const scheduler = new Rx.TestScheduler(assertEquals);

a full example would look like the following:

/*eslint-env jasmine*/

import Rx from 'rxjs';

function assertEquals (actual, expected) {
    //we will use jasmine's api for the assertion:
    expect(actual).toEqual(expected);
}

describe('TestScheduler', () => {
  describe('expectObservable()', () => {
    it('should simplify the result check of given observables', () => {
      //given
      const scheduler = new Rx.TestScheduler(assertEquals);
      const source = scheduler.createHotObservable('--a-^--b-c');

      //then
      scheduler.expectObservable(source).toBe('---bc');
      
      scheduler.flush();
    });
  });
});

Experimenting

Finally if you feel like experimenting, which by the way is the only way of getting used to the library this jsbin should get you up and running as it contains all the test cases we've discussed on this post. If you are only interesting in the code, here it is:

describe('rxjs TestScheduler', function () {
  var rx = Rx.KitchenSink;
  
  it('should be part of the Rx namespace', function () {
    expect(rx).toBeDefined();
    expect(rx.TestScheduler).toBeDefined();
  });
  
  describe('createHotObservable()', function () {
    it('should create a hot observable emmiting the values given as marble strings', function () {
      //given
      var scheduler = new rx.TestScheduler(null);
      var source = scheduler.createHotObservable('--a---b-cd-|');
      var results = [];

      //when
      source
        .bufferCount(2)
        .map(function (array) {
          return array.join('');
        })
        .subscribe(function (val) {
          results.push(val);
        }, null, function () {
          results.push('done');  
        });
        
      scheduler.flush();
      expect(results).toEqual(['ab', 'cd', 'done']);
    });
    
    it('should create a hot observable emmiting the values given as marble strings', function () {
      //given
      var scheduler = new rx.TestScheduler(null);
      var source = scheduler.createHotObservable('--a-^--b-c|');
      var results = [];

      //when
      scheduler.schedule(function () {
        source.subscribe(function (val) {
          results.push(val);
        }, null, function () {
          results.push('done');  
        });
      });
              
      scheduler.flush();
      expect(results).toEqual(['b', 'c', 'done']);
    });
  });
  
  
  describe('createColdObservable()', function () {
    it('should create a cold observable emmiting the values given as marble strings', function () {
      //given
      var scheduler = new rx.TestScheduler(null);
      var source = scheduler.createColdObservable('--a---b|');
      var results = [];

      //when
      source.subscribe(function (val) {
        results.push(val);
      });
        
      scheduler.flush();
      expect(results).toEqual(['a', 'b']);
    });
  });
  
  
  it('should be reusable with time related operators', function () {
    //given
    var scheduler = new rx.TestScheduler(null);
    var source = scheduler.createHotObservable('a----bcd-f|');
    var results = [];
   
    //when
    source
      .bufferTime(40, null, scheduler)
      .map(function (buffer) {
        return buffer.join('');
      })
      .subscribe(function (val) {
        results.push(val);
      });
      
    scheduler.flush();
    
    //then
    expect(results).toEqual(['a', 'bcd', 'f']);
  });
  
  
  describe('schedule()', function () {
    it('should schedule the subscription on the right time', function () {
      //given
      var scheduler = new rx.TestScheduler(null);
      var source = scheduler.createHotObservable('--a-^--b-c|');
      var results = [];

      //when
      scheduler.schedule(function  () {
        source.subscribe(function (val) {
          results.push(val);
        }, null, function () {
          results.push('done');  
        });
      });
              
      scheduler.flush();
      expect(results).toEqual(['b', 'c', 'done']);
    });
    
    it('should allow scheduling of non ending streams', function () {
      //given
      var scheduler = new rx.TestScheduler(null);
      var source = scheduler.createHotObservable('--a-^--b-c');
      var results = [];

      //when
      scheduler.schedule(function () {
        source.subscribe(function (val) {
          results.push(val);
        }, null, function () {
          results.push('done');  
        });
      });
              
      scheduler.flush();
      expect(results).toEqual(['b', 'c']);
    });
  });
  
      
  function assertEquals (actual, expected) {
    expect(actual).toEqual(expected);
  }
  
  describe('expectObservable()', function () {
    it('should simplify the result check of given observables', function () {
      //given
      var scheduler = new rx.TestScheduler(assertEquals);
      var source = scheduler.createHotObservable('--a-^--b-c');

      //then
      scheduler.expectObservable(source).toBe('---b-c');
      
      scheduler.flush();
    });
  });
  
  describe('operators', function () {
    describe('fromPromise()', function () {
      it('should transform a Promise to an Observable', function (done) {
        var promise = Promise.resolve('promise value');
        rx.Observable.fromPromise(promise)
          .subscribe(function (e) {
            expect(e).toEqual('promise value');
          }, done.fail, done);
      });
    });
    
    describe('mergeMap()', function () {
      it('should transform a Promise to an Observable', function () {
        //given
        var scheduler = new rx.TestScheduler(assertEquals);
        var source = scheduler.createHotObservable('--a-^--b-c');
        var other = scheduler.createColdObservable('-d--e');

        //then
        scheduler.expectObservable(source.mergeMap(function () {
          return other;
        })).toBe('----d-de-e');
      
        scheduler.flush();
      });
    });
  });
});

References

valotas.com v3.6.2 © Georgios Valotasios - CSS inspired by Adam Wathan's blog