RxCpp の delay で処理を遅延させる実験
#include <iostream> #include "rx-includes.hpp" class Man { public: Man(); ~Man(); void hello(); private: int age; std::string name; }; template <typename T> class AbstBase { public: T hoge; }; class FuncClass { public: FuncClass() : connectStateOb(std::shared_ptr<rxcpp::subjects::behavior<bool>>::make_shared(true)){ }; ~FuncClass(){} std::shared_ptr<rxcpp::subjects::behavior<bool>> connectStateOb; void poi(); rxcpp::observable<std::string> connect(); rxcpp::observable<std::string> disscconnect(); }; class Base : public AbstBase<Man>, public FuncClass { public: Base() : FuncClass(){}; ~Base(){}; }; class Conc: Base { public: Conc(): Base(){ }; ~Conc(){ std::cout << " ** deconstruction Conc **" << std::endl; }; rxcpp::observable<std::string> rxtest(); void log(std::string txt); };
#include "man.hpp"

#include <chrono>
#include <iostream>
#include <string>

/// Initializes the members. The initializer list follows the declaration
/// order (`age`, then `name`), which is the order in which members are
/// actually initialized regardless of how the list is written.
Man::Man() : age(0), name("hoge") {
}

/// Logs destruction so the moment Man is torn down is visible in the output.
Man::~Man() {
    std::cout << " ** desconstructe Man **" << std::endl;
}

/// Prints a fixed greeting; does not read any member.
void Man::hello() {
    std::cout << " ** hello **" << std::endl;
}

/// Prints a fixed marker line.
void FuncClass::poi() {
    std::cout << " ** poi **" << std::endl;
}

/// Returns an observable that takes a single `0`, delays it by 1000 ms and
/// maps it to the string "event!".
rxcpp::observable<std::string> FuncClass::connect() {
    rxcpp::observable<int> a = rxcpp::observable<>::just(0)
        .delay(std::chrono::milliseconds(1000));

    // The lambda uses nothing from the enclosing scope, so it captures nothing.
    rxcpp::observable<std::string> b = a.flat_map([](auto /*x*/) {
        rxcpp::observable<std::string> c =
            rxcpp::observable<>::just<std::string>(std::string("event!"));
        return c;
    }).as_dynamic();

    return b;
}

/// Returns the first two ticks of a 1000 ms interval (coordinated with
/// rxcpp::observe_on_new_thread()), each converted to its decimal string.
rxcpp::observable<std::string> FuncClass::disscconnect() {
    auto v = rxcpp::observable<>::interval(std::chrono::milliseconds(1000),
                                           rxcpp::observe_on_new_thread())
        .map([](long x) {
            return std::to_string(x);
        })
        .take(2);
    return v;
}

/// Writes `txt` followed by a newline to standard output.
void Conc::log(std::string txt) {
    std::cout << txt << std::endl;
}

/// Builds the pipeline just(10) -> connect() -> disscconnect() -> log/hello.
///
/// NOTE(review): every stage below captures `this` as a raw pointer. The
/// pipeline does not own the Conc, so if the object is destroyed while
/// emissions are still pending, the later stages call member functions on a
/// destroyed object (undefined behavior). The caller must keep the Conc alive
/// for as long as the subscription can still emit.
rxcpp::observable<std::string> Conc::rxtest() {
    auto o = rxcpp::observable<>::just(10).as_dynamic();

    rxcpp::observable<std::string> s = o.flat_map([this](int /*x*/) {
            return connect();
        })
        .as_dynamic()
        .flat_map([this](std::string c) {
            std::cout << "==>" << c << std::endl;
            log("log1");
            return disscconnect();
        })
        .as_dynamic()
        .flat_map([this](std::string d) {
            log("log2");
            log(d);
            this->hoge.hello();
            return rxcpp::observable<>::just(d);
        })
        .as_dynamic();

    return s;
}
処理を遅らせるなどして、動作を確認していた。
void rxtest2() { std::shared_ptr<Conc> conc = std::shared_ptr<Conc>::make_shared(); rxcpp::observable<std::string> x = conc->rxtest(); auto l = x.subscribe([=](auto r) { std::cout << "** get event **" << std::endl; std::cout << r << std::endl; }); conc.reset(); std::async(std::launch::async, [l]() { usleep(5 * 1000 * 1000); std::cout << "** unsubscribe **" << std::endl; l.unsubscribe(); usleep(5 * 1000 * 1000); std::cout << "** end **" << std::endl; }); }
色々やっているうちにコードが無駄に複雑化したが、オブジェクトのデストラクタが呼ばれるタイミングなどは少し理解できた。