Commit f4cbc4db by 李维杰

为了测试turn 81.70.124.49 及 纯P2P通讯(不走中转)而修改代码

parent f5787584
#include "cmd_processer_server.h" #include "cmd_processer_server.h"
#include "mqtt_request.h" #include "mqtt_request.h"
#include "json/json.h"
namespace offcn
{ namespace offcn
static const std::string kGlobalTopic = "kcp_server_mqtt"; {
static const std::string kLocalID = "kcp_server_mqtt"; static const std::string kGlobalTopic = "topic_for_turn_test";
static const std::string kLocalID = "mqtt_server_for_turn_test";
CmdProcesserServer::CmdProcesserServer()
:mqtt_wrapper_(NULL) CmdProcesserServer::CmdProcesserServer()
{ :mqtt_wrapper_(NULL)
mqtt_wrapper_ = new offcn::MqttWrapper(this); {
} mqtt_wrapper_ = new offcn::MqttWrapper(this);
CmdProcesserServer::~CmdProcesserServer() }
{ CmdProcesserServer::~CmdProcesserServer()
if (mqtt_wrapper_) {
{ if (mqtt_wrapper_)
delete mqtt_wrapper_; {
mqtt_wrapper_ = NULL; delete mqtt_wrapper_;
} mqtt_wrapper_ = NULL;
} }
}
bool CmdProcesserServer::Connect()
{ bool CmdProcesserServer::Connect()
return mqtt_wrapper_->ConnectMqttServer(kLocalID); {
} return mqtt_wrapper_->ConnectMqttServer(kLocalID);
bool CmdProcesserServer::SubscribeTopic() }
{ bool CmdProcesserServer::SubscribeTopic()
return mqtt_wrapper_->SubscribeTopic(kGlobalTopic); {
} return mqtt_wrapper_->SubscribeTopic(kGlobalTopic);
}
/**
* CmdObserver /**
*/ * CmdObserver
void CmdProcesserServer::OnServerConnectSuccess() */
{ void CmdProcesserServer::OnServerConnectSuccess()
printf("connect mqtt server success!\n"); {
printf("connect mqtt server success!\n");
SubscribeTopic();
} SubscribeTopic();
void CmdProcesserServer::OnServerConnectFailure() }
{ void CmdProcesserServer::OnServerConnectFailure()
printf("connect mqtt server failure!\n"); {
} printf("connect mqtt server failure!\n");
void CmdProcesserServer::OnServerSubscribeSuccess() }
{ void CmdProcesserServer::OnServerSubscribeSuccess()
printf("subscribe topic success!\n"); {
} printf("subscribe topic success!\n");
void CmdProcesserServer::OnServerSubscribeFailure() }
{ void CmdProcesserServer::OnServerSubscribeFailure()
printf("subscribe topic failure!\n"); {
} printf("subscribe topic failure!\n");
void CmdProcesserServer::OnServerMessageArrived(std::string topic, std::string message) }
{ void CmdProcesserServer::OnServerMessageArrived(std::string topic, std::string message)
printf("\nreceive message : topic = %s, context : %s\n", topic.c_str(), message.c_str()); {
printf("\nreceive message : topic = %s, context : %s\n", topic.c_str(), message.c_str());
std::string method;
std::string error; std::string method;
ReqValue values; std::string error;
if (!MqttRequest::ParseRequestType(message, method, error)) ReqValue values;
{ if (!MqttRequest::ParseRequestType(message, method, error))
printf("Error : Parse request error\n"); {
return; printf("Error : Parse request error\n");
} return;
}
if (method.compare("join") == 0)
{ if (method.compare("login") == 0)
if (MqttRequest::ParseJoinRequest(message, values, error)) {
{ if (MqttRequest::ParseJoinRequest(message, values, error))
OnParseJoinSuccess(values["message_id"], values["from_id"]); {
} OnParseJoinSuccess(values["id"]);
else }
{ else
OnParseFailure(values["message_id"], values["from_id"], error); {
} OnParseFailure(values["id"], values["id"], error);
} }
else if (method.compare("update") == 0) }
{ }
if (MqttRequest::ParseUpdateRequest(message, values, error))
{ void CmdProcesserServer::OnParseJoinSuccess(std::string from_id)
OnParseUpdateSuccess(values["message_id"], values["from_id"], values["pull_type"]); {
} std::string response;
else if (client_lists_.find(from_id) != client_lists_.end())
{ {
OnParseFailure(values["message_id"], values["from_id"], error); response = MqttRequest::BuildResponse(from_id, "id exist");
} mqtt_wrapper_->SendRequest(from_id, response);
}
} return;
}
void CmdProcesserServer::OnParseJoinSuccess(std::string message_id, std::string from_id) else
{ {
std::string response; client_lists_.insert(std::pair<std::string, std::string>(from_id, from_id));
if (client_lists_.find(from_id) != client_lists_.end())
{ Json::Value root;
response = MqttRequest::BuildResponse(message_id, "id exist"); Json::Value root1;
mqtt_wrapper_->SendRequest(from_id, response); Json::Value ids;
Json::StreamWriterBuilder wBuilder;
return;
} root["method"] = "publishers";
else
{ root1["method"] = "online";
if (client_lists_.size() == 0) root1["id"] = from_id;
{
client_lists_.insert(std::pair<std::string, std::string>(from_id, "rtmp")); printf("\n----------------online list--------------------\n");
} std::map<std::string, std::string>::iterator itor;
else for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++)
{ {
client_lists_.insert(std::pair<std::string, std::string>(from_id, "null")); if (itor->first.compare(from_id) != 0)
} {
//response = MqttRequest::PublishersRequest("sdf", from_id, itor->first);
//response //mqtt_wrapper_->SendRequest(itor->first, response);
response = MqttRequest::BuildResponse(message_id, "ok");
mqtt_wrapper_->SendRequest(from_id, response); ids.append(itor->first);
}
printf("\n------------------------------------\n"); printf("id = %s, pull type = %s\n", itor->first.c_str(), itor->second.c_str());
std::map<std::string, std::string>::iterator itor; }
for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++) printf("\n-----------------------------------------------\n");
{
if (itor->first.compare(from_id) != 0 && itor->second.compare("rtmp") == 0) root["ids"] = ids;
{
response = MqttRequest::PublishersRequest("sdf", from_id, itor->first); std::string req = Json::writeString(wBuilder, root);
mqtt_wrapper_->SendRequest(from_id, response); mqtt_wrapper_->SendRequest(from_id, req);
}
printf("id = %s, pull type = %s\n", itor->first.c_str(), itor->second.c_str()); //req = Json::writeString(wBuilder, root1);
} //std::string topic = kGlobalTopic;
printf("------------------------------------\n"); //mqtt_wrapper_->SendRequest(topic, req);
} }
} }
void CmdProcesserServer::OnParseFailure(std::string message_id, std::string from_id, std::string error) void CmdProcesserServer::OnParseFailure(std::string message_id, std::string from_id, std::string error)
{ {
std::string response = MqttRequest::BuildResponse(message_id, error); std::string response = MqttRequest::BuildResponse(message_id, error);
mqtt_wrapper_->SendRequest(from_id, response); mqtt_wrapper_->SendRequest(from_id, response);
} }
void CmdProcesserServer::OnParseUpdateSuccess(std::string message_id, std::string from_id, std::string pull_type) void CmdProcesserServer::OnParseUpdateSuccess(std::string message_id, std::string from_id, std::string pull_type)
{ {
std::string response; std::string response;
if (client_lists_.find(from_id) == client_lists_.end()) if (client_lists_.find(from_id) == client_lists_.end())
{ {
response = MqttRequest::BuildResponse(message_id, "id is not exist"); response = MqttRequest::BuildResponse(message_id, "id is not exist");
mqtt_wrapper_->SendRequest(from_id, response); mqtt_wrapper_->SendRequest(from_id, response);
return; return;
} }
else else
{ {
client_lists_.insert(std::pair<std::string, std::string>(from_id, pull_type)); client_lists_.insert(std::pair<std::string, std::string>(from_id, pull_type));
response = MqttRequest::BuildResponse(message_id, "ok"); response = MqttRequest::BuildResponse(message_id, "ok");
printf("\n------------------------------------\n"); printf("\n------------------------------------\n");
std::map<std::string, std::string>::iterator itor; std::map<std::string, std::string>::iterator itor;
for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++) for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++)
{ {
printf("id = %s, pull type = %s\n", itor->first.c_str(), itor->second.c_str()); printf("id = %s, pull type = %s\n", itor->first.c_str(), itor->second.c_str());
} }
printf("------------------------------------\n"); printf("------------------------------------\n");
} }
} }
} }
\ No newline at end of file
#pragma once #pragma once
#include "mqtt_wrapper.h" #include "mqtt_wrapper.h"
#include <map> #include <map>
namespace offcn namespace offcn
{ {
class CmdProcesserServer : public CmdObserver class CmdProcesserServer : public CmdObserver
{ {
public: public:
CmdProcesserServer(); CmdProcesserServer();
~CmdProcesserServer(); ~CmdProcesserServer();
public: public:
bool Connect(); bool Connect();
bool SubscribeTopic(); bool SubscribeTopic();
public: public:
/** /**
* CmdObserver * CmdObserver
*/ */
virtual void OnServerConnectSuccess(); virtual void OnServerConnectSuccess();
virtual void OnServerConnectFailure(); virtual void OnServerConnectFailure();
virtual void OnServerSubscribeSuccess(); virtual void OnServerSubscribeSuccess();
virtual void OnServerSubscribeFailure(); virtual void OnServerSubscribeFailure();
virtual void OnServerMessageArrived(std::string topic, std::string message); virtual void OnServerMessageArrived(std::string topic, std::string message);
private: private:
void OnParseJoinSuccess(std::string message_id, std::string from_id); void OnParseJoinSuccess(std::string from_id);
void OnParseFailure(std::string message_id, std::string from_id, std::string error); void OnParseFailure(std::string message_id, std::string from_id, std::string error);
void OnParseUpdateSuccess(std::string message_id, std::string from_id, std::string pull_type); void OnParseUpdateSuccess(std::string message_id, std::string from_id, std::string pull_type);
private: private:
MqttWrapper *mqtt_wrapper_; MqttWrapper *mqtt_wrapper_;
private: private:
std::map<std::string, std::string> client_lists_; std::map<std::string, std::string> client_lists_;
}; };
} }
\ No newline at end of file
#include "mqtt_wrapper.h" #include "mqtt_wrapper.h"
#include <stdlib.h> #include <stdlib.h>
namespace offcn namespace offcn
{ {
static const std::string kMqttUrl = "101.200.63.143";//"192.168.43.12"; static const std::string kMqttUrl = "test-mqq.offcncloud.com";//"192.168.43.12";
static const std::string kUserName = "admin"; static const std::string kUserName = "admin";
static const std::string kPassWord = "public"; static const std::string kPassWord = "public";
static struct Options static struct Options
{ {
char *connection; char *connection;
int verbose; int verbose;
int test_no; int test_no;
int size; int size;
int MQTTVersion; int MQTTVersion;
int iterations; int iterations;
}options; }options;
MqttWrapper::MqttWrapper(CmdObserver *observer) MqttWrapper::MqttWrapper(CmdObserver *observer)
:mqtt_client_(NULL), :mqtt_client_(NULL),
observer_(observer) observer_(observer)
{ {
} }
bool MqttWrapper::ConnectMqttServer(std::string localID) bool MqttWrapper::ConnectMqttServer(std::string localID)
{ {
options.connection = (char *)malloc(128); options.connection = (char *)malloc(128);
memset(options.connection, 0, 128); memset(options.connection, 0, 128);
memcpy(options.connection, kMqttUrl.c_str(), kMqttUrl.size()); memcpy(options.connection, kMqttUrl.c_str(), kMqttUrl.size());
options.verbose = 0; options.verbose = 0;
options.test_no = -1; options.test_no = -1;
options.size = 10000; options.size = 10000;
options.MQTTVersion = MQTTVERSION_3_1; options.MQTTVersion = MQTTVERSION_3_1;
options.iterations = 1; options.iterations = 1;
MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer; MQTTAsync_willOptions wopts = MQTTAsync_willOptions_initializer;
std::string clientid = localID; std::string clientid = localID;
int nRet = MQTTAsync_create(&mqtt_client_, options.connection, clientid.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL); int nRet = MQTTAsync_create(&mqtt_client_, options.connection, clientid.c_str(), MQTTCLIENT_PERSISTENCE_NONE, NULL);
if (nRet != MQTTASYNC_SUCCESS) if (nRet != MQTTASYNC_SUCCESS)
{ {
return false; return false;
} }
nRet = MQTTAsync_setCallbacks(mqtt_client_, this, NULL, mqttMessageCallback, NULL); nRet = MQTTAsync_setCallbacks(mqtt_client_, this, NULL, mqttMessageCallback, NULL);
std::string new_token = kPassWord; std::string new_token = kPassWord;
opts.keepAliveInterval = 20; opts.keepAliveInterval = 20;
opts.cleansession = 1; opts.cleansession = 1;
opts.username = kUserName.c_str(); opts.username = kUserName.c_str();
opts.password = kPassWord.c_str(); opts.password = kPassWord.c_str();
opts.MQTTVersion = options.MQTTVersion; opts.MQTTVersion = options.MQTTVersion;
opts.cleansession = true; opts.cleansession = true;
opts.will = NULL; opts.will = NULL;
opts.onSuccess = mqttConnectSuccess; opts.onSuccess = mqttConnectSuccess;
opts.onFailure = mqttConnectFailure; opts.onFailure = mqttConnectFailure;
opts.context = this; opts.context = this;
opts.connectTimeout = 5; opts.connectTimeout = 5;
nRet = MQTTAsync_connect(mqtt_client_, &opts); nRet = MQTTAsync_connect(mqtt_client_, &opts);
if (nRet != MQTTASYNC_SUCCESS) if (nRet != MQTTASYNC_SUCCESS)
{ {
MQTTAsync_destroy(&mqtt_client_); MQTTAsync_destroy(&mqtt_client_);
mqtt_client_ = NULL; mqtt_client_ = NULL;
return false; return false;
} }
return true; return true;
} }
bool MqttWrapper::SubscribeTopic(std::string topic) bool MqttWrapper::SubscribeTopic(std::string topic)
{ {
dst_topic_ = topic; dst_topic_ = topic;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer; MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
opts.onSuccess = mqttSubscribeSuccess; opts.onSuccess = mqttSubscribeSuccess;
opts.onFailure = mqttSubscribeFailure; opts.onFailure = mqttSubscribeFailure;
opts.context = this; opts.context = this;
int nRet = MQTTAsync_subscribe(mqtt_client_, topic.c_str(), 2, &opts); int nRet = MQTTAsync_subscribe(mqtt_client_, topic.c_str(), 2, &opts);
if (nRet != MQTTASYNC_SUCCESS) if (nRet != MQTTASYNC_SUCCESS)
{ {
return false; return false;
} }
return true; return true;
} }
int MqttWrapper::SendRequest(std::string &topic, std::string &request) int MqttWrapper::SendRequest(std::string &topic, std::string &request)
{ {
MQTTAsync_message msg = MQTTAsync_message_initializer; MQTTAsync_message msg = MQTTAsync_message_initializer;
msg.payload = (void *)request.c_str(); msg.payload = (void *)request.c_str();
msg.payloadlen = (int)request.length(); msg.payloadlen = (int)request.length();
msg.qos = 2; msg.qos = 2;
msg.retained = 0; msg.retained = 0;
int nRet = MQTTAsync_send(mqtt_client_, topic.c_str(), msg.payloadlen, msg.payload, msg.qos, msg.retained, NULL); int nRet = MQTTAsync_send(mqtt_client_, topic.c_str(), msg.payloadlen, msg.payload, msg.qos, msg.retained, NULL);
if (nRet == MQTTASYNC_SUCCESS) if (nRet == MQTTASYNC_SUCCESS)
{ {
return 0; return 0;
} }
return -1; return -1;
} }
void MqttWrapper::OnMqttConnectSuccess(MQTTAsync_successData *response) void MqttWrapper::OnMqttConnectSuccess(MQTTAsync_successData *response)
{ {
if (observer_) if (observer_)
{ {
observer_->OnServerConnectSuccess(); observer_->OnServerConnectSuccess();
} }
} }
void MqttWrapper::OnMqttConnectFailure(MQTTAsync_failureData *response) void MqttWrapper::OnMqttConnectFailure(MQTTAsync_failureData *response)
{ {
if (observer_) if (observer_)
{ {
observer_->OnServerConnectFailure(); observer_->OnServerConnectFailure();
} }
} }
void MqttWrapper::OnMqttSubscribeSuccess(MQTTAsync_successData *response) void MqttWrapper::OnMqttSubscribeSuccess(MQTTAsync_successData *response)
{ {
if (observer_) if (observer_)
{ {
observer_->OnServerSubscribeSuccess(); observer_->OnServerSubscribeSuccess();
} }
} }
void MqttWrapper::OnMqttSubscribeFailure(MQTTAsync_failureData *response) void MqttWrapper::OnMqttSubscribeFailure(MQTTAsync_failureData *response)
{ {
if (observer_) if (observer_)
{ {
observer_->OnServerSubscribeFailure(); observer_->OnServerSubscribeFailure();
} }
} }
int MqttWrapper::mqttRecvMessage(char *topicName, int topicNameLen, MQTTAsync_message *message) int MqttWrapper::mqttRecvMessage(char *topicName, int topicNameLen, MQTTAsync_message *message)
{ {
if (dst_topic_.compare(topicName) != 0) return 1; if (dst_topic_.compare(topicName) != 0) return 1;
if (message->payloadlen <= 0) return 1; if (message->payloadlen <= 0) return 1;
std::string topic; std::string topic;
topic.append(topicName, topicNameLen); topic.append(topicName, topicNameLen);
std::string msg; std::string msg;
msg.append((char *)message->payload, message->payloadlen); msg.append((char *)message->payload, message->payloadlen);
if (observer_) if (observer_)
{ {
observer_->OnServerMessageArrived(topic, msg); observer_->OnServerMessageArrived(topic, msg);
} }
MQTTAsync_freeMessage(&message); MQTTAsync_freeMessage(&message);
MQTTAsync_free(topicName); MQTTAsync_free(topicName);
return 1; return 1;
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment