Commit ff491e70 by 李维杰

增加请求解析

parent 0da033fc
......@@ -53,5 +53,64 @@ namespace offcn
void CmdProcesserServer::OnServerMessageArrived(std::string topic, std::string message)
{
printf("\nreceive message : topic = %s, context : %s\n", topic.c_str(), message.c_str());
std::string path;
ReqValue values;
if (!MqttRequest::ParseRequest(message, path, values))
{
printf("Error : Parse request error\n");
return;
}
std::string fid = values["fid"];
std::string pull_type;
if (path.compare("join") == 0)
{
pull_type = values["pull_type"];
OnRequstJoin(fid, pull_type);
}
}
void CmdProcesserServer::OnRequstJoin(std::string fid, std::string pull_type)
{
if (client_lists_.find(fid) != client_lists_.end())
{
printf("Waring : fid = %s already exist\n", fid.c_str());
return;
}
std::string dest_id;
std::map<std::string, std::string>::iterator itor;
for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++)
{
if (itor->second.compare("rtmp") == 0)
{
dest_id = itor->first;
break;
}
}
std::string response;
if (itor != client_lists_.end())
{
client_lists_.insert(std::pair<std::string, std::string>(fid, "kcp"));
response = MqttRequest::JoinResponse(dest_id, "success");
}
else
{
client_lists_.insert(std::pair<std::string, std::string>(fid, "rtmp"));
response = MqttRequest::JoinResponse("", "success");
}
mqtt_wrapper_->SendRequest(fid, response);
printf("\n------------------------------------\n");
for (itor = client_lists_.begin(); itor != client_lists_.end(); itor++)
{
printf("id = %s, pull type = %s\n", itor->first.c_str(), itor->second.c_str());
}
printf("------------------------------------\n");
}
}
\ No newline at end of file
......@@ -26,6 +26,9 @@ namespace offcn
virtual void OnServerMessageArrived(std::string topic, std::string message);
private:
void OnRequstJoin(std::string fid, std::string pull_type);
private:
MqttWrapper *mqtt_wrapper_;
private:
......
......@@ -6,7 +6,7 @@ namespace offcn
/*
{
"path": "join",
"rid" : "xxxid",
"fid" : "xxxid",
"data" :
{
"pull_type" : "null"
......@@ -19,7 +19,7 @@ namespace offcn
Json::StreamWriterBuilder wBuilder;
root["path"] = "join";
root["rid"] = local_id;
root["fid"] = local_id;
Json::Value data;
data["pull_type"] = "null";
......@@ -31,7 +31,7 @@ namespace offcn
/*
{
"path": "update",
"rid" : "xxxid",
"fid" : "xxxid",
"data" :
{
"pull_type" : "rtmp" or "null" or "kcp"
......@@ -44,7 +44,7 @@ namespace offcn
Json::StreamWriterBuilder wBuilder;
root["path"] = "update";
root["rid"] = local_id;
root["fid"] = local_id;
Json::Value data;
data["pull_type"] = pull_type;
......@@ -56,7 +56,7 @@ namespace offcn
/*
{
"path": "offer/answer",
"rid" : "xxxid",
"fid" : "xxxid",
"tid" : "remoteid",
"candidate" : ""
}
......@@ -67,7 +67,7 @@ namespace offcn
Json::StreamWriterBuilder wBuilder;
root["path"] = type;
root["rid"] = local_id;
root["fid"] = local_id;
root["tid"] = remote_id;
root["candidate"] = candidate;
......@@ -94,6 +94,26 @@ namespace offcn
if (!GetJsonValue(req, root)) return false;
if (!root["path"] || !root["fid"]) return false;
path = root["path"].asString();
std::string fid = root["fid"].asString();
if (path.compare("join") == 0 ||
path.compare("update") == 0)
{
values["fid"] = fid;
values["pull_type"] = root["pull_type"].asString();
}
else if (path.compare("offer") == 0 ||
path.compare("answer") == 0)
{
values["fid"] = fid;
values["tid"] = root["tid"].asString();
values["candidate"] = root["candidate"].asString();
}
return true;
}
std::string MqttRequest::JoinResponse(std::string ids, std::string result)
......@@ -102,6 +122,7 @@ namespace offcn
Json::StreamWriterBuilder wBuilder;
root["path"] = "join";
root["ids"] = ids;
root["result"] = result;
return Json::writeString(wBuilder, root);
......
......@@ -4,9 +4,9 @@
namespace offcn
{
static const std::string kMqttUrl = "192.168.43.12";
static const std::string kMqttUrl = "101.200.63.143";//"192.168.43.12";
static const std::string kUserName = "admin";
static const std::string kPassWord = "admin";
static const std::string kPassWord = "public";
static struct Options
{
......@@ -59,7 +59,7 @@ namespace offcn
opts.onSuccess = mqttConnectSuccess;
opts.onFailure = mqttConnectFailure;
opts.context = this;
opts.connectTimeout = 10;
opts.connectTimeout = 5;
nRet = MQTTAsync_connect(mqtt_client_, &opts);
if (nRet != MQTTASYNC_SUCCESS)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment