MsgTopic.hpp
1 //
2 // Created by xiong on 2020/10/23.
3 //
4 
5 #ifndef EMX_MSGTOPIC_HPP
6 #define EMX_MSGTOPIC_HPP
7 
8 #include <vector>
9 #include "core/EmxTypeDef.hpp"
10 #include "core/EmxUV.hpp"
11 #include "core/utils/Socket.hpp"
12 
13 namespace Emx {
19  class MsgTopic {
20  public:
21 
23  enum class ClientTypeE : uint8_t {
24  Pub = 0,
25  Sub
26  };
27 
29  enum class MsgTypeE : uint8_t {
30  Transfer = 0,
31  Register,
32  };
33 
35  static const int32_t MaxTopicNum = 64;
36 
38  struct Header {
39  uint32_t sync;
41  uint32_t size;
42  };
43 
49  struct SubRegister {
51  bool sendByPack;
52  int32_t maxSend;
53  uint32_t topicArray[MaxTopicNum];
54  };
55 
59  struct Transfer {
61  uint32_t topic;
62  };
63  };
64 
69  public:
73  using Callback = std::function<void()>;
74 
78  struct Config {
82  };
83 
85  m_connected = false;
86  }
87 
94 
99 
107  ErrCodeE Publish(uint32_t topic, const char *data, int32_t size);
108 
109  protected:
110  void OnConnect() override {
111  m_connected = true;
112  if (m_cbConnected)m_cbConnected();
113  }
114 
115  void OnDisConnect() override {
116  m_connected = false;
117  if (m_cbDisConnected)m_cbDisConnected();
118  }
119 
120  private:
121  bool m_connected;
122  Callback m_cbConnected;
123  Callback m_cbDisConnected;
124  };
125 
130  public:
139  static ErrCodeE Publish(uint32_t topic, const char *data, int32_t size, int32_t timeoutMs = 1000);
140  };
141 
146  public:
151  ErrCodeE Create(int32_t timeoutMs = 3000);
152 
156  void Destroy();
157 
166  ErrCodeE Publish(uint32_t topic, const char *data, int32_t size, int32_t timeoutMs = 3000);
167  private:
168  SocketUnStream m_sock;
169  char m_name[EMX_MAX_PATH_SIZE];
170  };
171 
176  public:
180  using Callback = std::function<void()>;
181 
188  using RecvMsgCallback = std::function<void(uint32_t topic, const char *data, int32_t size)>;
189 
195  struct Config {
197  bool sendByPack;
198  int32_t maxSend;
199  std::vector<uint32_t> topicArray;
200  char *buffer;
201  int32_t size;
205  };
206 
208  m_waitSize = sizeof(MsgTopic::Header);
209  m_waitHeader = true;
210  m_reg = nullptr;
211  }
212 
219 
224 
225  protected:
226  void OnConnect() override;
227 
228  void OnDisConnect() override {
229  if (m_cbDisConnected)m_cbDisConnected();
230  }
231 
232  void OnRecv(const char *data, int32_t size) override;
233 
234  private:
235  bool m_waitHeader;
236  uint32_t m_waitSize;
237  MsgTopic::Header m_header;
238  std::string m_data;
239  std::shared_ptr<std::string> m_reg;
240  Callback m_cbConnected;
241  Callback m_cbDisConnected;
242  RecvMsgCallback m_cbRecvMsg;
243  };
244 
249  public:
250 
254  struct Config {
255  bool sendByPack;
256  int32_t maxSend;
257  std::vector<uint32_t> topicArray;
258  };
259 
261 
268  ErrCodeE Connect(const Config &cfg, int32_t timeoutMs);
269 
277  ErrCodeE GetMsg(uint32_t &topic, std::string &data, int32_t timeoutMs);
278 
282  void DisConnect();
283 
288  const char *GetSockName() { return m_path; }
289 
290  private:
291  SocketUnStream m_sock;
292  char m_path[EMX_MAX_PATH_SIZE];
293  };
295 }
296 
297 #endif //EMX_MSGTOPIC_HPP
实现多路复用循环的主体,所有基于EuvLoop的事件都应绑定到一个EuvLoop上
Definition: EuvLoop.hpp:18
使用EuvPipe作为EuvStreamInf,并且继承EuvStmClientBase, 形成以Pipe为底层通信机制面向连接的客户端
Definition: EuvPipe.hpp:103
void Destroy()
断开链接并销毁客户端
用于Topic的异步发布
Definition: MsgTopic.hpp:68
MsgTopicPublisherAsync()
Definition: MsgTopic.hpp:84
void OnDisConnect() override
Definition: MsgTopic.hpp:115
void OnConnect() override
Definition: MsgTopic.hpp:110
ErrCodeE Publish(uint32_t topic, const char *data, int32_t size)
发布一条Topic
void DestroyMsgTopicPublisherAsync()
销毁异步Topic发布
Definition: MsgTopic.hpp:98
ErrCodeE CreateMsgTopicPublisherAsync(const Config &cfg)
创建异步Topic发布
std::function< void()> Callback
连接状态回调
Definition: MsgTopic.hpp:73
同步阻塞发布消息,分离socket的创建和销毁,用于需要同步频繁发布消息的场景
Definition: MsgTopic.hpp:145
void Destroy()
销毁发布连接
ErrCodeE Create(int32_t timeoutMs=3000)
创建发布连接
ErrCodeE Publish(uint32_t topic, const char *data, int32_t size, int32_t timeoutMs=3000)
发送同步阻塞消息,比较耗时耗资源,尽量使用MsgTopicPublisherAsync异步请求
同步阻塞发布消息
Definition: MsgTopic.hpp:129
static ErrCodeE Publish(uint32_t topic, const char *data, int32_t size, int32_t timeoutMs=1000)
发送同步阻塞消息,比较耗时耗资源,尽量使用MsgTopicPublisherAsync异步请求
用于Topic的异步订阅
Definition: MsgTopic.hpp:175
std::function< void()> Callback
连接状态回调
Definition: MsgTopic.hpp:180
MsgTopicSubscriberAsync()
Definition: MsgTopic.hpp:207
void OnRecv(const char *data, int32_t size) override
void OnDisConnect() override
Definition: MsgTopic.hpp:228
std::function< void(uint32_t topic, const char *data, int32_t size)> RecvMsgCallback
接收到订阅消息的回调函数
Definition: MsgTopic.hpp:188
ErrCodeE CreateMsgTopicSubscriberAsync(const Config &cfg)
创建异步订阅
void DestroyMsgTopicSubscriberAsync()
销毁异步订阅
Definition: MsgTopic.hpp:223
同步阻塞订阅
Definition: MsgTopic.hpp:248
MsgTopicSubscriberSync()
Definition: MsgTopic.hpp:260
ErrCodeE GetMsg(uint32_t &topic, std::string &data, int32_t timeoutMs)
获取订阅的消息
const char * GetSockName()
获取本地地址
Definition: MsgTopic.hpp:288
ErrCodeE Connect(const Config &cfg, int32_t timeoutMs)
连接TopicServer
void DisConnect()
断开与TopicServer的链接
用于Topic通信的数据类型定义
Definition: MsgTopic.hpp:19
MsgTypeE
客户端消息类型
Definition: MsgTopic.hpp:29
@ Register
客户端注册
static const int32_t MaxTopicNum
单个订阅端可订阅最大Topic数量
Definition: MsgTopic.hpp:35
ClientTypeE
Topic客户端类型
Definition: MsgTopic.hpp:23
面向连接的AF_UNIX操作接口
Definition: Socket.hpp:87
ErrCodeE
错误码定义
Definition: EmxTypeDef.hpp:29
Definition: EmxGpio.hpp:10
启动异步发布时携带的配置参数
Definition: MsgTopic.hpp:78
Callback OnConnected
注册一个当与MsgTopicServer成功建立连接后的回调,连接成功后才能发布消息
Definition: MsgTopic.hpp:80
EuvLoop * loop
需要绑定的loop
Definition: MsgTopic.hpp:79
Callback OnDisConnected
注册一个当与MsgTopicServer断开连接后的回调
Definition: MsgTopic.hpp:81
启动异步订阅时携带的配置参数
Definition: MsgTopic.hpp:195
RecvMsgCallback OnRecvMsg
注册一个用于接收响应消息的回调
Definition: MsgTopic.hpp:204
Callback OnConnected
注册一个当与MsgTopicServer成功建立连接后的回调
Definition: MsgTopic.hpp:202
int32_t size
buffer的大小
Definition: MsgTopic.hpp:201
EuvLoop * loop
需要绑定的loop
Definition: MsgTopic.hpp:196
char * buffer
需要提供一个buffer来接收数据
Definition: MsgTopic.hpp:200
bool sendByPack
true代表根据包的数量判断,false代表根据字节数判断
Definition: MsgTopic.hpp:197
Callback OnDisConnected
注册一个当与MsgTopicServer断开连接后的回调
Definition: MsgTopic.hpp:203
std::vector< uint32_t > topicArray
需要订阅的topic数组
Definition: MsgTopic.hpp:199
int32_t maxSend
TopicServer针对此订阅端缓存的最大数据量,单位由‘sendByPack’决定,-1表示无限制
Definition: MsgTopic.hpp:198
启动时携带的配置参数
Definition: MsgTopic.hpp:254
int32_t maxSend
TopicServer针对此订阅端缓存的最大数据量,单位由‘sendByPack’决定,-1表示无限制
Definition: MsgTopic.hpp:256
std::vector< uint32_t > topicArray
需要订阅的topic数组
Definition: MsgTopic.hpp:257
bool sendByPack
true代表根据包的数量判断,false代表根据字节数判断
Definition: MsgTopic.hpp:255
Topic数据协议头
Definition: MsgTopic.hpp:38
uint32_t size
Definition: MsgTopic.hpp:41
uint32_t sync
Definition: MsgTopic.hpp:39
MsgTypeE msgType
Definition: MsgTopic.hpp:40
订阅端注册用结构体
Definition: MsgTopic.hpp:49
bool sendByPack
true代表根据包的数量判断,false代表根据字节数判断
Definition: MsgTopic.hpp:51
uint32_t topicArray[MaxTopicNum]
需要订阅的topic,数组中以0作为有效topic的截止
Definition: MsgTopic.hpp:53
int32_t maxSend
TopicServer针对此订阅端缓存的最大数据量,单位由‘sendByPack’决定,-1表示无限制
Definition: MsgTopic.hpp:52
Header header
Topic数据协议头
Definition: MsgTopic.hpp:50
用于消息传输的消息头
Definition: MsgTopic.hpp:59
Header header
消息头
Definition: MsgTopic.hpp:60
uint32_t topic
传送的topic
Definition: MsgTopic.hpp:61