// View.h : interface of the CView class // ///////////////////////////////////////////////////////////////////////////// #pragma once #define SERVICES_COMMAND_TOPIC "/v1/devices/MSRDT-A/command" #define SERVICES_RESPONSE_TOPIC "/v1/devices/MSRDT-A/commandResponse" #define DEVICES_DATA_TOPIC "/v1/devices/MSRDT-A/datas" #define MQTT_SERVER "61.169.135.146" #define MQTT_PORT 51001 #define MQTT_USERNAME "test" #define MQTT_PASSWORD "123456" #define MQTT_KEEP_ALIVE 60 #define SYNC_CLIENT_CAC_MQTT 10 #define SYNC_QOS_LEVEL_0 0 #define WM_NEW_MSG (WM_USER + 2) class CView : public CDialogImpl, public CDialogResize { public: enum { IDD = IDD_MQTTCLIENT_FORM }; struct mosquitto *m_mosq; bool m_session; std::thread m_mqttThread; CString m_userName; CString m_password; BOOL PreTranslateMessage(MSG* pMsg) { return CWindow::IsDialogMessage(pMsg); } BEGIN_DLGRESIZE_MAP(CView) DLGRESIZE_CONTROL(IDC_SEND, DLSZ_MOVE_X) DLGRESIZE_CONTROL(IDC_CLEAR, DLSZ_MOVE_X) DLGRESIZE_CONTROL(IDC_TOPICES, DLSZ_SIZE_X) DLGRESIZE_CONTROL(IDC_TO_SEND, DLSZ_SIZE_X) DLGRESIZE_CONTROL(IDC_TO_RECV, DLSZ_SIZE_X | DLSZ_SIZE_Y) END_DLGRESIZE_MAP() BEGIN_MSG_MAP(CView) MESSAGE_HANDLER(WM_INITDIALOG, OnInitDialog) MESSAGE_HANDLER(WM_DESTROY, OnDestroy) MESSAGE_HANDLER(WM_NEW_MSG, OnNewMsgArrived) COMMAND_HANDLER(IDC_SEND, BN_CLICKED, OnSendCmd) COMMAND_HANDLER(IDC_CLEAR, BN_CLICKED, OnClearCmd) CHAIN_MSG_MAP(CDialogResize) END_MSG_MAP() // Handler prototypes (uncomment arguments if needed): // LRESULT MessageHandler(UINT /*uMsg*/, WPARAM /*wParam*/, LPARAM /*lParam*/, BOOL& /*bHandled*/) // LRESULT CommandHandler(WORD /*wNotifyCode*/, WORD /*wID*/, HWND /*hWndCtl*/, BOOL& /*bHandled*/) // LRESULT NotifyHandler(int /*idCtrl*/, LPNMHDR /*pnmh*/, BOOL& /*bHandled*/) LRESULT OnInitDialog(UINT /*uMsg*/, WPARAM /*wParam*/, LPARAM /*lParam*/, BOOL& /*bHandled*/) { DlgResize_Init(); m_userName = TEXT(MQTT_USERNAME); m_password = TEXT(MQTT_PASSWORD); LoadUserAndPassword(); CEdit edt = GetDlgItem(IDC_TO_RECV); edt.SetLimitText(-1); CComboBox cmb = GetDlgItem(IDC_TOPICES); cmb.AddString(TEXT(SERVICES_COMMAND_TOPIC)); cmb.AddString(TEXT(SERVICES_RESPONSE_TOPIC)); cmb.AddString(TEXT(DEVICES_DATA_TOPIC)); cmb.SetCurSel(0); m_mosq = NULL; m_session = true; mosquitto_lib_init(); //create mosquitto client const char * clientId = NULL; m_mosq = mosquitto_new(clientId, m_session, NULL); if (!m_mosq) { // printf("create client failed..\n"); // mosquitto_lib_cleanup(); return false; } // Callback mosquitto_user_data_set(m_mosq, (void *)this); // mosquitto_log_callback_set(mosq, cac_log_callback); mosquitto_connect_callback_set(m_mosq, cac_connect_callback); mosquitto_message_callback_set(m_mosq, cac_message_callback); mosquitto_subscribe_callback_set(m_mosq, cac_subscribe_callback); // connect to server CW2A userName(CT2W(m_userName), CP_UTF8); CW2A password(CT2W(m_password), CP_UTF8); mosquitto_username_pw_set(m_mosq, (LPCSTR)userName, (LPCSTR)password); if (mosquitto_connect(m_mosq, MQTT_SERVER, MQTT_PORT, MQTT_KEEP_ALIVE)) { mosquitto_destroy(m_mosq); m_mosq = NULL; fprintf(stderr, "Unable to connect.\n"); #ifndef _DEBUG return false; #endif } /* Subscribe to broker information topics on successful connect. */ std::thread th = std::thread(MQTTThreadProc, this); m_mqttThread.swap(th); int mid = 0; // mosquitto_subscribe(m_mosq, &mid, SERVICES_COMMAND_TOPIC, SYNC_QOS_LEVEL_0); // mosquitto_subscribe(m_mosq, &mid, SERVICES_RESPONSE_TOPIC, SYNC_QOS_LEVEL_0); // mosquitto_subscribe(m_mosq, &mid, DEVICES_DATA_TOPIC, SYNC_QOS_LEVEL_0); return TRUE; } LRESULT OnDestroy(UINT /*uMsg*/, WPARAM /*wParam*/, LPARAM /*lParam*/, BOOL& /*bHandled*/) { if (m_mosq != NULL) { int mid = 0; mosquitto_unsubscribe(m_mosq, &mid, SERVICES_COMMAND_TOPIC); mosquitto_unsubscribe(m_mosq, &mid, SERVICES_RESPONSE_TOPIC); mosquitto_unsubscribe(m_mosq, &mid, DEVICES_DATA_TOPIC); mosquitto_disconnect(m_mosq); mosquitto_loop_stop(m_mosq, false); if (m_mqttThread.joinable()) { m_mqttThread.join(); } mosquitto_destroy(m_mosq); m_mosq = NULL; } mosquitto_lib_cleanup(); return TRUE; } LRESULT OnClearCmd(WORD /*wNotifyCode*/, WORD wID, HWND hWndCtl, BOOL& /*bHandled*/) { CEdit edt = GetDlgItem(IDC_TO_RECV); edt.SetWindowText(TEXT("")); return 1; } LRESULT OnSendCmd(WORD /*wNotifyCode*/, WORD wID, HWND hWndCtl, BOOL& /*bHandled*/) { CString text; CEdit edt = GetDlgItem(IDC_TO_SEND); edt.GetWindowText(text); if (text.GetLength() == 0) { return TRUE; } CString topic; CComboBox cmb = GetDlgItem(IDC_TOPICES); cmb.GetWindowText(topic); if (topic.GetLength() == 0) { return TRUE; } CW2A utf8(CT2W(text), CP_UTF8); CW2A utf8Topic(CT2W(topic), CP_UTF8); int mid = 0; int res = mosquitto_publish(m_mosq, &mid, utf8Topic, strlen((LPCSTR)utf8), (void *)((LPCSTR)utf8), SYNC_QOS_LEVEL_0, false); if (res == MOSQ_ERR_SUCCESS) { // Update time int aa = 0; } else { if (MOSQ_ERR_ERRNO == res) { int errorno = errno; char* errmsg = strerror(errorno); int aa = 0; } int bbb = 0; } return TRUE; } LRESULT OnNewMsgArrived(UINT /*uMsg*/, WPARAM wParam, LPARAM lParam, BOOL& /*bHandled*/) { LPTSTR pszMsg = (LPTSTR)lParam; LPTSTR pszTopic = (LPTSTR)wParam; CString text; CEdit edt = GetDlgItem(IDC_TO_RECV); edt.GetWindowText(text); edt.AppendText(TEXT("\r\n")); if (pszTopic != NULL) { edt.AppendText(TEXT("\r\n")); edt.AppendText(FormatLocalTime()); edt.AppendText(TEXT(" ")); edt.AppendText(pszTopic); delete[] pszTopic; } if (pszMsg != NULL) { edt.AppendText(TEXT("\r\n")); edt.AppendText(pszMsg); delete[] pszMsg; } return TRUE; } int MQTTProc() { mosquitto_loop_forever(m_mosq, -1, 1); return 0; } static int MQTTThreadProc(CView *pThis); static void cac_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message); static void cac_connect_callback(struct mosquitto *mosq, void *userdata, int result); static void cac_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos); static void cac_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str); LPTSTR ConvertStr(const char* utf8) { CW2T str(CA2W(utf8, CP_UTF8)); size_t len = _tcslen((LPCTSTR)str); TCHAR *pszT = new TCHAR[len + 1]; pszT[len] = 0; _tcscpy(pszT, (LPCTSTR)str); return pszT; } void MessageCallback(struct mosquitto *mosq, const struct mosquitto_message *message) { if (message->payloadlen) { LPTSTR pszTopic = ConvertStr((char *)(message->topic)); LPTSTR pszMsg = NULL; bool isJson = false; char* sz = (char *)message->payload; if (sz[0] == '{') { Json::Value jsonObj; Json::CharReaderBuilder builder; std::unique_ptr reader(builder.newCharReader()); isJson = reader->parse(sz, sz + message->payloadlen, &jsonObj, NULL); if (isJson) { Json::StreamWriterBuilder writeBuilder; writeBuilder["indentation"] = " "; // assume default for comments is None writeBuilder["emitUTF8"] = true; std::string content = Json::writeString(writeBuilder, jsonObj); size_t pos = 0; std::string replace = "\r\n"; while ((pos = content.find('\n', pos)) != std::string::npos) { content.replace(pos, 1, replace); pos += replace.length(); } pszMsg = ConvertStr(content.c_str()); } } if (pszMsg == NULL) { pszMsg = ConvertStr((char *)(message->payload)); } ::PostMessage(m_hWnd, WM_NEW_MSG, (WPARAM)pszTopic, (LPARAM)pszMsg); } else { // LPTSTR pszMsg = ConvertStr((char *)(message->payload)); LPTSTR pszTopic = ConvertStr((char *)(message->topic)); ::PostMessage(m_hWnd, WM_NEW_MSG, (WPARAM)pszTopic, 0); } // fflush(stdout); } void ConnectCallback(struct mosquitto *mosq, int result) { int i; if (!result) { /* Subscribe to broker information topics on successful connect. */ // std::thread th = std::thread(MQTTThreadProc, this); // m_mqttThread.swap(th); int mid = 0; int res = 0; char* subs[] = { SERVICES_COMMAND_TOPIC, SERVICES_RESPONSE_TOPIC, DEVICES_DATA_TOPIC}; res = mosquitto_subscribe_multiple(m_mosq, &mid, sizeof(subs) / sizeof(char *), subs, SYNC_QOS_LEVEL_0, 0, NULL); if (res != MOSQ_ERR_SUCCESS) { int aa = 0; } // res = mosquitto_subscribe(m_mosq, &mid, SERVICES_COMMAND_TOPIC, SYNC_QOS_LEVEL_0); // res = mosquitto_subscribe(m_mosq, &mid, SERVICES_RESPONSE_TOPIC, SYNC_QOS_LEVEL_0); // res = mosquitto_subscribe(m_mosq, &mid, DEVICES_DATA_TOPIC, SYNC_QOS_LEVEL_0); if (res != MOSQ_ERR_SUCCESS) { int aa = 0; } } else { fprintf(stderr, "Connect failed\n"); } } void SubscribeCallback(struct mosquitto *mosq, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for (i = 1; i < qos_count; i++) { printf(", %d", granted_qos[i]); } printf("\n"); } void LogCallback(struct mosquitto *mosq, int level, const char *str) { /* Pring all log messages regardless of level. */ printf("%s\n", str); } CString FormatLocalTime(time_t t = 0) { if (t == 0) { time(&t); } struct std::tm* tp = localtime(&t); CString str; str.Format(TEXT("%02d-%02d %02d:%02d:%02d"), tp->tm_mon + 1, tp->tm_mday, tp->tm_hour, tp->tm_min, tp->tm_sec); return str; } void LoadUserAndPassword() { TCHAR buf[MAX_PATH] = { 0 }; CRegKey key; if (key.Open(HKEY_CURRENT_USER, TEXT("Software\\XYPower\\MQTTClient"), KEY_READ) == ERROR_SUCCESS) { ULONG bufLen = MAX_PATH; if (key.QueryStringValue(TEXT("UserName"), buf, &bufLen) == ERROR_SUCCESS) { m_userName = buf; } bufLen = MAX_PATH; if (key.QueryStringValue(TEXT("Password"), buf, &bufLen) == ERROR_SUCCESS) { m_password = buf; } key.Close(); } } }; int CView::MQTTThreadProc(CView *pThis) { return pThis->MQTTProc(); } void CView::cac_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { ((CView*)userdata)->MessageCallback(mosq, message); } void CView::cac_connect_callback(struct mosquitto *mosq, void *userdata, int result) { ((CView*)userdata)->ConnectCallback(mosq, result); } void CView::cac_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { ((CView*)userdata)->SubscribeCallback(mosq, mid, qos_count, granted_qos); } void CView::cac_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { ((CView*)userdata)->LogCallback(mosq, level, str); }