Commit 40cf7643 by 李维杰

重构请求相关逻辑

parent f134760d
...@@ -54,63 +54,93 @@ namespace offcn ...@@ -54,63 +54,93 @@ namespace offcn
{ {
printf("\nreceive message : topic = %s, context : %s\n", topic.c_str(), message.c_str()); printf("\nreceive message : topic = %s, context : %s\n", topic.c_str(), message.c_str());
std::string path; std::string method;
std::string error;
ReqValue values; ReqValue values;
if (!MqttRequest::ParseRequest(message, path, values)) if (!MqttRequest::ParseRequestType(message, method, error))
{ {
printf("Error : Parse request error\n"); printf("Error : Parse request error\n");
return; return;
} }
std::string fid = values["fid"]; ReqValue valuse;
std::string pull_type;
if (path.compare("join") == 0) if (method.compare("join") == 0)
{
if (MqttRequest::ParseJoinRequest(message, valuse, error))
{
OnParseJoinSuccess(values["message_id"], valuse["from_id"]);
}
else
{ {
pull_type = values["pull_type"]; OnParseFailure(values["message_id"], valuse["from_id"], error);
OnRequstJoin(fid, pull_type); }
}
else if (method.compare("update") == 0)
{
if (MqttRequest::ParseUpdateRequest(message, valuse, error))
{
OnParseUpdateSuccess(valuse["message_id"], values["from_id"], values["pull_type"]);
}
else
{
OnParseFailure(valuse["message_id"], values["from_id"], error);
}
} }
} }
void CmdProcesserServer::OnRequstJoin(std::string fid, std::string pull_type) void CmdProcesserServer::OnParseJoinSuccess(std::string message_id, std::string from_id)
{ {
if (client_lists_.find(fid) != client_lists_.end()) std::string response;
if (client_lists_.find(from_id) != client_lists_.end())
{ {
printf("Waring : fid = %s already exist\n", fid.c_str()); response = MqttRequest::BuildResponse(message_id, "id exist");
mqtt_wrapper_->SendRequest(from_id, response);
return; return;
} }
else
{
client_lists_.insert(std::pair<std::string, std::string>(from_id, "null"));
response = MqttRequest::BuildResponse(message_id, "ok");
std::string dest_id; printf("\n------------------------------------\n");
std::map<std::string, std::string>::iterator itor; std::map<std::string, std::string>::iterator itor;
for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++) for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++)
{ {
if (itor->second.compare("rtmp") == 0) printf("id = %s, pull type = %s\n", itor->first.c_str(), itor->second.c_str());
{
dest_id = itor->first;
break;
} }
printf("------------------------------------\n");
}
}
void CmdProcesserServer::OnParseFailure(std::string message_id, std::string from_id, std::string error)
{
std::string response = MqttRequest::BuildResponse(message_id, error);
mqtt_wrapper_->SendRequest(from_id, response);
} }
void CmdProcesserServer::OnParseUpdateSuccess(std::string message_id, std::string from_id, std::string pull_type)
{
std::string response; std::string response;
if (client_lists_.find(from_id) == client_lists_.end())
if (itor != client_lists_.end())
{ {
client_lists_.insert(std::pair<std::string, std::string>(fid, "kcp")); response = MqttRequest::BuildResponse(message_id, "id is not exist");
response = MqttRequest::JoinResponse(dest_id, "success"); mqtt_wrapper_->SendRequest(from_id, response);
return;
} }
else else
{ {
client_lists_.insert(std::pair<std::string, std::string>(fid, "rtmp")); client_lists_.insert(std::pair<std::string, std::string>(from_id, pull_type));
response = MqttRequest::JoinResponse("", "success"); response = MqttRequest::BuildResponse(message_id, "ok");
}
mqtt_wrapper_->SendRequest(fid, response);
printf("\n------------------------------------\n"); printf("\n------------------------------------\n");
std::map<std::string, std::string>::iterator itor;
for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++) for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++)
{ {
printf("id = %s, pull type = %s\n", itor->first.c_str(), itor->second.c_str()); printf("id = %s, pull type = %s\n", itor->first.c_str(), itor->second.c_str());
} }
printf("------------------------------------\n"); printf("------------------------------------\n");
} }
}
} }
\ No newline at end of file
...@@ -26,7 +26,10 @@ namespace offcn ...@@ -26,7 +26,10 @@ namespace offcn
virtual void OnServerMessageArrived(std::string topic, std::string message); virtual void OnServerMessageArrived(std::string topic, std::string message);
private: private:
void OnRequstJoin(std::string fid, std::string pull_type); void OnParseJoinSuccess(std::string message_id, std::string from_id);
void OnParseFailure(std::string message_id, std::string from_id, std::string error);
void OnParseUpdateSuccess(std::string message_id, std::string from_id, std::string pull_type);
private: private:
MqttWrapper *mqtt_wrapper_; MqttWrapper *mqtt_wrapper_;
......
#include "cmd_processer_client.h" #include "cmd_processer_client.h"
#include "mqtt_request.h" #include "mqtt_request.h"
#include <time.h>
namespace offcn namespace offcn
{ {
static const std::string kGlobalTopic = "kcp_server_mqtt"; static const std::string kGlobalTopic = "kcp_server_mqtt";
static const std::string kLocalID = "mqtt_client"; static const std::string kLocalID = "mqtt_client";
static const std::string kDstTopic = kLocalID; static const std::string kDstTopic = kLocalID;
static std::string clientMessageID()
{
char str[12] = { 0 };
int len = 12;
srand((unsigned int)time(NULL));
int i;
for (i = 0; i < len; ++i)
{
switch ((rand() % 3))
{
case 1:
str[i] = 'A' + rand() % 26;
break;
case 2:
str[i] = 'a' + rand() % 26;
break;
default:
str[i] = '0' + rand() % 10;
break;
}
}
str[++i] = '\0';
return str;
}
CmdProcesserClient::CmdProcesserClient() CmdProcesserClient::CmdProcesserClient()
:mqtt_wrapper_(NULL) :mqtt_wrapper_(NULL),
observer_(NULL)
{ {
local_id_ = kLocalID; local_id_ = kLocalID;
...@@ -23,9 +52,10 @@ namespace offcn ...@@ -23,9 +52,10 @@ namespace offcn
} }
} }
void CmdProcesserClient::SetLocalID(std::string id) void CmdProcesserClient::SetLocalID(std::string id, CmdProcesserObserver *observer)
{ {
local_id_ = id; local_id_ = id;
observer_ = observer;
} }
bool CmdProcesserClient::Connect() bool CmdProcesserClient::Connect()
{ {
...@@ -37,7 +67,16 @@ namespace offcn ...@@ -37,7 +67,16 @@ namespace offcn
} }
bool CmdProcesserClient::SendLocalCandidate(std::string tid, std::string type, std::string candidate) bool CmdProcesserClient::SendLocalCandidate(std::string tid, std::string type, std::string candidate)
{ {
std::string req = MqttRequest::Candidate(local_id_, tid, type, candidate); std::string req;
if (type.compare("offer") == 0)
{
req = MqttRequest::OfferRequest(clientMessageID(), local_id_, tid, candidate);
}
else if (type.compare("answer") == 0)
{
req = MqttRequest::AnswerRequest(clientMessageID(), local_id_, tid, candidate);
}
return mqtt_wrapper_->SendRequest(tid, req); return mqtt_wrapper_->SendRequest(tid, req);
} }
...@@ -58,7 +97,7 @@ namespace offcn ...@@ -58,7 +97,7 @@ namespace offcn
printf("subscribe topic success!\n"); printf("subscribe topic success!\n");
std::string topic = kGlobalTopic; std::string topic = kGlobalTopic;
std::string req = MqttRequest::JoinRequest(local_id_);; std::string req = MqttRequest::JoinRequest(clientMessageID(), local_id_);
mqtt_wrapper_->SendRequest(topic, req); mqtt_wrapper_->SendRequest(topic, req);
} }
...@@ -68,22 +107,71 @@ namespace offcn ...@@ -68,22 +107,71 @@ namespace offcn
} }
void CmdProcesserClient::OnServerMessageArrived(std::string topic, std::string message) void CmdProcesserClient::OnServerMessageArrived(std::string topic, std::string message)
{ {
std::string path; std::string method;
std::string error;
ReqValue values; ReqValue values;
if (!MqttRequest::ParseResponse(message, path, values)) if (!MqttRequest::ParseRequestType(message, method, error))
{ {
printf("Error : Parse request error\n"); printf("Error : Parse request error\n");
return; return;
} }
if (path.compare("join") == 0) if (method.compare("publishers") == 0)
{
if (MqttRequest::ParsePublishersRequest(message, values, error))
{
OnParsePublishersSuccess(values["message_id"], values["publishers_id"]);
}
else
{ {
std::string ids = values["ids"]; OnParseFailure(values["message_id"], values["publishers_id"], error);
std::string result = values["result"]; }
if (ids.length() > 0) }
else if (method.compare("offer") == 0)
{ {
publisher_list_.insert(std::pair<std::string, std::string>(ids, ids)); if (MqttRequest::ParseOfferRequest(message, values, error))
{
OnParseOfferOrAnswerSuccess(values["message_id"], "offer", values["from_id"], values["candidate"]);
} }
else
{
OnParseFailure(values["message_id"], values["from_id"], error);
}
}
else if (method.compare("answer") == 0)
{
if (MqttRequest::ParseAnswerRequest(message, values, error))
{
OnParseOfferOrAnswerSuccess(values["message_id"], "answer", values["from_id"], values["candidate"]);
}
else
{
OnParseFailure(values["message_id"], values["from_id"], error);
}
}
}
void CmdProcesserClient::OnParsePublishersSuccess(std::string message_id, std::string publishers_id)
{
publisher_list_.insert(std::pair<std::string, std::string>(publishers_id, publishers_id));
std::string req = MqttRequest::BuildResponse(message_id, "ok");
std::string topic = kGlobalTopic;
mqtt_wrapper_->SendRequest(topic, req);
}
void CmdProcesserClient::OnParseFailure(std::string message_id, std::string from_id, std::string error)
{
std::string response = MqttRequest::BuildResponse(message_id, error);
mqtt_wrapper_->SendRequest(from_id, response);
}
void CmdProcesserClient::OnParseOfferOrAnswerSuccess(std::string message_id, std::string type, std::string to_id, std::string candidate)
{
std::string req = MqttRequest::BuildResponse(message_id, "ok");
mqtt_wrapper_->SendRequest(to_id, req);
if (observer_)
{
observer_->OnReceiveRemoteCandidate(to_id, type, candidate);
} }
} }
} }
\ No newline at end of file
...@@ -5,6 +5,12 @@ ...@@ -5,6 +5,12 @@
namespace offcn namespace offcn
{ {
class CmdProcesserObserver
{
public:
virtual void OnReceiveRemoteCandidate(std::string &remote_id, std::string &type, std::string &candidate) = 0;
};
class CmdProcesserClient : public CmdObserver class CmdProcesserClient : public CmdObserver
{ {
public: public:
...@@ -12,7 +18,7 @@ namespace offcn ...@@ -12,7 +18,7 @@ namespace offcn
~CmdProcesserClient(); ~CmdProcesserClient();
public: public:
void SetLocalID(std::string id); void SetLocalID(std::string id, CmdProcesserObserver *observer);
bool Connect(); bool Connect();
bool SubscribeTopic(); bool SubscribeTopic();
/** /**
...@@ -30,6 +36,12 @@ namespace offcn ...@@ -30,6 +36,12 @@ namespace offcn
virtual void OnServerSubscribeFailure(); virtual void OnServerSubscribeFailure();
virtual void OnServerMessageArrived(std::string topic, std::string message); virtual void OnServerMessageArrived(std::string topic, std::string message);
public:
void OnParsePublishersSuccess(std::string message_id, std::string publishers_id);
void OnParseFailure(std::string message_id, std::string from_id, std::string error);
void OnParseOfferOrAnswerSuccess(std::string message_id, std::string type, std::string to_id, std::string candidate);
private: private:
MqttWrapper *mqtt_wrapper_; MqttWrapper *mqtt_wrapper_;
private: private:
...@@ -37,5 +49,7 @@ namespace offcn ...@@ -37,5 +49,7 @@ namespace offcn
std::map<std::string, std::string> subscriber_list_; std::map<std::string, std::string> subscriber_list_;
private: private:
std::string local_id_; std::string local_id_;
private:
CmdProcesserObserver *observer_;
}; };
} }
\ No newline at end of file
...@@ -3,154 +3,208 @@ ...@@ -3,154 +3,208 @@
namespace offcn namespace offcn
{ {
/* static bool GetJsonValue(std::string &response, Json::Value &root)
{ {
"path": "join", Json::CharReaderBuilder readerBuilder;
"fid" : "xxxid", std::unique_ptr<Json::CharReader> const reader(readerBuilder.newCharReader());
"data" : const char *start_pos = response.c_str();
std::string err;
if (!reader->parse(start_pos, start_pos + response.length(), &root, &err))
{ {
"pull_type" : "null" return false;
} }
return true;
} }
*/
std::string MqttRequest::JoinRequest(std::string local_id) std::string MqttRequest::JoinRequest(std::string message_id, std::string from_id)
{ {
Json::Value root; Json::Value root;
Json::StreamWriterBuilder wBuilder; Json::StreamWriterBuilder wBuilder;
root["path"] = "join"; root["message_id"] = message_id;
root["fid"] = local_id; root["method"] = "join";
root["from_id"] = from_id;
Json::Value data;
data["pull_type"] = "null";
root["data"] = data;
return Json::writeString(wBuilder, root); return Json::writeString(wBuilder, root);
} }
/* std::string MqttRequest::UpdateRequest(std::string message_id, std::string from_id, std::string pull_type)
{
"path": "update",
"fid" : "xxxid",
"data" :
{
"pull_type" : "rtmp" or "null" or "kcp"
}
}
*/
std::string MqttRequest::UpdateRequest(std::string local_id, std::string pull_type)
{ {
Json::Value root; Json::Value root;
Json::StreamWriterBuilder wBuilder; Json::StreamWriterBuilder wBuilder;
root["path"] = "update"; root["message_id"] = message_id;
root["fid"] = local_id; root["method"] = "update";
root["from_id"] = from_id;
root["pull_type"] = pull_type;
Json::Value data; return Json::writeString(wBuilder, root);
data["pull_type"] = pull_type; }
std::string MqttRequest::OfferRequest(std::string message_id, std::string from_id, std::string to_id, std::string candidate)
{
Json::Value root;
Json::StreamWriterBuilder wBuilder;
root["data"] = data; root["message_id"] = message_id;
root["method"] = "offer";
root["from_id"] = from_id;
root["to_id"] = to_id;
root["candidate"] = candidate;
return Json::writeString(wBuilder, root); return Json::writeString(wBuilder, root);
} }
/* std::string MqttRequest::AnswerRequest(std::string message_id, std::string from_id, std::string to_id, std::string candidate)
{ {
"path": "offer/answer", Json::Value root;
"fid" : "xxxid", Json::StreamWriterBuilder wBuilder;
"tid" : "remoteid",
"candidate" : "" root["message_id"] = message_id;
root["method"] = "answer";
root["from_id"] = from_id;
root["to_id"] = to_id;
root["candidate"] = candidate;
return Json::writeString(wBuilder, root);
} }
*/ std::string MqttRequest::PublishersRequest(std::string message_id, std::string to_id, std::string publisher_id)
std::string MqttRequest::Candidate(std::string local_id, std::string remote_id, std::string type, std::string candidate)
{ {
Json::Value root; Json::Value root;
Json::StreamWriterBuilder wBuilder; Json::StreamWriterBuilder wBuilder;
root["path"] = type; root["message_id"] = message_id;
root["fid"] = local_id; root["method"] = "publishers";
root["tid"] = remote_id; root["publishers_id"] = publisher_id;
root["candidate"] = candidate; root["to_id"] = to_id;
return Json::writeString(wBuilder, root); return Json::writeString(wBuilder, root);
} }
static bool GetJsonValue(std::string &response, Json::Value &root) bool MqttRequest::ParseRequestType(std::string request, std::string &type, std::string &error)
{ {
Json::CharReaderBuilder readerBuilder; Json::Value root;
std::unique_ptr<Json::CharReader> const reader(readerBuilder.newCharReader()); if (!GetJsonValue(request, root))
const char *start_pos = response.c_str();
std::string err;
if (!reader->parse(start_pos, start_pos + response.length(), &root, &err))
{ {
error = "json format error";
return false;
}
if (!root["method"])
{
error = "param error";
return false; return false;
} }
return true; return true;
} }
bool MqttRequest::ParseRequest(std::string &req, std::string &path, ReqValue &values) bool MqttRequest::ParseJoinRequest(std::string request, ReqValue &values, std::string &error)
{ {
Json::Value root; Json::Value root;
if (!GetJsonValue(request, root))
{
error = "json format error";
goto Error;
}
if (!GetJsonValue(req, root)) return false; if (!root["message_id"] || !root["method"] || !root["from_id"])
{
error = "param error";
goto Error;
}
values["message_id"] = root["message_id"].asString();
values["method"] = root["method"].asString();
values["from_id"] = root["from_id"].asString();
if (!root["path"] || !root["fid"]) return false; return true;
path = root["path"].asString(); Error:
std::string fid = root["fid"].asString(); values["message_id"] = "";
values["method"] = "";
values["from_id"] = "";
if (path.compare("join") == 0 || return false;
path.compare("update") == 0) }
bool MqttRequest::ParseUpdateRequest(std::string request, ReqValue &values, std::string &error)
{ {
values["fid"] = fid; Json::Value root;
values["pull_type"] = root["pull_type"].asString(); if (!GetJsonValue(request, root))
{
error = "json format error";
return false;
} }
else if (path.compare("offer") == 0 ||
path.compare("answer") == 0) if (!root["message_id"] || !root["method"] || !root["from_id"] || !root["pull_type"])
{ {
values["fid"] = fid; error = "param error";
values["tid"] = root["tid"].asString(); return false;
values["candidate"] = root["candidate"].asString();
} }
values["message_id"] = root["message_id"].asString();
values["method"] = root["method"].asString();
values["from_id"] = root["from_id"].asString();
values["pull_type"] = root["pull_type"].asString();
return true; return true;
} }
bool MqttRequest::ParseResponse(std::string &response, std::string &path, ReqValue &values)
bool MqttRequest::ParseOfferRequest(std::string request, ReqValue &values, std::string &error)
{ {
Json::Value root; Json::Value root;
if (!GetJsonValue(request, root))
{
error = "json format error";
return false;
}
if (!GetJsonValue(response, root)) return false; if (!root["message_id"] || !root["method"] || !root["from_id"] || !root["to_id"] || !root["candidate"])
if (!root["path"] || !root["fid"]) return false;
path = root["path"].asString();
std::string fid = root["fid"].asString();
if (path.compare("join") == 0)
{ {
values["ids"] = root["ids"].asString(); error = "param error";
values["result"] = root["result"].asString(); return false;
} }
values["message_id"] = root["message_id"].asString();
values["method"] = root["method"].asString();
values["from_id"] = root["from_id"].asString();
values["to_id"] = root["from_id"].asString();
values["candidate"] = root["candidate"].asString();
return true; return true;
} }
bool MqttRequest::ParseAnswerRequest(std::string request, ReqValue &values, std::string &error)
{
return ParseOfferRequest(request, values, error);
}
std::string MqttRequest::JoinResponse(std::string ids, std::string result) bool MqttRequest::ParsePublishersRequest(std::string request, ReqValue &values, std::string &error)
{ {
Json::Value root; Json::Value root;
Json::StreamWriterBuilder wBuilder; if (!GetJsonValue(request, root))
{
error = "json format error";
return false;
}
root["path"] = "join"; if (!root["message_id"] || !root["method"] || !root["publishers"] || !root["to_id"])
root["ids"] = ids; {
root["result"] = result; error = "param error";
return false;
}
return Json::writeString(wBuilder, root); values["message_id"] = root["message_id"].asString();
values["method"] = root["method"].asString();
values["publishers_id"] = root["publishers_id"].asString();
values["to_id"] = root["to_id"].asString();
return true;
} }
std::string MqttRequest::UpdateResponse(std::string result) std::string MqttRequest::BuildResponse(std::string message_id, std::string result_code)
{ {
Json::Value root; Json::Value root;
Json::StreamWriterBuilder wBuilder; Json::StreamWriterBuilder wBuilder;
root["path"] = "update"; root["message_id"] = message_id;
root["result"] = result; root["result_code"] = result_code;
return Json::writeString(wBuilder, root); return Json::writeString(wBuilder, root);
} }
......
...@@ -3,6 +3,38 @@ ...@@ -3,6 +3,38 @@
#include <string> #include <string>
#include <map> #include <map>
/*
Request format
{
"message_id":"xxxx",
"method":"join",
"from_id":"xxxx",
"to_id":"xxxx"
"data":
{}
}
Response format
{
"message_id":"xxxx",
"result_code":"ok" or
"result_code":"no auth"
"data":
{}
}
client to server:
join
update
client to client:
offer/answer
server to client:
publishers
*/
namespace offcn namespace offcn
{ {
typedef std::map<std::string, std::string> ReqValue; typedef std::map<std::string, std::string> ReqValue;
...@@ -10,14 +42,44 @@ namespace offcn ...@@ -10,14 +42,44 @@ namespace offcn
class MqttRequest class MqttRequest
{ {
public: public:
static std::string JoinRequest(std::string local_id); /**
static std::string UpdateRequest(std::string local_id, std::string pull_type); * client to server
static std::string Candidate(std::string local_id, std::string remote_id, std::string type, std::string candidate); */
static std::string JoinRequest(std::string message_id, std::string from_id);
static std::string UpdateRequest(std::string message_id, std::string from_id, std::string pull_type);
/**
* client to client
*/
static std::string OfferRequest(std::string message_id, std::string from_id, std::string to_id, std::string candidate);
static std::string AnswerRequest(std::string message_id, std::string from_id, std::string to_id, std::string candidate);
/**
* server to client
*/
static std::string PublishersRequest(std::string message_id, std::string to_id, std::string publisher_id);
/**
* public
*/
static bool ParseRequestType(std::string request, std::string &type, std::string &error);
static bool ParseRequest(std::string &req, std::string &path, ReqValue &values); /**
static bool ParseResponse(std::string &response, std::string &path, ReqValue &values); * client to server
*/
static bool ParseJoinRequest(std::string request, ReqValue &values, std::string &error);
static bool ParseUpdateRequest(std::string request, ReqValue &values, std::string &error);
/**
* client to client
*/
static bool ParseOfferRequest(std::string request, ReqValue &values, std::string &error);
static bool ParseAnswerRequest(std::string request, ReqValue &values, std::string &error);
/**
* server to client
*/
static bool ParsePublishersRequest(std::string request, ReqValue &values, std::string &error);
static std::string JoinResponse(std::string ids, std::string result); /**
static std::string UpdateResponse(std::string result); * build response
*/
static std::string BuildResponse(std::string message_id, std::string result_code);
}; };
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment