#pragma once #include #include typedef void(*cb)(const char*, size_t); class NatsConnect { natsConnection* m_con = nullptr; cb m_cb; public: NatsConnect() = default; explicit NatsConnect(const char* url) { connect(url); } ~NatsConnect(); void connect(const char* url) noexcept; void disconnect() noexcept; void reconnect() noexcept; void publish(const char* subject, const char* data) noexcept; operator void*() const { return m_con; } void subscribe(const char* subject, cb f) noexcept; static void handle(const char* msg, size_t len, NatsConnect* obj); }; inline NatsConnect::~NatsConnect() { if (m_con) { natsConnection_Close(m_con); natsConnection_Destroy(m_con); } } inline void NatsConnect::connect(const char* url) noexcept { nats_CheckCompatibility(); const natsStatus s = natsConnection_ConnectTo(&m_con, url); if (s != NATS_OK) { printf("Error: %s\n", natsStatus_GetText(s)); return; } } inline void NatsConnect::subscribe(const char *subject, cb f) noexcept { m_cb = f; natsSubscription *sub = nullptr; const natsStatus s = natsConnection_Subscribe(&sub, m_con, subject, [](natsConnection *nc, natsSubscription *sub, natsMsg *msg, void *closure) { handle(natsMsg_GetData(msg), natsMsg_GetDataLength(msg), (NatsConnect*)closure); }, this); if (s != NATS_OK) { printf("Error: %s\n", natsStatus_GetText(s)); return; } natsSubscription_SetPendingLimits(sub, 1024, 1024); } inline void NatsConnect::handle(const char *msg, size_t len, NatsConnect *obj) { obj->m_cb(msg, len); }