From b6ffaff7c4fa986f7cb9a56ac628a58434791d4a Mon Sep 17 00:00:00 2001 From: Boudewijn van Groos Date: Thu, 16 Feb 2023 16:22:54 +0100 Subject: [PATCH] Add other starmap_indexed tests --- tests/test_observable/test_starmap.py | 172 ++++++++++++++++++++++++++ 1 file changed, 172 insertions(+) diff --git a/tests/test_observable/test_starmap.py b/tests/test_observable/test_starmap.py index e56ff488..6fd3ac76 100644 --- a/tests/test_observable/test_starmap.py +++ b/tests/test_observable/test_starmap.py @@ -405,6 +405,54 @@ def mapper(x, y): assert xs.subscriptions == [subscribe(200, 290)] assert invoked[0] == 3 + def test_starmap_with_index_throws(self): + with self.assertRaises(RxException): + mapper = ops.starmap_indexed(lambda x, y, index: x) + + return return_value((1, 10)).pipe(mapper).subscribe(lambda x: _raise("ex")) + + with self.assertRaises(RxException): + return ( + throw("ex").pipe(mapper).subscribe(lambda x: x, lambda ex: _raise(ex)) + ) + + with self.assertRaises(RxException): + return ( + empty() + .pipe(mapper) + .subscribe(lambda x: x, lambda ex: None, lambda: _raise("ex")) + ) + + with self.assertRaises(RxException): + return create(lambda o, s: _raise("ex")).pipe(mapper).subscribe() + + def test_starmap_with_index_dispose_inside_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(100, (4, 40)), on_next(200, (3, 30)), on_next(500, (2, 20)), on_next(600, (1, 10)) + ) + invoked = [0] + results = scheduler.create_observer() + d = SerialDisposable() + + def projection(x, y, index): + invoked[0] += 1 + if scheduler.clock > 400: + d.dispose() + + return x + y + index * 100 + + d.disposable = xs.pipe(ops.starmap_indexed(projection)).subscribe(results) + + def action(scheduler, state): + return d.dispose() + + scheduler.schedule_absolute(disposed, action) + scheduler.start() + assert results.messages == [on_next(100, 44), on_next(200, 133)] + assert xs.subscriptions == [subscribe(0, 500)] + assert invoked[0] == 3 + def test_starmap_with_index_completed(self): scheduler = TestScheduler() invoked = [0] @@ -438,6 +486,130 @@ def projection(x, y, index): assert xs.subscriptions == [subscribe(200, 400)] assert invoked[0] == 4 + def test_starmap_with_index_default_mapper(self): + scheduler = TestScheduler() + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + return xs.pipe(ops.starmap_indexed()) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_completed(400), + ] + + assert xs.subscriptions == [subscribe(200, 400)] + + def test_starmap_with_index_not_completed(self): + scheduler = TestScheduler() + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + ) + + def factory(): + def projection(x, y, index): + invoked[0] += 1 + return (x + 1) + (y + 10) + (index * 100) + + return xs.pipe(ops.starmap_indexed(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 55), + on_next(240, 144), + on_next(290, 233), + on_next(350, 322), + ] + assert xs.subscriptions == [subscribe(200, 1000)] + assert invoked[0] == 4 + + def test_starmap_with_index_error(self): + scheduler = TestScheduler() + ex = "ex" + invoked = [0] + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_error(400, ex), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x, y, index): + invoked[0] += 1 + return (x + 1) + (y + 10) + (index * 100) + + return xs.pipe(ops.starmap_indexed(projection)) + + results = scheduler.start(factory) + + assert results.messages == [ + on_next(210, 55), + on_next(240, 144), + on_next(290, 233), + on_next(350, 322), + on_error(400, ex), + ] + assert xs.subscriptions == [subscribe(200, 400)] + + def test_starmap_with_index_mapper_throws(self): + scheduler = TestScheduler() + invoked = [0] + ex = "ex" + xs = scheduler.create_hot_observable( + on_next(180, (5, 50)), + on_next(210, (4, 40)), + on_next(240, (3, 30)), + on_next(290, (2, 20)), + on_next(350, (1, 10)), + on_completed(400), + on_next(410, (-1, -10)), + on_completed(420), + on_error(430, "ex"), + ) + + def factory(): + def projection(x, y, index): + invoked[0] += 1 + if invoked[0] == 3: + raise Exception(ex) + return (x + 1) + (y + 10) + (index * 100) + + return xs.pipe(ops.starmap_indexed(projection)) + + results = scheduler.start(factory) + assert results.messages == [ + on_next(210, 55), + on_next(240, 144), + on_error(290, ex), + ] + assert xs.subscriptions == [subscribe(200, 290)] + assert invoked[0] == 3 + if __name__ == "__main__": unittest.main()