From 7b6b1baf1b39fcdede849d56ae30ab57be44c041 Mon Sep 17 00:00:00 2001 From: BlueMatthew Date: Thu, 23 Nov 2023 20:32:26 +0800 Subject: [PATCH] =?UTF-8?q?=E9=95=BF=E6=B3=B0=E6=9C=80=E7=BB=88=E7=89=88?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Makefile | 11 +- include/HTDataStruct.h | 5 +- include/HTGlobal.h | 1 + include/HTTcpSocket.h | 2 +- src/HTGlobal.cpp | 2 + src/HTIEC104.cpp | 350 ++++++++++++++++++++++++++--------------- src/HTService.cpp | 14 ++ src/HTTcpSocket.cpp | 33 +++- 8 files changed, 284 insertions(+), 134 deletions(-) diff --git a/Makefile b/Makefile index d926e0f..09916c0 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,11 @@ MYSQL_LIB_PATH = /usr/lib64/mysql MYSQL_LIB_FILE = mysqlclient #CFLAGS = -c -Werror -g -D_REENTRANT -Wformat -Wsign-compare -O3 -ansi -CFLAGS = -c -Werror -g -D_DEBUG -D_REENTRANT -Wformat -Wsign-compare -O3 -ansi +ifeq ($(MODE), DEBUG) +CFLAGS = -c -Werror -g -D_DEBUG -D_REENTRANT -Wformat -Wsign-compare -O0 -ansi +else +CFLAGS = -c -Werror -g -D_REENTRANT -Wformat -Wsign-compare -O3 -ansi +endif LIBS = -L$(MYSQL_LIB_PATH) -L/usr/local/lib64 -l$(MYSQL_LIB_FILE) -lopencv_core -lopencv_imgproc -lopencv_highgui -lopencv_videoio -lopencv_imgcodecs -lz -ldl -lpng -lnsl -luuid -lpthread -lm -lc -lstdc++ INCPATH = -I./include -I$(MYSQL_PATH)/include @@ -36,8 +40,13 @@ TARGET = ./bin/iecserver #TARGET = ./bin/IMGService # for link +ifeq ($(MODE), DEBUG) +$(TARGET):$(OBJS) + $(CC) -Wall -O0 -o $(TARGET) $(OBJS) $(LIBS) +else $(TARGET):$(OBJS) $(CC) -Wall -O3 -o $(TARGET) $(OBJS) $(LIBS) +endif $(OBJS_PATH)/%.o : ./src/%.cpp $(CC) $(CFLAGS) $(INCPATH) $< -o $@ diff --git a/include/HTDataStruct.h b/include/HTDataStruct.h index 98dc0ee..7c7d0da 100644 --- a/include/HTDataStruct.h +++ b/include/HTDataStruct.h @@ -36,7 +36,10 @@ typedef struct { typedef struct { bool isConnect; // true: connected false: not connected int listenid; // listen handle - int sockid; // socket connect handle + int sockid; // socket connect handle +#ifndef _WIN32 + int pipeFds[2]; // When new connection arrived, wake socket up with the pipe +#endif time_t last_time; // last send/recv time bool is_yk_ack ; // 是否收到遥控信号 diff --git a/include/HTGlobal.h b/include/HTGlobal.h index 76f3081..9bb41d6 100644 --- a/include/HTGlobal.h +++ b/include/HTGlobal.h @@ -119,6 +119,7 @@ extern pthread_t thread_handle_gishold; // 断路器断开数据入库线程 extern pthread_t thread_handle_active; // 心跳线程 extern pthread_t thread_handle_linkmgr; // 链路管理线程 +extern pthread_t thread_handle_recv; // 链路管理线程 extern pthread_t thread_handle_cache; // 内存同步线程句柄 extern pthread_t thread_handle_opecvimg; // 图片识别线程句柄 diff --git a/include/HTTcpSocket.h b/include/HTTcpSocket.h index 1674e90..edb8e0b 100644 --- a/include/HTTcpSocket.h +++ b/include/HTTcpSocket.h @@ -62,7 +62,7 @@ public: int getLocalSocketInfo(int sockid, unsigned char *sip, unsigned short *sport); int tcpRecvBuffer(int sockid, char* pBuffer,int nMaxLength, int timeout=6); - int tcpSendBuffer(int sockid, const char *pBuffer, int length); + int tcpSendBuffer(int sockid, const char *pBuffer, int length, int pipeFd); int tcpAcceptSocket(int sockid); int tcpCloseSocket(int sockfd); diff --git a/src/HTGlobal.cpp b/src/HTGlobal.cpp index e341c11..f3437f9 100644 --- a/src/HTGlobal.cpp +++ b/src/HTGlobal.cpp @@ -77,6 +77,8 @@ pthread_t thread_handle_busi_data; // 业务数据如何线程句柄 pthread_t thread_handle_pingce; // IEC104评测数据入库线程句柄 pthread_t thread_handle_gishold; // 断路器断开数据入库线程句柄 +pthread_t thread_handle_recv; // + pthread_t thread_handle_active; // 心跳线程 pthread_t thread_handle_linkmgr; // 链路管理线程 pthread_t thread_handle_cache; // 内存同步线程句柄 diff --git a/src/HTIEC104.cpp b/src/HTIEC104.cpp index 4696b36..8a51de7 100644 --- a/src/HTIEC104.cpp +++ b/src/HTIEC104.cpp @@ -732,6 +732,11 @@ static void showIEC104Conf() void InitIECENV() { memset(&g_IecCtrl, 0x00, sizeof(ST_IEC104_CTRL)); + g_IecCtrl.sockid = -1; +#ifndef _WIN32 + g_IecCtrl.pipeFds[0] = -1; + g_IecCtrl.pipeFds[1] = -1; +#endif g_IecCtrl.t0 = (int)g_TConfig.getTimeout0(); g_IecCtrl.t1 = (int)g_TConfig.getTimeout1(); g_IecCtrl.t2 = (int)g_TConfig.getTimeout2(); @@ -742,6 +747,10 @@ void InitIECENV() g_IecCtrl.timer_U_Testflag = true; // 初始化为真,如果过程中有超时,置为false g_IecCtrl.m_gis_change = false; g_IecCtrl.time_action = time(NULL); // 总召间隔时间更新 + +#ifndef _WIN32 + pipe(g_IecCtrl.pipeFds); +#endif } void IEC104EnvLoad() { @@ -770,6 +779,20 @@ void IEC104EnvLoad() void IEC104EnvFree() { +#ifndef _WIN32 + if (g_IecCtrl.pipeFds[1] != -1) + { + int data = -1; + write(g_IecCtrl.pipeFds[1], &data, sizeof(data)); + close(g_IecCtrl.pipeFds[1]); + g_IecCtrl.pipeFds[1] = -1; + } + if (g_IecCtrl.pipeFds[0] != -1) + { + close(g_IecCtrl.pipeFds[0]); + g_IecCtrl.pipeFds[0] = -1; + } +#endif mutex_close(g_list_pack_mutex); mutex_close(g_list_dbset_mutex); mutex_close(g_sendno_mutex); @@ -925,7 +948,11 @@ int SendMsgFormatU(int cmd) header.cntl3 = 0; // g_IecCtrl.usRecvNo & 0xFE; header.cntl4 = 0; //(g_IecCtrl.usRecvNo >> 8) & 0xFF; - iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, sizeof(ST_APCI)); +#ifndef _WIN32 + iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, sizeof(ST_APCI), g_IecCtrl.pipeFds[0]); +#else + iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, sizeof(ST_APCI), -1); +#endif if (iRet == ErrException) { vPrtLogMsg(LOG_ERROR, iRet, "send mesg failed, sockid:%d msg:%s will close socket", g_IecCtrl.sockid, strerror(errno)); @@ -961,13 +988,20 @@ int SendMsgFormatS(unsigned short sendno) //header.cntl3 = (g_IecCtrl.TxCounter & 0xFE); // S-Format //header.cntl4 = (g_IecCtrl.TxCounter >> 8) & 0xFF; //S格式确认包,cnt1=0x01, cnt2=0x00,是固定不变的。 - - if ((iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, sizeof(ST_APCI))) < 0) + int sockid = g_IecCtrl.sockid; +#ifndef _WIN32 + if ((iRet = g_Tcp.tcpSendBuffer(sockid, (const char*)&header, sizeof(ST_APCI), g_IecCtrl.pipeFds[0])) < 0) +#else + if ((iRet = g_Tcp.tcpSendBuffer(sockid, (const char*)&header, sizeof(ST_APCI), -1)) < 0) +#endif { vPrtLogMsg(LOG_WARNG, iRet, "send Format S failed with error: %s, errno=%d will close socket", strerror(errno), errno); - g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP); - g_Tcp.tcpCloseSocket(g_IecCtrl.sockid); - g_IecCtrl.isConnect = false; + if (sockid == g_IecCtrl.sockid) + { + g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP); + g_Tcp.tcpCloseSocket(g_IecCtrl.sockid); + g_IecCtrl.isConnect = false; + } return iRet; } //g_IecCtrl.usSendNo++; @@ -994,9 +1028,15 @@ int SendMsgFormatI(unsigned char *msgbuf, unsigned int len) header->cntl4 = ((g_IecCtrl.usRecvNo >> 8) & 0xFF); memcpy(buf + sizeof(ST_APCI), msgbuf, len); - if ((iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)buf, len + sizeof(ST_APCI))) < 0) + int sockid = g_IecCtrl.sockid; +#ifndef _WIN32 + if ((iRet = g_Tcp.tcpSendBuffer(sockid, (const char*)buf, len + sizeof(ST_APCI), g_IecCtrl.pipeFds[0])) < 0) +#else + if ((iRet = g_Tcp.tcpSendBuffer(sockid, (const char*)buf, len + sizeof(ST_APCI), -1)) < 0) +#endif { vPrtLogMsg(LOG_WARNG, iRet, "send Format I failed with error: %s, errno=%d will close socket", strerror(errno), errno); + if (sockid == g_IecCtrl.sockid) g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP); g_Tcp.tcpCloseSocket(g_IecCtrl.sockid); g_IecCtrl.isConnect = false; @@ -1066,20 +1106,34 @@ void* thread_listen_proc(void * arg) continue; } mutex_lock(g_IecCtrl_mutex); - if (g_Tcp.tcpIsConnected(g_IecCtrl.sockid)) - { - mutex_unlock(g_IecCtrl_mutex); - vPrtLogMsg(LOG_DEBUG, errno,"Previous link is connected, won't accept accept_fd=%d", accept_fd); - _SLEEP(1000); - continue; - } + int org_socket = g_IecCtrl.sockid; + if (org_socket != -1) + { + // Will close it, wake it up + +#ifndef _WIN32 + if (g_IecCtrl.pipeFds[1] != -1) + { + write(g_IecCtrl.pipeFds[1], &org_socket, sizeof(org_socket)); + vPrtLogMsg(LOG_ERROR, errno, "Wake up socket select via pipe,sockid:%d", org_socket); + } +#endif + } + // g_Tcp.tcpCloseSocket(org_socket); + + mutex_unlock(g_IecCtrl_mutex); + _SLEEP(1000); + + mutex_lock(g_IecCtrl_mutex); g_IecCtrl.last_time = time(NULL); g_IecCtrl.last_yx_time = time(NULL); g_IecCtrl.last_yc_time = time(NULL); g_IecCtrl.sockid = accept_fd; g_IecCtrl.isConnect = true; + mutex_unlock(g_IecCtrl_mutex); + pthread_t thread_handle_minor_recv = -1; pthread_t thread_handle_minor_active = -1; _SLEEP(3000); // 给主链路时间,来处理登录应答. @@ -1090,11 +1144,11 @@ void* thread_listen_proc(void * arg) #endif g_Tcp.tcpSetSockID(accept_fd); vPrtLogMsg(LOG_DEBUG, errno, "connect link the recv socket fd is: %d", g_IecCtrl.sockid); - ht_pthread_create_background(&thread_handle_minor_recv, thread_recv_proc, NULL); - ht_pthread_create_background(&thread_handle_minor_active, thread_active_proc, NULL); + //ht_pthread_create_background(&thread_handle_minor_recv, thread_recv_proc, NULL); + //ht_pthread_create_background(&thread_handle_minor_active, thread_active_proc, NULL); #ifndef _WIN32 - pthread_detach(thread_handle_minor_recv); - pthread_detach(thread_handle_minor_active); + //pthread_detach(thread_handle_minor_recv); + //ptread_detach(thread_handle_minor_active); #endif _SLEEP(1000); } @@ -1257,51 +1311,83 @@ void *thread_client_proc(void *arg) void* thread_recv_proc(void * arg) { unsigned char szBuf[MAX_SBUFF_TCP] = { 0 }; - int recv_buflen = 0, pkgLen = 0; + int recv_buflen = 0, pkgLen = 0; + int recv_offset = 0; + int bytes_in_buf = 0; + int offset = 0; vPrtLogMsg(LOG_DEBUG, 0, "thread_recv_proc = %d startup, sockid=%d...", GETTID(), g_IecCtrl.sockid); - while (g_Running && g_IecCtrl.isConnect) - { - pthread_testcancels(); - memset(szBuf, 0x00, sizeof(szBuf)); - recv_buflen = -1; - pkgLen = 0; - recv_buflen = g_Tcp.tcpRecvBuffer(g_IecCtrl.sockid, (char*)szBuf, MAX_SBUFF_TCP, 5); // t1 = 15s - if (recv_buflen == ErrException || (recv_buflen == 0 && errno == 2)) { - vPrtLogMsg(LOG_WARNG, errno, "socket link exceptiond, do close sockid=%d, thread_recv_proc pthread_exit.", g_IecCtrl.sockid); - g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP); + while (g_Running ) + { + pthread_testcancels(); + if (!g_IecCtrl.isConnect) + { + _SLEEP(1000); + continue; + } + memset(szBuf, 0x00, sizeof(szBuf)); + recv_buflen = -1; + int sockid = g_IecCtrl.sockid; + recv_buflen = g_Tcp.tcpRecvBuffer(sockid, (char*)szBuf, MAX_SBUFF_TCP, 5); // t1 = 15s + if (recv_buflen == ErrException || (recv_buflen == 0 && errno == 2)) { + vPrtLogMsg(LOG_WARNG, errno, "socket link exceptiond, do close sockid=%d, thread_recv_proc pthread_exit.", g_IecCtrl.sockid); + g_Tcp.clear_tcp_buffer(sockid, MAX_SBUFF_TCP); + + + mutex_lock(g_IecCtrl_mutex); + if (sockid == g_IecCtrl.sockid) + { + g_IecCtrl.isConnect = false; + g_IecCtrl.sockid = -1; + } + mutex_unlock(g_IecCtrl_mutex); + break; + } + else if (recv_buflen <= 0) { + vPrtLogMsg(LOG_WARNG, RET_OK, "socket link recv data length: %d,sockid=%d,errno=%d, sleep and continue", recv_buflen, g_IecCtrl.sockid, errno); + _SLEEP(1000); + continue; + } - mutex_lock(g_IecCtrl_mutex); - g_IecCtrl.isConnect = false; - g_IecCtrl.sockid = -1; - mutex_unlock(g_IecCtrl_mutex); - break; - } - else if (recv_buflen <= 0) { - vPrtLogMsg(LOG_WARNG, RET_OK,"socket link recv data length: %d,sockid=%d,errno=%d, sleep and continue",recv_buflen,g_IecCtrl.sockid,errno); - _SLEEP(1000); - continue; - } + mutex_lock(g_IecCtrl_mutex); + g_IecCtrl.last_time = time(NULL); + mutex_unlock(g_IecCtrl_mutex); - mutex_lock(g_IecCtrl_mutex); - g_IecCtrl.last_time = time(NULL); - mutex_unlock(g_IecCtrl_mutex); + bytes_in_buf = recv_buflen; - if (recv_buflen < (int)sizeof(ST_APCI)) // < 6byte - { - _SLEEP(1000); - continue; - } - if (recv_buflen >= (int)sizeof(ST_APCI) && (szBuf[0] & 0xff) == CMD_START_HEAD) // 0x68H - { - pkgLen = 1 + 1 + (szBuf[1] & 0xff); - if (recv_buflen < pkgLen) - continue; - } - AddParserList(szBuf, pkgLen); - vPrtLogHex(LOG_PACK, g_IecCtrl.sockid, PRT_PACK_RECV, (unsigned char*)szBuf, pkgLen); - g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, pkgLen); + offset = 0; + pkgLen = 0; + while (offset < bytes_in_buf) + { + // Find HEAD + if ((szBuf[offset] & 0xff) != CMD_START_HEAD) + { + // Ignore it + offset++; + continue; + } + + if (bytes_in_buf - offset < (int)sizeof(ST_APCI)) // < 6byte + { + break; + } + + pkgLen = 1 + 1 + (szBuf[offset + 1] & 0xff); + if (bytes_in_buf - offset < pkgLen) + { + break; + } + + AddParserList(szBuf + offset, pkgLen); + vPrtLogHex(LOG_PACK, g_IecCtrl.sockid, PRT_PACK_RECV, (unsigned char*)szBuf, pkgLen); + offset += pkgLen; + } + + if (offset > 0) + { + g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, offset); + } } vPrtLogMsg(LOG_DEBUG, RET_OK, "link exceptiond, thread_recv_proc pthread_exit."); return NULL; @@ -1343,7 +1429,7 @@ bool bSetPointTableValueYX(unsigned char v, unsigned int adr) if (((m_pIter->second).stype & 0xff) == 0x01) { (m_pIter->second).dtime = time(NULL)-1; - // 遥信量 + // 遥信量tcpAcceptSocket if ((v & 0xff) != ((m_pIter->second).cval & 0xff)) { ovl = ((m_pIter->second).cval & 0xff); // 上一次的状态值 @@ -1396,6 +1482,54 @@ bool bSetPointTableValueYX(const std::vector& values) //遥测值入map bool bSetPointTableValueYC(float v, unsigned int adr) { + // Save the origin data into database first + IEC_OBJVAL_NEW objVal = { 0 }; + objVal.stype = 2; + objVal.dtime = time(NULL); + objVal.fval = v; + objVal.sadr = adr; + + addOriginDataListNew(objVal); + + if (g_TConfig.shouldParseBusiData() != 0) + { + // time_t ts = 0; + std::map::const_iterator itPoint; + map::iterator itCachedDev; + + bool assigned = false; + + list::iterator itBusiData; + mutex_lock(g_map_iec_mutex_new); + itPoint = g_map_iec_new.find(adr); + if (itPoint != g_map_iec_new.end() || strlen((const char*)itPoint->second.fieldName) > 0) + { + itCachedDev = g_map_dev_data.find(itPoint->second.sensor_id); + if (itCachedDev != g_map_dev_data.end()) + { + assigned = AssignValueToDeviceData(itCachedDev->second, objVal); + if (assigned) + { + if (itCachedDev->second.assignedFields == itCachedDev->second.fields.size()) + { + std::string sql = BuildSqlForDeviceData(itCachedDev->second); + + ResetCachedDeviceData(itCachedDev->second); + + mutex_lock(g_list_busi_data_mutex); + itBusiData = g_list_busi_data.insert(g_list_busi_data.end(), std::string()); + itBusiData->swap(sql); + mutex_unlock(g_list_busi_data_mutex); + + vPrtLogMsg(LOG_DEBUG, 0, "Insert Busi Data into Table %s, sensor_id=%u", (const char*)itCachedDev->second.device->tableName, itCachedDev->second.device->sensor_id); + } + } + } + } + + mutex_unlock(g_map_iec_mutex_new); + } + map::iterator m_pIter; double oldval = 0.0; mutex_lock(g_map_iec_mutex); @@ -1454,61 +1588,10 @@ bool bSetPointTableValueYC(float v, unsigned int adr) bool bSetPointTableValueYC(const std::vector& values) { - // Save the origin data into database first - for (std::vector::const_iterator it = values.begin(); it != values.end(); ++it) - { - addOriginDataListNew(*it); - } - if (g_TConfig.shouldParseBusiData() != 0) - { - // time_t ts = 0; - std::map::const_iterator itPoint; - - bool assigned = false; - map::iterator itCachedDev; - std::string sql; - list::iterator itBusiData; - mutex_lock(g_map_iec_mutex_new); - for (std::vector::const_iterator it = values.begin(); it != values.end(); ++it) - { - if (it->stype == 1) - { - continue; - } - - itPoint = g_map_iec_new.find(it->sadr); - if (itPoint == g_map_iec_new.end() || strlen((const char*)itPoint->second.fieldName) == 0) - { - continue; - } - - itCachedDev = g_map_dev_data.find(itPoint->second.sensor_id); - if (itCachedDev == g_map_dev_data.end()) - { - continue; - } - - assigned = AssignValueToDeviceData(itCachedDev->second, *it); - if (assigned) - { - if (itCachedDev->second.assignedFields == itCachedDev->second.fields.size()) - { - sql = BuildSqlForDeviceData(itCachedDev->second); - - ResetCachedDeviceData(itCachedDev->second); - - mutex_lock(g_list_busi_data_mutex); - itBusiData = g_list_busi_data.insert(g_list_busi_data.end(), std::string()); - itBusiData->swap(sql); - mutex_unlock(g_list_busi_data_mutex); - - vPrtLogMsg(LOG_DEBUG, 0, "Insert Busi Data into Table %s, sensor_id=%u", (const char*)itCachedDev->second.device->tableName, itCachedDev->second.device->sensor_id); - } - } - } - mutex_unlock(g_map_iec_mutex_new); - } + + + /* IEC_OBJVAL_NEW objVal = { 0 }; @@ -1604,7 +1687,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen // 类型标识值<0>未用,在本配套标准中定义了 1 至 127 的值,128 至 255 未定义。136 // 至 255 可以由此标准的使用者彼此独立的进行定义,仅当使用具有类型标识号为 1 至 127 // 的范围的应用服务数据单元才能达到全部互操作。 - std::vector ycItems; + // std::vector ycItems; IEC_OBJVAL_NEW ycItem = { 2, 0 }; time_t ts = time(NULL); @@ -1682,7 +1765,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen ycItem.dtime = ts; ycItem.sadr = adr; ycItem.fval = *val; - ycItems.push_back(ycItem); + // ycItems.push_back(ycItem); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); pos += sizeof(SH104); @@ -1697,7 +1780,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen ycItem.dtime = ts; ycItem.sadr = adr; ycItem.fval = *val; - ycItems.push_back(ycItem); + // ycItems.push_back(ycItem); if (*val) vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pfv->qds.ov, pfv->qds.iv, pfv->qds.bl); @@ -1717,7 +1800,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen ycItem.dtime = ts; ycItem.sadr = adr; ycItem.fval = *val; - ycItems.push_back(ycItem); + // ycItems.push_back(ycItem); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); pos += sizeof(SH104); @@ -1737,7 +1820,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen ycItem.dtime = ts; ycItem.sadr = adr; ycItem.fval = *val; - ycItems.push_back(ycItem); + // ycItems.push_back(ycItem); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); pos += sizeof(SFP104); @@ -1752,7 +1835,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen ycItem.dtime = ts; ycItem.sadr = adr; ycItem.fval = *val; - ycItems.push_back(ycItem); + // ycItems.push_back(ycItem); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pfv->qds.ov, pfv->qds.iv, pfv->qds.bl); pos += sizeof(SFP104V); @@ -1771,7 +1854,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen ycItem.dtime = ts; ycItem.sadr = adr; ycItem.fval = *val; - ycItems.push_back(ycItem); + // ycItems.push_back(ycItem); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); pos += sizeof(SFP104); @@ -1817,7 +1900,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen ycItem.dtime = t56.GetTime(); ycItem.sadr = adr; ycItem.fval = *val; - ycItems.push_back(ycItem); + // ycItems.push_back(ycItem); char buf[32]; t56.GetTimeString(buf, sizeof(buf)); @@ -1879,9 +1962,9 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen break; } - if (!ycItems.empty()) + // if (!ycItems.empty()) { - bSetPointTableValueYC(ycItems); + // bSetPointTableValueYC(ycItems); } return RET_OK; } @@ -1908,7 +1991,11 @@ int SendMsgFormatIAction(unsigned char cmd) stPack.asdu.header.commom_asdu2 = (g_iec_conf.iec_global_addr >> 8) & 0xFF; stPack.asdu.data[3] = 0x14; // 全召 - iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&stPack, stPack.apci.len + 2); +#ifndef _WIN32 + iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&stPack, stPack.apci.len + 2, g_IecCtrl.pipeFds[0]); +#else + iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&stPack, stPack.apci.len + 2, -1); +#endif if (iRet == ErrException) { vPrtLogMsg(LOG_ERROR, iRet, "send mesg failed, sockid:%d msg:%s will close socket", g_IecCtrl.sockid, strerror(errno)); @@ -1996,13 +2083,14 @@ void * thread_parser_proc(void * arg) continue; } memcpy(&pData, &(g_list_pack.back()), sizeof(ST_RECVPKG)); // 由尾取出 + g_list_pack.pop_back(); // 由尾删除 + mutex_unlock(g_list_pack_mutex); + if (pData.pszBuff == NULL) { - mutex_unlock(g_list_pack_mutex); + _SLEEP(MAX_SLEEP_EMPTY * 10); continue; } - g_list_pack.pop_back(); // 由尾删除 - mutex_unlock(g_list_pack_mutex); // 解析104报文 if (pData.iLength >= MIN_APDU_LENGTH && (pData.pszBuff[0] & 0xff) == CMD_START_HEAD) // 0x68H @@ -3697,14 +3785,16 @@ void *thread_Timer_proc(void *arg) time_t timeOrigin = time(NULL); // 定期总召 + /* if ((g_IecCtrl.time_action != -1) && ((timeOrigin - g_IecCtrl.time_action) > g_iec_conf.action_interval)) { vPrtLogMsg(LOG_DEBUG, RET_OK, "Send Activation"); SendMsgFormatIAction(CMD_CTL_64H); // 发送总召激活 } + * */ _SLEEP(1000); - continue; + // continue; while ((time(NULL) - timeOrigin) < 1000) _SLEEP(1000); @@ -3789,7 +3879,11 @@ void *thread_active_proc(void *arg) header.cntl3 = g_IecCtrl.usRecvNo & 0xFE; header.cntl4 = (g_IecCtrl.usRecvNo >> 8) & 0xFF; - g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, 6); +#ifndef _WIN32 + g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, 6, g_IecCtrl.pipeFds[0]); +#else + g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, 6, -1); +#endif vPrtLogHex(LOG_PACK, g_IecCtrl.sockid, PRT_PACK_SEND, (unsigned char*)&header, sizeof(ST_APCI)); mutex_lock(g_IecCtrl_mutex); g_IecCtrl.last_time = time(NULL); diff --git a/src/HTService.cpp b/src/HTService.cpp index 48692af..9a1205c 100644 --- a/src/HTService.cpp +++ b/src/HTService.cpp @@ -32,6 +32,13 @@ static void cancel_thread_work(void) } vPrtLogMsg(LOG_DEBUG, RET_OK,"thread_handle_linkmgr shutdown.") ; + if(thread_handle_recv) { + pthread_cancel(thread_handle_recv); + pthread_join(thread_handle_recv, NULL); + pthread_kill(thread_handle_recv, SIGURG); + } + vPrtLogMsg(LOG_DEBUG, RET_OK,"thread_handle_recv shutdown.") ; + if(thread_handle_active) { pthread_cancel(thread_handle_active); pthread_join(thread_handle_active, NULL); @@ -99,6 +106,10 @@ void sig_chld(int signo) // 创建守护进程环境 static int HTStartBackgroundProcess() { +#ifdef __NETBEANS__ + // If in Netbeans IDE + return 0; +#endif #ifndef _WIN32 pid_t pid; @@ -325,6 +336,9 @@ int main(int argc, char* argv[]) ht_pthread_create_background(&thread_handle_gishold, thread_gis_hold_proc, NULL); // 评测数据入库线程 ht_pthread_create_background(&thread_handle_linkmgr, thread_listen_proc, NULL); // 链路管理线程 + + ht_pthread_create_background(&thread_handle_recv, thread_recv_proc, NULL); + // ht_pthread_create_background(&thread_handle_linkmgr, thread_client_proc, NULL); // 链路管理线程 ht_pthread_create_background(&thread_handle_parse, thread_parser_proc, NULL); // 104报文解析线程 ht_pthread_create_background(&thread_handle_timer, thread_Timer_proc, NULL); // 104链路超时管理线程 diff --git a/src/HTTcpSocket.cpp b/src/HTTcpSocket.cpp index 8bdf5a0..138382e 100644 --- a/src/HTTcpSocket.cpp +++ b/src/HTTcpSocket.cpp @@ -257,7 +257,7 @@ int TTcpSocket::tcpAcceptSocket(int sockid) ** Return Code : >0 send data length ** <0 error ************************************************************/ -int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length) +int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length, int pipeFd) { if ((sockid <= 0) || (pBuffer == NULL) ) return -1 ; @@ -265,7 +265,7 @@ int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length) int iSendLen = 0,iLen=0,i=0,iCnt=0,len=length ; char *p = (char *)pBuffer; - fd_set fd_Write, fd_Except ; + fd_set fd_Write, fd_Except, fd_read; struct timeval st_TimeOut ; if(length > MAX_SBUFF_TCP) @@ -275,6 +275,7 @@ int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length) for(;;) { FD_ZERO(&fd_Write) ; + FD_ZERO(&fd_read); FD_ZERO(&fd_Except) ; FD_SET((unsigned int)sockid, &fd_Write) ; FD_SET((unsigned int)sockid, &fd_Except) ; @@ -282,12 +283,38 @@ int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length) st_TimeOut.tv_sec = 0 ; st_TimeOut.tv_usec = 10 ; - iSendLen = select (sockid+1 , NULL, &fd_Write, &fd_Except, &st_TimeOut) ; + int seledtFd = (pipeFd > sockid ? pipeFd : sockid) + 1; + if (pipeFd != -1) + { + FD_SET(pipeFd, &fd_read); + iSendLen = select(seledtFd, &fd_read, &fd_Write, &fd_Except, &st_TimeOut); + } + else + { + iSendLen = select(seledtFd, NULL, &fd_Write, &fd_Except, &st_TimeOut); + } if((iSendLen == 0 && errno == EPIPE) || (iSendLen < 0)) { // EPIPE =32 Broken pipe EBADF=9 Bad file number vPrtLogMsg(LOG_ERROR,errno, "socket execption close,sockid:%d ret:%d msg:%s", sockid, iSendLen, strerror(errno)); //add_monitor_mesg(len, 0x00, (unsigned char*)pBuffer); // 添加监控消息到监控发送队列 return ErrException; } + if (pipeFd != -1 && FD_ISSET(pipeFd, &fd_read)) + { + int sid = -1; +#ifndef _WIN32 + read(pipeFd, &sid, sizeof(sid)); // read from the read end of the pipe +#else +#endif + vPrtLogMsg(LOG_ERROR, errno, "socket closed,sockid:%d sid from pipe=%d", sockid, sid); + if (sockid == sid) + { + return ErrSendFail; //socket exception + } + else + { + continue; + } + } if( FD_ISSET(sockid, &fd_Except) ) { vPrtLogMsg(LOG_ERROR,errno, "socket execption,sockid:%d msg:%s", sockid, strerror(errno));