You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

213 lines
5.4 KiB
C++

9 months ago
#include "CacService.h"
#include <json/json.h>
#include <iostream>
#include "Utils.h"
#define KEEP_ALIVE 60
void CCacService::cac_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
((CCacService*)userdata)->MessageCallback(mosq, message);
}
void CCacService::cac_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
((CCacService*)userdata)->ConnectCallback(mosq, result);
}
void CCacService::cac_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
((CCacService*)userdata)->SubscribeCallback(mosq, mid, qos_count, granted_qos);
}
void CCacService::cac_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
((CCacService*)userdata)->LogCallback(mosq, level, str);
}
/// Constructs an idle service: the session flag is set to true and there is
/// no MQTT client or database connection until Start() creates them.
CCacService::CCacService()
    : m_session(true)
    , m_mosq(nullptr)
    , m_conn(nullptr)
{
}
/// Releases the mosquitto client (if one was created) and the database
/// connection object. NOTE(review): the worker threads are not joined here;
/// callers presumably call Join() first - confirm.
CCacService::~CCacService()
{
    if (m_mosq)
    {
        mosquitto_destroy(m_mosq);
        m_mosq = nullptr;
    }

    // Deleting a null pointer is a no-op, so no explicit guard is required.
    delete m_conn;
}
/**
 * Prints a received MQTT message to stdout as "<topic> <payload>".
 *
 * @param mosq     Client instance the message arrived on (unused).
 * @param message  The received message; its topic and payload are printed.
 *
 * The payload is a length-delimited byte buffer and is not guaranteed to be
 * NUL-terminated, so the print is bounded by `payloadlen` rather than relying
 * on "%s" finding a terminator.
 */
void CCacService::MessageCallback(struct mosquitto *mosq, const struct mosquitto_message *message)
{
    (void)mosq;

    if (message->payloadlen > 0 && message->payload != NULL)
    {
        // "%.*s" stops after payloadlen bytes, preventing an out-of-bounds read.
        printf("%s %.*s\n", message->topic, message->payloadlen, static_cast<const char *>(message->payload));
    }
    else
    {
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}
/**
 * Handles the outcome of a connection attempt to the MQTT broker.
 *
 * @param mosq    Client instance that connected (unused).
 * @param result  0 on success; any non-zero value indicates failure.
 *
 * On success nothing is done: no topics are subscribed (a subscribe call that
 * used to live here was already disabled). On failure a diagnostic is written
 * to stderr.
 */
void CCacService::ConnectCallback(struct mosquitto *mosq, int result)
{
    (void)mosq;

    if (result != 0)
    {
        fprintf(stderr, "Connect failed\n");
    }
}
/**
 * Prints the result of a subscription request to stdout.
 *
 * @param mosq         Client instance (unused).
 * @param mid          Message id of the subscribe request.
 * @param qos_count    Number of entries in `granted_qos`.
 * @param granted_qos  Granted QoS values; the first entry is always read.
 */
void CCacService::SubscribeCallback(struct mosquitto *mosq, int mid, int qos_count, const int *granted_qos)
{
    // First granted QoS is printed together with the message id ...
    printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);

    // ... and the remaining ones are appended, comma-separated.
    for (int idx = 1; idx < qos_count; ++idx)
    {
        printf(", %d", granted_qos[idx]);
    }

    printf("\n");
}
/**
 * Writes a log line to stdout.
 *
 * @param mosq   Client instance (unused).
 * @param level  Severity of the message; ignored, every message is printed.
 * @param str    Log text to print.
 */
void CCacService::LogCallback(struct mosquitto *mosq, int level, const char *str)
{
    // Print all log messages regardless of level.
    printf("%s\n", str);
}
void CCacService::MosquittoLoop()
{
mosquitto_loop_forever(m_mosq, -1, 1);
int aa = 0;
}
/**
 * Thread body for the export worker.
 *
 * Repeatedly walks every entry of m_syncTables, lets the database connection
 * fill a JSON "AssetList" array for that table, serializes the document and
 * publishes it on the "external/data" topic. Sleeps 200 ms between passes.
 * The loop has no exit condition.
 *
 * @param conn  Connection handed over by Start(). NOTE(review): the loop reads
 *              through m_conn rather than this parameter; Start() passes
 *              m_conn, so both refer to the same object on that path.
 */
void CCacService::ExportLoop(CMySQLAdo* conn)
{
    (void)conn;

    while (true)
    {
        for (auto& entry : m_syncTables)
        {
            Json::Value root = Json::objectValue;
            root["AssetList"] = Json::arrayValue;
            m_conn->ProcessTable(entry.second, root["AssetList"]);

            const std::string data = CvtJSONToString(root);
#ifdef _DEBUG
            std::cout << data << std::endl;
            writeFile("D://testJson.txt", reinterpret_cast<const unsigned char *>(data.c_str()), data.size());
#endif
            // Without a client (e.g. broker unreachable in a debug build) there is nothing to publish to.
            if (m_mosq == NULL)
            {
                continue;
            }

            int mid = 0;
            const int res = mosquitto_publish(m_mosq, &mid, "external/data",
                                              static_cast<int>(data.size()),
                                              static_cast<const void *>(data.c_str()),
                                              SYNC_QOS_LEVEL_0, false);
            // Capture errno right away, before any other call can overwrite it.
            const int savedErrno = errno;

            if (res == MOSQ_ERR_SUCCESS)
            {
                continue;
            }

            if (res == MOSQ_ERR_ERRNO)
            {
                fprintf(stderr, "Publish to external/data failed: %s (errno %d)\n", strerror(savedErrno), savedErrno);
            }
            else
            {
                fprintf(stderr, "Publish to external/data failed with error %d\n", res);
            }
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}
bool CCacService::Start(const std::map<std::string, std::string>& arguments)
{
//create mosquitto client
m_mosq = mosquitto_new(NULL, m_session, NULL);
if (!m_mosq)
{
// printf("create client failed..\n");
// mosquitto_lib_cleanup();
return false;
}
// Callback
mosquitto_user_data_set(m_mosq, (void *)this);
// mosquitto_log_callback_set(mosq, cac_log_callback);
mosquitto_connect_callback_set(m_mosq, cac_connect_callback);
mosquitto_message_callback_set(m_mosq, cac_message_callback);
// mosquitto_subscribe_callback_set(mosq, cac_subscribe_callback);
// connect to server
std::string mqttServer = FindArgument(arguments, ARG_KEY_MQTT_HOST);
std::string mqttPort = FindArgument(arguments, ARG_KEY_MQTT_PORT);
std::string mqttUser = FindArgument(arguments, ARG_KEY_MQTT_USER);
std::string mqttPwd = FindArgument(arguments, ARG_KEY_MQTT_PASSWORD);
mqttPwd = "AliOS%1688";
std::string dbServer = FindArgument(arguments, ARG_KEY_DB_HOST);
unsigned short dbPort = 3306; // arguments["mqttport"];
std::string dbName = FindArgument(arguments, ARG_KEY_DB_NAME);
// std::string dbName = FindArgument(arguments, ARG_KEY_DB_NAME);
std::string dbUser = FindArgument(arguments, ARG_KEY_DB_USER);
std::string dbPwd = FindArgument(arguments, ARG_KEY_DB_PASSWORD);
mosquitto_username_pw_set(m_mosq, mqttUser.c_str(), mqttPwd.c_str());
if (mosquitto_connect(m_mosq, mqttServer.c_str(), atoi(mqttPort.c_str()), KEEP_ALIVE))
{
mosquitto_destroy(m_mosq);
m_mosq = NULL;
fprintf(stderr, "Unable to connect.\n");
#ifndef _DEBUG
return false;
#endif
}
m_conn = new CMySQLAdo();
if (!m_conn->Connect(dbServer, dbUser, dbPwd, dbName))
{
return false;
}
m_conn->loadSyncTables(SYNC_CLIENT_CAC_MQTT, m_syncTables);
//ѭ<><D1AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ
// mosquitto_loop_forever(mosq, -1, 1);
if (m_mosq != NULL)
{
std::thread th1(std::mem_fun(&CCacService::MosquittoLoop), this);
m_mosqThread.swap(th1);
}
std::thread th2(std::mem_fun(&CCacService::ExportLoop), this, m_conn);
m_exportThread.swap(th2);
return true;
}
/// Blocks until the worker threads finish: first the MQTT loop thread, then
/// the export thread. Threads that were never started are skipped.
void CCacService::Join()
{
    if (m_mosqThread.joinable())
    {
        m_mosqThread.join();
    }

    if (m_exportThread.joinable())
    {
        m_exportThread.join();
    }
}
// unsigned int CCacService::ProcessTable(const std::string& tableName, const CSyncRecord& syncRecord, const CSyncTableFields& syncTable)
// {
// return 0;
// }