|
|
#include "CacService.h"
|
|
|
#include <json/json.h>
|
|
|
#include <iostream>
|
|
|
|
|
|
#include "Utils.h"
|
|
|
|
|
|
// Keep-alive value handed to mosquitto_connect() in CCacService::Start();
// libmosquitto interprets this argument in seconds.
#define KEEP_ALIVE 60
|
|
|
|
|
|
void CCacService::cac_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
|
|
|
{
|
|
|
((CCacService*)userdata)->MessageCallback(mosq, message);
|
|
|
}
|
|
|
|
|
|
void CCacService::cac_connect_callback(struct mosquitto *mosq, void *userdata, int result)
|
|
|
{
|
|
|
((CCacService*)userdata)->ConnectCallback(mosq, result);
|
|
|
}
|
|
|
|
|
|
void CCacService::cac_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
|
|
|
{
|
|
|
((CCacService*)userdata)->SubscribeCallback(mosq, mid, qos_count, granted_qos);
|
|
|
}
|
|
|
|
|
|
void CCacService::cac_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
|
|
|
{
|
|
|
((CCacService*)userdata)->LogCallback(mosq, level, str);
|
|
|
}
|
|
|
|
|
|
// Creates an idle service: no mosquitto client and no database connection
// exist until Start() is called. m_session starts out true and is later
// passed to mosquitto_new() as its second argument.
CCacService::CCacService()
    : m_session(true)
    , m_mosq(nullptr)
    , m_conn(nullptr)
{
}
|
|
|
|
|
|
// Releases the mosquitto client and the database connection object.
//
// NOTE(review): the worker threads started by Start() are not stopped or
// joined here, and ExportLoop() never returns while it uses m_mosq/m_conn.
// Confirm that the owner guarantees the threads are no longer running before
// the service is destroyed.
CCacService::~CCacService()
{
    if (m_mosq)
    {
        mosquitto_destroy(m_mosq);
        m_mosq = nullptr;
    }

    // `delete` on a null pointer is a no-op, so no explicit check is needed.
    delete m_conn;
}
|
|
|
|
|
|
void CCacService::MessageCallback(struct mosquitto *mosq, const struct mosquitto_message *message)
|
|
|
{
|
|
|
if (message->payloadlen) {
|
|
|
printf("%s %s", message->topic, (char *)(message->payload));
|
|
|
}
|
|
|
else {
|
|
|
printf("%s (null)\n", message->topic);
|
|
|
}
|
|
|
fflush(stdout);
|
|
|
}
|
|
|
|
|
|
// Handles the outcome of a broker connection attempt.
//
// mosq   - client the event belongs to (unused while the subscription below
//          stays disabled).
// result - 0 on success; any other value is reported as a failure on stderr.
void CCacService::ConnectCallback(struct mosquitto *mosq, int result)
{
    (void)mosq;

    if (result != 0)
    {
        fprintf(stderr, "Connect failed\n");
        return;
    }

    /* Subscribe to broker information topics on successful connect. */
    // Currently disabled. NOTE(review): the topic literal in the disabled
    // call looks mis-encoded; restore the intended topic before enabling it.
    // mosquitto_subscribe(mosq, NULL, "Gaiү:", 2);
}
|
|
|
|
|
|
// Prints a subscription acknowledgement as
// "Subscribed (mid: <mid>): <qos0>, <qos1>, ..." on stdout.
//
// mosq        - client the event belongs to (unused).
// mid         - message id of the subscribe request.
// qos_count   - number of entries in granted_qos.
// granted_qos - granted QoS per subscription; may be null/empty, in which
//               case only "Subscribed (mid: <mid>)" is printed.
void CCacService::SubscribeCallback(struct mosquitto *mosq, int mid, int qos_count, const int *granted_qos)
{
    (void)mosq;

    printf("Subscribed (mid: %d)", mid);

    // Never index granted_qos unless there is at least one entry to read.
    if (granted_qos != NULL)
    {
        for (int idx = 0; idx < qos_count; ++idx)
        {
            printf("%s%d", (idx == 0) ? ": " : ", ", granted_qos[idx]);
        }
    }

    printf("\n");
}
|
|
|
|
|
|
// Writes a library log line to stdout, followed by a newline.
//
// mosq  - client the log line belongs to (unused).
// level - log level of the line (unused: nothing is filtered).
// str   - log text to print.
void CCacService::LogCallback(struct mosquitto *mosq, int level, const char *str)
{
    (void)mosq;
    (void)level;

    /* Print all log messages regardless of level. */
    printf("%s\n", str);
}
|
|
|
|
|
|
void CCacService::MosquittoLoop()
|
|
|
{
|
|
|
mosquitto_loop_forever(m_mosq, -1, 1);
|
|
|
int aa = 0;
|
|
|
}
|
|
|
|
|
|
// Thread body for the periodic export. On every pass each entry of
// m_syncTables is handed to CMySQLAdo::ProcessTable(), which fills the
// "AssetList" array of a fresh JSON object; the serialized object is then
// published on the "external/data" topic when an MQTT client exists. The
// thread sleeps 200 ms between passes and never returns on its own.
//
// conn - database connection to read from. When null, the member m_conn is
//        used instead; if both are null the function logs and returns.
void CCacService::ExportLoop(CMySQLAdo* conn)
{
    // Honour the connection passed to the thread (Start() passes m_conn, so
    // this is the same object in the existing call path).
    CMySQLAdo* const db = (conn != NULL) ? conn : m_conn;
    if (db == NULL)
    {
        fprintf(stderr, "ExportLoop: no database connection, export thread exiting\n");
        return;
    }

    while (true)
    {
        for (auto& entry : m_syncTables)
        {
            Json::Value jsonObj(Json::objectValue);
            jsonObj["AssetList"] = Json::arrayValue;

            db->ProcessTable(entry.second, jsonObj["AssetList"]);

            const std::string data = CvtJSONToString(jsonObj);

#ifdef _DEBUG
            std::cout << data << std::endl;
            writeFile("D://testJson.txt", reinterpret_cast<const unsigned char *>(data.c_str()), data.size());
#endif

            // Without a broker client (e.g. a failed connect in a debug
            // build) there is nothing to publish to.
            if (m_mosq == NULL)
            {
                continue;
            }

            int mid = 0;
            const int res = mosquitto_publish(m_mosq,
                                              &mid,
                                              "external/data",
                                              static_cast<int>(data.size()),
                                              static_cast<const void *>(data.c_str()),
                                              SYNC_QOS_LEVEL_0,
                                              false);
            if (res == MOSQ_ERR_SUCCESS)
            {
                continue;
            }

            // Report publish failures instead of silently discarding them.
            if (res == MOSQ_ERR_ERRNO)
            {
                // Capture errno right away, before any other call can change it.
                const int savedErrno = errno;
                fprintf(stderr, "Publish for sync entry '%s' failed: %s (errno %d)\n",
                        entry.first.c_str(), strerror(savedErrno), savedErrno);
            }
            else
            {
                fprintf(stderr, "Publish for sync entry '%s' failed (mosquitto error %d)\n",
                        entry.first.c_str(), res);
            }
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}
|
|
|
|
|
|
|
|
|
bool CCacService::Start(const std::map<std::string, std::string>& arguments)
|
|
|
{
|
|
|
//create mosquitto client
|
|
|
m_mosq = mosquitto_new(NULL, m_session, NULL);
|
|
|
if (!m_mosq)
|
|
|
{
|
|
|
// printf("create client failed..\n");
|
|
|
// mosquitto_lib_cleanup();
|
|
|
return false;
|
|
|
}
|
|
|
// Callback
|
|
|
mosquitto_user_data_set(m_mosq, (void *)this);
|
|
|
// mosquitto_log_callback_set(mosq, cac_log_callback);
|
|
|
mosquitto_connect_callback_set(m_mosq, cac_connect_callback);
|
|
|
mosquitto_message_callback_set(m_mosq, cac_message_callback);
|
|
|
// mosquitto_subscribe_callback_set(mosq, cac_subscribe_callback);
|
|
|
// connect to server
|
|
|
std::string mqttServer = FindArgument(arguments, ARG_KEY_MQTT_HOST);
|
|
|
std::string mqttPort = FindArgument(arguments, ARG_KEY_MQTT_PORT);
|
|
|
|
|
|
std::string mqttUser = FindArgument(arguments, ARG_KEY_MQTT_USER);
|
|
|
std::string mqttPwd = FindArgument(arguments, ARG_KEY_MQTT_PASSWORD);
|
|
|
|
|
|
mqttPwd = "AliOS%1688";
|
|
|
|
|
|
std::string dbServer = FindArgument(arguments, ARG_KEY_DB_HOST);
|
|
|
unsigned short dbPort = 3306; // arguments["mqttport"];
|
|
|
std::string dbName = FindArgument(arguments, ARG_KEY_DB_NAME);
|
|
|
// std::string dbName = FindArgument(arguments, ARG_KEY_DB_NAME);
|
|
|
std::string dbUser = FindArgument(arguments, ARG_KEY_DB_USER);
|
|
|
std::string dbPwd = FindArgument(arguments, ARG_KEY_DB_PASSWORD);
|
|
|
|
|
|
mosquitto_username_pw_set(m_mosq, mqttUser.c_str(), mqttPwd.c_str());
|
|
|
|
|
|
if (mosquitto_connect(m_mosq, mqttServer.c_str(), atoi(mqttPort.c_str()), KEEP_ALIVE))
|
|
|
{
|
|
|
mosquitto_destroy(m_mosq);
|
|
|
m_mosq = NULL;
|
|
|
fprintf(stderr, "Unable to connect.\n");
|
|
|
#ifndef _DEBUG
|
|
|
return false;
|
|
|
#endif
|
|
|
}
|
|
|
|
|
|
m_conn = new CMySQLAdo();
|
|
|
if (!m_conn->Connect(dbServer, dbUser, dbPwd, dbName))
|
|
|
{
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
m_conn->loadSyncTables(SYNC_CLIENT_CAC_MQTT, m_syncTables);
|
|
|
|
|
|
//ѭ<><D1AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ
|
|
|
// mosquitto_loop_forever(mosq, -1, 1);
|
|
|
if (m_mosq != NULL)
|
|
|
{
|
|
|
std::thread th1(std::mem_fun(&CCacService::MosquittoLoop), this);
|
|
|
m_mosqThread.swap(th1);
|
|
|
}
|
|
|
|
|
|
std::thread th2(std::mem_fun(&CCacService::ExportLoop), this, m_conn);
|
|
|
m_exportThread.swap(th2);
|
|
|
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
void CCacService::Join()
|
|
|
{
|
|
|
if (m_mosqThread.joinable()) m_mosqThread.join();
|
|
|
if (m_exportThread.joinable()) m_exportThread.join();
|
|
|
}
|
|
|
|
|
|
// unsigned int CCacService::ProcessTable(const std::string& tableName, const CSyncRecord& syncRecord, const CSyncTableFields& syncTable)
|
|
|
// {
|
|
|
// return 0;
|
|
|
// }
|