diff --git a/Rx/v2/examples/doxygen/throttle.cpp b/Rx/v2/examples/doxygen/throttle.cpp new file mode 100644 index 000000000..e2b0232e0 --- /dev/null +++ b/Rx/v2/examples/doxygen/throttle.cpp @@ -0,0 +1,20 @@ +#include "rxcpp/rx.hpp" + +#include "rxcpp/rx-test.hpp" +#include "catch.hpp" + +SCENARIO("throttle sample"){ + printf("//! [throttle sample]\n"); + using namespace std::chrono; + auto scheduler = rxcpp::identity_current_thread(); + auto start = scheduler.now(); + auto period = milliseconds(10); + auto values = rxcpp::observable<>::interval(start, period, scheduler). + take(4). + throttle(period); + values. + subscribe( + [](long v) { printf("OnNext: %ld\n", v); }, + []() { printf("OnCompleted\n"); }); + printf("//! [throttle sample]\n"); +} \ No newline at end of file diff --git a/Rx/v2/src/rxcpp/operators/rx-throttle.hpp b/Rx/v2/src/rxcpp/operators/rx-throttle.hpp new file mode 100644 index 000000000..4b89c26d7 --- /dev/null +++ b/Rx/v2/src/rxcpp/operators/rx-throttle.hpp @@ -0,0 +1,271 @@ +// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. + +#pragma once + +/*! \file rx-throttle.hpp + + \brief Return an observable that emits a value from the source and then ignores any following items until a particular timespan has passed before emitting another value. + + \tparam Duration the type of the time interval + \tparam Coordination the type of the scheduler + + \param period the period of time to suppress any emitted items after the first emission + \param coordination the scheduler to manage timeout for each event + + \return Observable that emits a value from the source and then ignores any following items until a particular timespan has passed before emitting another value. + + \sample + \snippet throttle.cpp throttle sample + \snippet output.txt throttle sample +*/ + +#if !defined(RXCPP_OPERATORS_RX_THROTTLE_HPP) +#define RXCPP_OPERATORS_RX_THROTTLE_HPP + +#include "../rx-includes.hpp" + +#include + +namespace rxcpp { + +namespace operators { + +namespace detail { + +template +struct throttle_invalid_arguments {}; + +template +struct throttle_invalid : public rxo::operator_base> { + using type = observable, throttle_invalid>; +}; +template +using throttle_invalid_t = typename throttle_invalid::type; + +template +struct throttle +{ + typedef rxu::decay_t source_value_type; + typedef rxu::decay_t coordination_type; + typedef typename coordination_type::coordinator_type coordinator_type; + typedef rxu::decay_t duration_type; + + struct throttle_values + { + throttle_values(duration_type p, coordination_type c) + : period(p) + , coordination(c) + { + } + + duration_type period; + coordination_type coordination; + }; + throttle_values initial; + + throttle(duration_type period, coordination_type coordination) + : initial(period, coordination) + { + } + + template + struct throttle_observer + { + typedef throttle_observer this_type; + typedef rxu::decay_t value_type; + typedef rxu::decay_t dest_type; + typedef observer observer_type; + + struct throttle_subscriber_values : public throttle_values + { + throttle_subscriber_values(composite_subscription cs, dest_type d, throttle_values v, coordinator_type c) + : throttle_values(v) + , cs(std::move(cs)) + , dest(std::move(d)) + , coordinator(std::move(c)) + , worker(coordinator.get_worker()) + , throttled(false) + { + } + + composite_subscription cs; + dest_type dest; + coordinator_type coordinator; + rxsc::worker worker; + mutable bool throttled; + }; + typedef std::shared_ptr state_type; + state_type state; + + throttle_observer(composite_subscription cs, dest_type d, throttle_values v, coordinator_type c) + : state(std::make_shared(throttle_subscriber_values(std::move(cs), std::move(d), v, std::move(c)))) + { + auto localState = state; + + auto disposer = [=](const rxsc::schedulable&){ + localState->cs.unsubscribe(); + localState->dest.unsubscribe(); + localState->worker.unsubscribe(); + }; + auto selectedDisposer = on_exception( + [&](){ return localState->coordinator.act(disposer); }, + localState->dest); + if (selectedDisposer.empty()) { + return; + } + + localState->dest.add([=](){ + localState->worker.schedule(selectedDisposer.get()); + }); + localState->cs.add([=](){ + localState->worker.schedule(selectedDisposer.get()); + }); + } + + static std::function reset_throttle(state_type state) { + auto reset = [state](const rxsc::schedulable&) { + state->throttled = false; + }; + + auto selectedReset = on_exception( + [&](){ return state->coordinator.act(reset); }, + state->dest); + if (selectedReset.empty()) { + return std::function(); + } + + return std::function(selectedReset.get()); + } + + void on_next(T v) const { + auto localState = state; + + const auto tp = localState->worker.now().time_since_epoch(); + std::cout << "on_next(" << v << ") at " << tp.count() / 1000000 << " throttled: " << (localState->throttled ? "true" : "false") << std::endl; + + if (!localState->throttled) { + localState->throttled = true; + + state->dest.on_next(v); + + auto work = [v, localState](const rxsc::schedulable&) { + auto produce_time = localState->worker.now() + localState->period; + + std::cout << "scheduling unthrottle for " << (produce_time.time_since_epoch().count() / 1000000) << std::endl; + + localState->worker.schedule(produce_time, reset_throttle(localState)); + }; + auto selectedWork = on_exception( + [&](){return localState->coordinator.act(work);}, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + } + + void on_error(rxu::error_ptr e) const { + auto localState = state; + auto work = [e, localState](const rxsc::schedulable&) { + localState->dest.on_error(e); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + void on_completed() const { + auto localState = state; + auto work = [localState](const rxsc::schedulable&) { + localState->dest.on_completed(); + }; + auto selectedWork = on_exception( + [&](){ return localState->coordinator.act(work); }, + localState->dest); + if (selectedWork.empty()) { + return; + } + localState->worker.schedule(selectedWork.get()); + } + + static subscriber make(dest_type d, throttle_values v) { + auto cs = composite_subscription(); + auto coordinator = v.coordination.create_coordinator(); + + return make_subscriber(cs, observer_type(this_type(cs, std::move(d), std::move(v), std::move(coordinator)))); + } + }; + + template + auto operator()(Subscriber dest) const + -> decltype(throttle_observer::make(std::move(dest), initial)) { + return throttle_observer::make(std::move(dest), initial); + } +}; + +} + +/*! @copydoc rx-throttle.hpp +*/ +template +auto throttle(AN&&... an) + -> operator_factory { + return operator_factory(std::make_tuple(std::forward(an)...)); +} + +} + +template<> +struct member_overload +{ + template, + rxu::is_duration>, + class SourceValue = rxu::value_type_t, + class Throttle = rxo::detail::throttle, identity_one_worker>> + static auto member(Observable&& o, Duration&& d) + -> decltype(o.template lift(Throttle(std::forward(d), identity_current_thread()))) { + return o.template lift(Throttle(std::forward(d), identity_current_thread())); + } + + template, + is_coordination, + rxu::is_duration>, + class SourceValue = rxu::value_type_t, + class Throttle = rxo::detail::throttle, rxu::decay_t>> + static auto member(Observable&& o, Coordination&& cn, Duration&& d) + -> decltype(o.template lift(Throttle(std::forward(d), std::forward(cn)))) { + return o.template lift(Throttle(std::forward(d), std::forward(cn))); + } + + template, + is_coordination, + rxu::is_duration>, + class SourceValue = rxu::value_type_t, + class Throttle = rxo::detail::throttle, rxu::decay_t>> + static auto member(Observable&& o, Duration&& d, Coordination&& cn) + -> decltype(o.template lift(Throttle(std::forward(d), std::forward(cn)))) { + return o.template lift(Throttle(std::forward(d), std::forward(cn))); + } + + template + static operators::detail::throttle_invalid_t member(const AN&...) { + std::terminate(); + return {}; + static_assert(sizeof...(AN) == 10000, "throttle takes (optional Coordination, required Duration) or (required Duration, optional Coordination)"); + } +}; + +} + +#endif diff --git a/Rx/v2/src/rxcpp/rx-includes.hpp b/Rx/v2/src/rxcpp/rx-includes.hpp index 1eb47db5e..9d7ed0db9 100644 --- a/Rx/v2/src/rxcpp/rx-includes.hpp +++ b/Rx/v2/src/rxcpp/rx-includes.hpp @@ -259,6 +259,7 @@ #include "operators/rx-take_until.hpp" #include "operators/rx-take_while.hpp" #include "operators/rx-tap.hpp" +#include "operators/rx-throttle.hpp" #include "operators/rx-time_interval.hpp" #include "operators/rx-timeout.hpp" #include "operators/rx-timestamp.hpp" diff --git a/Rx/v2/src/rxcpp/rx-observable.hpp b/Rx/v2/src/rxcpp/rx-observable.hpp index 4f420076e..4fba8122c 100644 --- a/Rx/v2/src/rxcpp/rx-observable.hpp +++ b/Rx/v2/src/rxcpp/rx-observable.hpp @@ -1417,6 +1417,17 @@ class observable return observable_member(take_while_tag{}, *this, std::forward(an)...); } + /*! @copydoc rx-throttle.hpp + */ + template + auto throttle(AN&&... an) const + /// \cond SHOW_SERVICE_MEMBERS + -> decltype(observable_member(throttle_tag{}, *(this_type*)nullptr, std::forward(an)...)) + /// \endcond + { + return observable_member(throttle_tag{}, *this, std::forward(an)...); + } + /*! @copydoc rx-repeat.hpp */ template diff --git a/Rx/v2/src/rxcpp/rx-operators.hpp b/Rx/v2/src/rxcpp/rx-operators.hpp index aa7d7f4ae..ad069e4ba 100644 --- a/Rx/v2/src/rxcpp/rx-operators.hpp +++ b/Rx/v2/src/rxcpp/rx-operators.hpp @@ -462,6 +462,13 @@ struct tap_tag { }; }; +struct throttle_tag { + template + struct include_header{ + static_assert(Included::value, "missing include: please #include "); + }; +}; + struct timeout_tag { template struct include_header{ diff --git a/Rx/v2/test/CMakeLists.txt b/Rx/v2/test/CMakeLists.txt index c2d153069..6c3613406 100644 --- a/Rx/v2/test/CMakeLists.txt +++ b/Rx/v2/test/CMakeLists.txt @@ -79,6 +79,7 @@ set(TEST_SOURCES ${TEST_DIR}/operators/take_until.cpp ${TEST_DIR}/operators/take_while.cpp ${TEST_DIR}/operators/tap.cpp + ${TEST_DIR}/operators/throttle.cpp ${TEST_DIR}/operators/time_interval.cpp ${TEST_DIR}/operators/timeout.cpp ${TEST_DIR}/operators/timestamp.cpp diff --git a/Rx/v2/test/operators/throttle.cpp b/Rx/v2/test/operators/throttle.cpp new file mode 100644 index 000000000..8cda077b1 --- /dev/null +++ b/Rx/v2/test/operators/throttle.cpp @@ -0,0 +1,216 @@ +#include "../test.h" +#include + +using namespace std::chrono; + +SCENARIO("throttle - never", "[throttle][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1) + }); + + WHEN("values are throttled"){ + + auto res = w.start( + [so, xs]() { + return xs | rxo::throttle(so, milliseconds(10)); + } + ); + + THEN("the output is empty"){ + auto required = std::vector::recorded_type>(); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 1001) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + } + } +} + +SCENARIO("throttle - empty", "[throttle][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.completed(250) + }); + + WHEN("values are throttled"){ + + auto res = w.start( + [so, xs]() { + return xs.throttle(milliseconds(10), so); + } + ); + + THEN("the output only contains complete message"){ + auto required = rxu::to_vector({ + on.completed(251) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("throttle - no overlap", "[throttle][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(210, 2), + on.next(240, 3), + on.completed(300) + }); + + WHEN("values are throtteled"){ + + auto res = w.start( + [so, xs]() { + return xs.throttle(milliseconds(10), so); + } + ); + + THEN("the output only contains throttled items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(210, 2), + on.next(240, 3), + on.completed(301) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 300) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("throttle - overlap", "[throttle][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages on; + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.next(215, 2), + on.next(225, 3), + on.next(235, 4), + on.next(247, 5), + on.next(255, 6), + on.next(265, 7), + on.next(500, 8), + on.completed(600) + }); + + WHEN("values are throttled"){ + + auto res = w.start( + [so, xs]() { + return xs.throttle(milliseconds(30), so); + } + ); + + THEN("the output only contains throttled items sent while subscribed"){ + auto required = rxu::to_vector({ + on.next(215, 2), + on.next(247, 5), + on.next(500, 8), + on.completed(601) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 600) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} + +SCENARIO("throttle - throw", "[throttle][operators]"){ + GIVEN("a source"){ + auto sc = rxsc::make_test(); + auto so = rx::synchronize_in_one_worker(sc); + auto w = sc.create_worker(); + const rxsc::test::messages on; + + std::runtime_error ex("throttle on_error from source"); + + auto xs = sc.make_hot_observable({ + on.next(150, 1), + on.error(250, ex) + }); + + WHEN("values are throttled"){ + + auto res = w.start( + [so, xs]() { + return xs.throttle(milliseconds(10), so); + } + ); + + THEN("the output only contains only error"){ + auto required = rxu::to_vector({ + on.error(251, ex) + }); + auto actual = res.get_observer().messages(); + REQUIRE(required == actual); + } + + THEN("there was 1 subscription/unsubscription to the source"){ + auto required = rxu::to_vector({ + on.subscribe(200, 250) + }); + auto actual = xs.subscriptions(); + REQUIRE(required == actual); + } + + } + } +} diff --git a/projects/doxygen/CMakeLists.txt b/projects/doxygen/CMakeLists.txt index 1c1ba4caf..6d0950246 100644 --- a/projects/doxygen/CMakeLists.txt +++ b/projects/doxygen/CMakeLists.txt @@ -106,6 +106,7 @@ if(DOXYGEN_FOUND) ${DOXY_EXAMPLES_SRC_DIR}/take_until.cpp ${DOXY_EXAMPLES_SRC_DIR}/take_while.cpp ${DOXY_EXAMPLES_SRC_DIR}/tap.cpp + ${DOXY_EXAMPLES_SRC_DIR}/throttle.cpp ${DOXY_EXAMPLES_SRC_DIR}/time_interval.cpp ${DOXY_EXAMPLES_SRC_DIR}/timeout.cpp ${DOXY_EXAMPLES_SRC_DIR}/timer.cpp