You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

213 lines
5.4 KiB
C++

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "CacService.h"
#include <json/json.h>
#include <iostream>
#include "Utils.h"
#define KEEP_ALIVE 60
void CCacService::cac_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
((CCacService*)userdata)->MessageCallback(mosq, message);
}
void CCacService::cac_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
((CCacService*)userdata)->ConnectCallback(mosq, result);
}
void CCacService::cac_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
((CCacService*)userdata)->SubscribeCallback(mosq, mid, qos_count, granted_qos);
}
void CCacService::cac_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
((CCacService*)userdata)->LogCallback(mosq, level, str);
}
CCacService::CCacService() : m_session(true), m_mosq(NULL), m_conn(NULL)
{
}
CCacService::~CCacService()
{
if (m_mosq != NULL)
{
mosquitto_destroy(m_mosq);
m_mosq = NULL;
}
if (m_conn != NULL)
{
delete m_conn;
}
}
void CCacService::MessageCallback(struct mosquitto *mosq, const struct mosquitto_message *message)
{
if (message->payloadlen) {
printf("%s %s", message->topic, (char *)(message->payload));
}
else {
printf("%s (null)\n", message->topic);
}
fflush(stdout);
}
void CCacService::ConnectCallback(struct mosquitto *mosq, int result)
{
int i;
if (!result)
{
/* Subscribe to broker information topics on successful connect. */
// mosquitto_subscribe(mosq, NULL, "Gaiү:", 2);
}
else
{
fprintf(stderr, "Connect failed\n");
}
}
void CCacService::SubscribeCallback(struct mosquitto *mosq, int mid, int qos_count, const int *granted_qos)
{
int i;
printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for (i = 1; i < qos_count; i++)
{
printf(", %d", granted_qos[i]);
}
printf("\n");
}
void CCacService::LogCallback(struct mosquitto *mosq, int level, const char *str)
{
/* Pring all log messages regardless of level. */
printf("%s\n", str);
}
void CCacService::MosquittoLoop()
{
mosquitto_loop_forever(m_mosq, -1, 1);
int aa = 0;
}
void CCacService::ExportLoop(CMySQLAdo* conn)
{
while (1)
{
for (std::map<std::string, CSyncTable>::iterator it = m_syncTables.begin(); it != m_syncTables.end(); ++it)
{
Json::Value jonsObj = Json::objectValue;
jonsObj["AssetList"] = Json::arrayValue;
m_conn->ProcessTable(it->second, jonsObj["AssetList"]);
int mid = 0;
std::string data = CvtJSONToString(jonsObj);
#ifdef _DEBUG
std::cout << data << std::endl;
writeFile("D://testJson.txt", (const unsigned char *)data.c_str(), data.size());
#endif
if (m_mosq != NULL)
{
int res = mosquitto_publish(m_mosq, &mid, "external/data", data.size(), (void *)(data.c_str()), SYNC_QOS_LEVEL_0, false);
if (res == MOSQ_ERR_SUCCESS)
{
// Update time
int aa = 0;
}
else
{
if (MOSQ_ERR_ERRNO == res)
{
int errorno = errno;
char* errmsg = strerror(errorno);
int aa = 0;
}
int bbb = 0;
}
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
bool CCacService::Start(const std::map<std::string, std::string>& arguments)
{
//create mosquitto client
m_mosq = mosquitto_new(NULL, m_session, NULL);
if (!m_mosq)
{
// printf("create client failed..\n");
// mosquitto_lib_cleanup();
return false;
}
// Callback
mosquitto_user_data_set(m_mosq, (void *)this);
// mosquitto_log_callback_set(mosq, cac_log_callback);
mosquitto_connect_callback_set(m_mosq, cac_connect_callback);
mosquitto_message_callback_set(m_mosq, cac_message_callback);
// mosquitto_subscribe_callback_set(mosq, cac_subscribe_callback);
// connect to server
std::string mqttServer = FindArgument(arguments, ARG_KEY_MQTT_HOST);
std::string mqttPort = FindArgument(arguments, ARG_KEY_MQTT_PORT);
std::string mqttUser = FindArgument(arguments, ARG_KEY_MQTT_USER);
std::string mqttPwd = FindArgument(arguments, ARG_KEY_MQTT_PASSWORD);
mqttPwd = "AliOS%1688";
std::string dbServer = FindArgument(arguments, ARG_KEY_DB_HOST);
unsigned short dbPort = 3306; // arguments["mqttport"];
std::string dbName = FindArgument(arguments, ARG_KEY_DB_NAME);
// std::string dbName = FindArgument(arguments, ARG_KEY_DB_NAME);
std::string dbUser = FindArgument(arguments, ARG_KEY_DB_USER);
std::string dbPwd = FindArgument(arguments, ARG_KEY_DB_PASSWORD);
mosquitto_username_pw_set(m_mosq, mqttUser.c_str(), mqttPwd.c_str());
if (mosquitto_connect(m_mosq, mqttServer.c_str(), atoi(mqttPort.c_str()), KEEP_ALIVE))
{
mosquitto_destroy(m_mosq);
m_mosq = NULL;
fprintf(stderr, "Unable to connect.\n");
#ifndef _DEBUG
return false;
#endif
}
m_conn = new CMySQLAdo();
if (!m_conn->Connect(dbServer, dbUser, dbPwd, dbName))
{
return false;
}
m_conn->loadSyncTables(SYNC_CLIENT_CAC_MQTT, m_syncTables);
//ѭ<><D1AD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>Ϣ
// mosquitto_loop_forever(mosq, -1, 1);
if (m_mosq != NULL)
{
std::thread th1(std::mem_fun(&CCacService::MosquittoLoop), this);
m_mosqThread.swap(th1);
}
std::thread th2(std::mem_fun(&CCacService::ExportLoop), this, m_conn);
m_exportThread.swap(th2);
return true;
}
void CCacService::Join()
{
if (m_mosqThread.joinable()) m_mosqThread.join();
if (m_exportThread.joinable()) m_exportThread.join();
}
// unsigned int CCacService::ProcessTable(const std::string& tableName, const CSyncRecord& syncRecord, const CSyncTableFields& syncTable)
// {
// return 0;
// }