#include "CacService.h"

// NOTE(review): the names of two angle-bracket includes were lost from this
// file. The list below is reconstructed from the names this translation unit
// actually uses -- confirm against the original before merging.
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <thread>

#include <mosquitto.h>

#include "Utils.h"

// Keep-alive value handed to mosquitto_connect().
#define KEEP_ALIVE 60

// ---------------------------------------------------------------------------
// Static trampolines registered with libmosquitto. `userdata` is the pointer
// installed by mosquitto_user_data_set() in Start(), i.e. the CCacService
// instance; each trampoline forwards to the matching member function.
// ---------------------------------------------------------------------------

void CCacService::cac_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
    static_cast<CCacService *>(userdata)->MessageCallback(mosq, message);
}

void CCacService::cac_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
    static_cast<CCacService *>(userdata)->ConnectCallback(mosq, result);
}

void CCacService::cac_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
    static_cast<CCacService *>(userdata)->SubscribeCallback(mosq, mid, qos_count, granted_qos);
}

void CCacService::cac_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
    static_cast<CCacService *>(userdata)->LogCallback(mosq, level, str);
}

/// Creates an idle service: no MQTT client and no database connection yet.
/// Call Start() to create both.
CCacService::CCacService()
    : m_session(true), m_mosq(nullptr), m_conn(nullptr)
{
}

/// Releases the MQTT client and the database connection.
///
/// NOTE(review): neither worker loop in this file has an exit condition
/// (ExportLoop is `while (1)`, MosquittoLoop calls mosquitto_loop_forever), so
/// destroying the object while the threads started by Start() are running
/// looks unsafe -- confirm that the owner never destroys a started service.
CCacService::~CCacService()
{
    if (m_mosq != nullptr) {
        mosquitto_destroy(m_mosq);
        m_mosq = nullptr;
    }

    delete m_conn; // deleting a null pointer is a no-op
    m_conn = nullptr;
}

/// Prints an incoming message as "<topic> <payload>" on stdout, or
/// "<topic> (null)" followed by a newline when the payload is empty.
void CCacService::MessageCallback(struct mosquitto *mosq, const struct mosquitto_message *message)
{
    if (message->payloadlen) {
        // The payload is a length-delimited buffer, so bound the print by
        // payloadlen instead of relying on a terminating NUL.
        printf("%s %.*s", message->topic, message->payloadlen,
               static_cast<const char *>(message->payload));
    } else {
        printf("%s (null)\n", message->topic);
    }
    fflush(stdout);
}

/// Connect notification: a non-zero `result` is reported on stderr.
/// Nothing is subscribed on success at the moment.
void CCacService::ConnectCallback(struct mosquitto *mosq, int result)
{
    if (result) {
        fprintf(stderr, "Connect failed\n");
    }
}

/// Prints the message id and the comma-separated list of granted QoS values.
void CCacService::SubscribeCallback(struct mosquitto *mosq, int mid, int qos_count, const int *granted_qos)
{
    printf("Subscribed (mid: %d): ", mid);
    // Starting at index 0 inside the loop avoids reading granted_qos[0] when
    // qos_count is 0; the output for qos_count >= 1 is unchanged.
    for (int idx = 0; idx < qos_count; ++idx) {
        printf("%s%d", (idx == 0) ? "" : ", ", granted_qos[idx]);
    }
    printf("\n");
}

/// Prints every log message regardless of level.
void CCacService::LogCallback(struct mosquitto *mosq, int level, const char *str)
{
    printf("%s\n", str);
}

/// Thread body: runs the libmosquitto network loop on m_mosq and reports the
/// reason on stderr if the loop ever returns with an error.
void CCacService::MosquittoLoop()
{
    const int rc = mosquitto_loop_forever(m_mosq, -1, 1);
    if (rc != MOSQ_ERR_SUCCESS) {
        fprintf(stderr, "MQTT network loop exited: %s\n", mosquitto_strerror(rc));
    }
}

/// Thread body: every 200 ms, serialises each table in m_syncTables into a
/// JSON object {"AssetList": [...]} and publishes it on "external/data".
///
/// @param conn Database connection used to read the tables. Start() passes
///             m_conn; when null, m_conn is used instead.
///
/// The loop has no exit condition.
void CCacService::ExportLoop(CMySQLAdo* conn)
{
    CMySQLAdo* const db = (conn != nullptr) ? conn : m_conn;

    // Result of the previous publish. Failures are logged only when the
    // result code changes so a broker outage does not flood stderr.
    int lastPublishResult = MOSQ_ERR_SUCCESS;

    while (1) {
        for (auto& table : m_syncTables) {
            Json::Value assetDoc = Json::objectValue;
            assetDoc["AssetList"] = Json::arrayValue;
            db->ProcessTable(table.second, assetDoc["AssetList"]);

            int mid = 0;
            const std::string data = CvtJSONToString(assetDoc);
#ifdef _DEBUG
            std::cout << data << std::endl;
            writeFile("D://testJson.txt", (const unsigned char *)data.c_str(), data.size());
#endif
            if (m_mosq == nullptr) {
                continue; // no MQTT client (debug build after a failed connect)
            }

            const int res = mosquitto_publish(m_mosq, &mid, "external/data",
                                              static_cast<int>(data.size()), data.c_str(),
                                              SYNC_QOS_LEVEL_0, false);
            const int savedErrno = errno; // capture before any other call can clobber it

            if (res != MOSQ_ERR_SUCCESS && res != lastPublishResult) {
                const char* reason = (res == MOSQ_ERR_ERRNO) ? strerror(savedErrno)
                                                             : mosquitto_strerror(res);
                fprintf(stderr, "Publish to external/data failed: %s\n", reason);
            }
            // TODO(review): the original success branch carried an "Update time"
            // note but no code; confirm whether a sync timestamp should be
            // recorded here.
            lastPublishResult = res;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
}

/// Creates the MQTT client, connects to the broker and the database, loads
/// the list of tables to synchronise and starts the worker threads.
///
/// @param arguments Key/value settings; the ARG_KEY_MQTT_* and ARG_KEY_DB_*
///                  entries are read through FindArgument().
///                  NOTE(review): the template arguments of this parameter were
///                  lost from the file; std::map<std::string, std::string> is
///                  assumed -- confirm against the declaration in CacService.h.
/// @return false if the MQTT client cannot be created, if the broker
///         connection fails (non-debug builds only), or if the database
///         connection fails; true once the threads are running.
bool CCacService::Start(const std::map<std::string, std::string>& arguments)
{
    // Create the mosquitto client.
    m_mosq = mosquitto_new(NULL, m_session, NULL);
    if (!m_mosq) {
        return false;
    }

    // Route libmosquitto callbacks back to this instance.
    mosquitto_user_data_set(m_mosq, this);
    mosquitto_connect_callback_set(m_mosq, cac_connect_callback);
    mosquitto_message_callback_set(m_mosq, cac_message_callback);
    // The log and subscribe callbacks are intentionally not registered:
    // mosquitto_log_callback_set(m_mosq, cac_log_callback);
    // mosquitto_subscribe_callback_set(m_mosq, cac_subscribe_callback);

    // ExportLoop publishes from its own thread while MosquittoLoop runs the
    // network loop on another, so the library must be told that the client is
    // shared between application-managed threads.
    mosquitto_threaded_set(m_mosq, true);

    // Broker settings. The password comes from `arguments` only; it must not
    // be overridden by a literal in the source.
    const std::string mqttServer = FindArgument(arguments, ARG_KEY_MQTT_HOST);
    const std::string mqttPort = FindArgument(arguments, ARG_KEY_MQTT_PORT);
    const std::string mqttUser = FindArgument(arguments, ARG_KEY_MQTT_USER);
    const std::string mqttPwd = FindArgument(arguments, ARG_KEY_MQTT_PASSWORD);

    // Database settings.
    const std::string dbServer = FindArgument(arguments, ARG_KEY_DB_HOST);
    const std::string dbName = FindArgument(arguments, ARG_KEY_DB_NAME);
    const std::string dbUser = FindArgument(arguments, ARG_KEY_DB_USER);
    const std::string dbPwd = FindArgument(arguments, ARG_KEY_DB_PASSWORD);

    mosquitto_username_pw_set(m_mosq, mqttUser.c_str(), mqttPwd.c_str());
    if (mosquitto_connect(m_mosq, mqttServer.c_str(), atoi(mqttPort.c_str()), KEEP_ALIVE)) {
        mosquitto_destroy(m_mosq);
        m_mosq = nullptr;
        fprintf(stderr, "Unable to connect.\n");
#ifndef _DEBUG
        return false;
#endif
        // Debug builds continue without a broker so the export path can still
        // be exercised; ExportLoop skips publishing while m_mosq is null.
    }

    m_conn = new CMySQLAdo();
    if (!m_conn->Connect(dbServer, dbUser, dbPwd, dbName)) {
        return false;
    }

    m_conn->loadSyncTables(SYNC_CLIENT_CAC_MQTT, m_syncTables);

    // Process network messages in a loop on a dedicated thread.
    if (m_mosq != nullptr) {
        m_mosqThread = std::thread(&CCacService::MosquittoLoop, this);
    }

    m_exportThread = std::thread(&CCacService::ExportLoop, this, m_conn);

    return true;
}

/// Blocks until the worker threads started by Start() finish. Threads that
/// were never started are skipped.
void CCacService::Join()
{
    if (m_mosqThread.joinable()) {
        m_mosqThread.join();
    }

    if (m_exportThread.joinable()) {
        m_exportThread.join();
    }
}

// unsigned int CCacService::ProcessTable(const std::string& tableName, const CSyncRecord& syncRecord, const CSyncTableFields& syncTable)
// {
//     return 0;
// }