长泰最终版本

main
BlueMatthew 2 years ago
parent c4b0d15c50
commit 7b6b1baf1b

@ -14,7 +14,11 @@ MYSQL_LIB_PATH = /usr/lib64/mysql
MYSQL_LIB_FILE = mysqlclient MYSQL_LIB_FILE = mysqlclient
#CFLAGS = -c -Werror -g -D_REENTRANT -Wformat -Wsign-compare -O3 -ansi #CFLAGS = -c -Werror -g -D_REENTRANT -Wformat -Wsign-compare -O3 -ansi
CFLAGS = -c -Werror -g -D_DEBUG -D_REENTRANT -Wformat -Wsign-compare -O3 -ansi ifeq ($(MODE), DEBUG)
CFLAGS = -c -Werror -g -D_DEBUG -D_REENTRANT -Wformat -Wsign-compare -O0 -ansi
else
CFLAGS = -c -Werror -g -D_REENTRANT -Wformat -Wsign-compare -O3 -ansi
endif
LIBS = -L$(MYSQL_LIB_PATH) -L/usr/local/lib64 -l$(MYSQL_LIB_FILE) -lopencv_core -lopencv_imgproc -lopencv_highgui -lopencv_videoio -lopencv_imgcodecs -lz -ldl -lpng -lnsl -luuid -lpthread -lm -lc -lstdc++ LIBS = -L$(MYSQL_LIB_PATH) -L/usr/local/lib64 -l$(MYSQL_LIB_FILE) -lopencv_core -lopencv_imgproc -lopencv_highgui -lopencv_videoio -lopencv_imgcodecs -lz -ldl -lpng -lnsl -luuid -lpthread -lm -lc -lstdc++
INCPATH = -I./include -I$(MYSQL_PATH)/include INCPATH = -I./include -I$(MYSQL_PATH)/include
@ -36,8 +40,13 @@ TARGET = ./bin/iecserver
#TARGET = ./bin/IMGService #TARGET = ./bin/IMGService
# for link # for link
ifeq ($(MODE), DEBUG)
$(TARGET):$(OBJS)
$(CC) -Wall -O0 -o $(TARGET) $(OBJS) $(LIBS)
else
$(TARGET):$(OBJS) $(TARGET):$(OBJS)
$(CC) -Wall -O3 -o $(TARGET) $(OBJS) $(LIBS) $(CC) -Wall -O3 -o $(TARGET) $(OBJS) $(LIBS)
endif
$(OBJS_PATH)/%.o : ./src/%.cpp $(OBJS_PATH)/%.o : ./src/%.cpp
$(CC) $(CFLAGS) $(INCPATH) $< -o $@ $(CC) $(CFLAGS) $(INCPATH) $< -o $@

@ -37,6 +37,9 @@ typedef struct {
bool isConnect; // true: connected false: not connected bool isConnect; // true: connected false: not connected
int listenid; // listen handle int listenid; // listen handle
int sockid; // socket connect handle int sockid; // socket connect handle
#ifndef _WIN32
int pipeFds[2]; // When new connection arrived, wake socket up with the pipe
#endif
time_t last_time; // last send/recv time time_t last_time; // last send/recv time
bool is_yk_ack ; // 是否收到遥控信号 bool is_yk_ack ; // 是否收到遥控信号

@ -119,6 +119,7 @@ extern pthread_t thread_handle_gishold; // 断路器断开数据入库线程
extern pthread_t thread_handle_active; // 心跳线程 extern pthread_t thread_handle_active; // 心跳线程
extern pthread_t thread_handle_linkmgr; // 链路管理线程 extern pthread_t thread_handle_linkmgr; // 链路管理线程
extern pthread_t thread_handle_recv; // 链路管理线程
extern pthread_t thread_handle_cache; // 内存同步线程句柄 extern pthread_t thread_handle_cache; // 内存同步线程句柄
extern pthread_t thread_handle_opecvimg; // 图片识别线程句柄 extern pthread_t thread_handle_opecvimg; // 图片识别线程句柄

@ -62,7 +62,7 @@ public:
int getLocalSocketInfo(int sockid, unsigned char *sip, unsigned short *sport); int getLocalSocketInfo(int sockid, unsigned char *sip, unsigned short *sport);
int tcpRecvBuffer(int sockid, char* pBuffer,int nMaxLength, int timeout=6); int tcpRecvBuffer(int sockid, char* pBuffer,int nMaxLength, int timeout=6);
int tcpSendBuffer(int sockid, const char *pBuffer, int length); int tcpSendBuffer(int sockid, const char *pBuffer, int length, int pipeFd);
int tcpAcceptSocket(int sockid); int tcpAcceptSocket(int sockid);
int tcpCloseSocket(int sockfd); int tcpCloseSocket(int sockfd);

@ -77,6 +77,8 @@ pthread_t thread_handle_busi_data; // 业务数据如何线程句柄
pthread_t thread_handle_pingce; // IEC104评测数据入库线程句柄 pthread_t thread_handle_pingce; // IEC104评测数据入库线程句柄
pthread_t thread_handle_gishold; // 断路器断开数据入库线程句柄 pthread_t thread_handle_gishold; // 断路器断开数据入库线程句柄
pthread_t thread_handle_recv; //
pthread_t thread_handle_active; // 心跳线程 pthread_t thread_handle_active; // 心跳线程
pthread_t thread_handle_linkmgr; // 链路管理线程 pthread_t thread_handle_linkmgr; // 链路管理线程
pthread_t thread_handle_cache; // 内存同步线程句柄 pthread_t thread_handle_cache; // 内存同步线程句柄

@ -732,6 +732,11 @@ static void showIEC104Conf()
void InitIECENV() void InitIECENV()
{ {
memset(&g_IecCtrl, 0x00, sizeof(ST_IEC104_CTRL)); memset(&g_IecCtrl, 0x00, sizeof(ST_IEC104_CTRL));
g_IecCtrl.sockid = -1;
#ifndef _WIN32
g_IecCtrl.pipeFds[0] = -1;
g_IecCtrl.pipeFds[1] = -1;
#endif
g_IecCtrl.t0 = (int)g_TConfig.getTimeout0(); g_IecCtrl.t0 = (int)g_TConfig.getTimeout0();
g_IecCtrl.t1 = (int)g_TConfig.getTimeout1(); g_IecCtrl.t1 = (int)g_TConfig.getTimeout1();
g_IecCtrl.t2 = (int)g_TConfig.getTimeout2(); g_IecCtrl.t2 = (int)g_TConfig.getTimeout2();
@ -742,6 +747,10 @@ void InitIECENV()
g_IecCtrl.timer_U_Testflag = true; // 初始化为真如果过程中有超时置为false g_IecCtrl.timer_U_Testflag = true; // 初始化为真如果过程中有超时置为false
g_IecCtrl.m_gis_change = false; g_IecCtrl.m_gis_change = false;
g_IecCtrl.time_action = time(NULL); // 总召间隔时间更新 g_IecCtrl.time_action = time(NULL); // 总召间隔时间更新
#ifndef _WIN32
pipe(g_IecCtrl.pipeFds);
#endif
} }
void IEC104EnvLoad() void IEC104EnvLoad()
{ {
@ -770,6 +779,20 @@ void IEC104EnvLoad()
void IEC104EnvFree() void IEC104EnvFree()
{ {
#ifndef _WIN32
if (g_IecCtrl.pipeFds[1] != -1)
{
int data = -1;
write(g_IecCtrl.pipeFds[1], &data, sizeof(data));
close(g_IecCtrl.pipeFds[1]);
g_IecCtrl.pipeFds[1] = -1;
}
if (g_IecCtrl.pipeFds[0] != -1)
{
close(g_IecCtrl.pipeFds[0]);
g_IecCtrl.pipeFds[0] = -1;
}
#endif
mutex_close(g_list_pack_mutex); mutex_close(g_list_pack_mutex);
mutex_close(g_list_dbset_mutex); mutex_close(g_list_dbset_mutex);
mutex_close(g_sendno_mutex); mutex_close(g_sendno_mutex);
@ -925,7 +948,11 @@ int SendMsgFormatU(int cmd)
header.cntl3 = 0; // g_IecCtrl.usRecvNo & 0xFE; header.cntl3 = 0; // g_IecCtrl.usRecvNo & 0xFE;
header.cntl4 = 0; //(g_IecCtrl.usRecvNo >> 8) & 0xFF; header.cntl4 = 0; //(g_IecCtrl.usRecvNo >> 8) & 0xFF;
iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, sizeof(ST_APCI)); #ifndef _WIN32
iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, sizeof(ST_APCI), g_IecCtrl.pipeFds[0]);
#else
iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, sizeof(ST_APCI), -1);
#endif
if (iRet == ErrException) if (iRet == ErrException)
{ {
vPrtLogMsg(LOG_ERROR, iRet, "send mesg failed, sockid:%d msg:%s will close socket", g_IecCtrl.sockid, strerror(errno)); vPrtLogMsg(LOG_ERROR, iRet, "send mesg failed, sockid:%d msg:%s will close socket", g_IecCtrl.sockid, strerror(errno));
@ -961,13 +988,20 @@ int SendMsgFormatS(unsigned short sendno)
//header.cntl3 = (g_IecCtrl.TxCounter & 0xFE); // S-Format //header.cntl3 = (g_IecCtrl.TxCounter & 0xFE); // S-Format
//header.cntl4 = (g_IecCtrl.TxCounter >> 8) & 0xFF; //S格式确认包cnt1=0x01, cnt2=0x00,是固定不变的。 //header.cntl4 = (g_IecCtrl.TxCounter >> 8) & 0xFF; //S格式确认包cnt1=0x01, cnt2=0x00,是固定不变的。
int sockid = g_IecCtrl.sockid;
if ((iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, sizeof(ST_APCI))) < 0) #ifndef _WIN32
if ((iRet = g_Tcp.tcpSendBuffer(sockid, (const char*)&header, sizeof(ST_APCI), g_IecCtrl.pipeFds[0])) < 0)
#else
if ((iRet = g_Tcp.tcpSendBuffer(sockid, (const char*)&header, sizeof(ST_APCI), -1)) < 0)
#endif
{ {
vPrtLogMsg(LOG_WARNG, iRet, "send Format S failed with error: %s, errno=%d will close socket", strerror(errno), errno); vPrtLogMsg(LOG_WARNG, iRet, "send Format S failed with error: %s, errno=%d will close socket", strerror(errno), errno);
g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP); if (sockid == g_IecCtrl.sockid)
g_Tcp.tcpCloseSocket(g_IecCtrl.sockid); {
g_IecCtrl.isConnect = false; g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP);
g_Tcp.tcpCloseSocket(g_IecCtrl.sockid);
g_IecCtrl.isConnect = false;
}
return iRet; return iRet;
} }
//g_IecCtrl.usSendNo++; //g_IecCtrl.usSendNo++;
@ -994,9 +1028,15 @@ int SendMsgFormatI(unsigned char *msgbuf, unsigned int len)
header->cntl4 = ((g_IecCtrl.usRecvNo >> 8) & 0xFF); header->cntl4 = ((g_IecCtrl.usRecvNo >> 8) & 0xFF);
memcpy(buf + sizeof(ST_APCI), msgbuf, len); memcpy(buf + sizeof(ST_APCI), msgbuf, len);
if ((iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)buf, len + sizeof(ST_APCI))) < 0) int sockid = g_IecCtrl.sockid;
#ifndef _WIN32
if ((iRet = g_Tcp.tcpSendBuffer(sockid, (const char*)buf, len + sizeof(ST_APCI), g_IecCtrl.pipeFds[0])) < 0)
#else
if ((iRet = g_Tcp.tcpSendBuffer(sockid, (const char*)buf, len + sizeof(ST_APCI), -1)) < 0)
#endif
{ {
vPrtLogMsg(LOG_WARNG, iRet, "send Format I failed with error: %s, errno=%d will close socket", strerror(errno), errno); vPrtLogMsg(LOG_WARNG, iRet, "send Format I failed with error: %s, errno=%d will close socket", strerror(errno), errno);
if (sockid == g_IecCtrl.sockid)
g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP); g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP);
g_Tcp.tcpCloseSocket(g_IecCtrl.sockid); g_Tcp.tcpCloseSocket(g_IecCtrl.sockid);
g_IecCtrl.isConnect = false; g_IecCtrl.isConnect = false;
@ -1066,20 +1106,34 @@ void* thread_listen_proc(void * arg)
continue; continue;
} }
mutex_lock(g_IecCtrl_mutex); mutex_lock(g_IecCtrl_mutex);
if (g_Tcp.tcpIsConnected(g_IecCtrl.sockid)) int org_socket = g_IecCtrl.sockid;
{ if (org_socket != -1)
mutex_unlock(g_IecCtrl_mutex); {
vPrtLogMsg(LOG_DEBUG, errno,"Previous link is connected, won't accept accept_fd=%d", accept_fd); // Will close it, wake it up
_SLEEP(1000);
continue; #ifndef _WIN32
} if (g_IecCtrl.pipeFds[1] != -1)
{
write(g_IecCtrl.pipeFds[1], &org_socket, sizeof(org_socket));
vPrtLogMsg(LOG_ERROR, errno, "Wake up socket select via pipe,sockid:%d", org_socket);
}
#endif
}
// g_Tcp.tcpCloseSocket(org_socket);
mutex_unlock(g_IecCtrl_mutex);
_SLEEP(1000);
mutex_lock(g_IecCtrl_mutex);
g_IecCtrl.last_time = time(NULL); g_IecCtrl.last_time = time(NULL);
g_IecCtrl.last_yx_time = time(NULL); g_IecCtrl.last_yx_time = time(NULL);
g_IecCtrl.last_yc_time = time(NULL); g_IecCtrl.last_yc_time = time(NULL);
g_IecCtrl.sockid = accept_fd; g_IecCtrl.sockid = accept_fd;
g_IecCtrl.isConnect = true; g_IecCtrl.isConnect = true;
mutex_unlock(g_IecCtrl_mutex); mutex_unlock(g_IecCtrl_mutex);
pthread_t thread_handle_minor_recv = -1; pthread_t thread_handle_minor_recv = -1;
pthread_t thread_handle_minor_active = -1; pthread_t thread_handle_minor_active = -1;
_SLEEP(3000); // 给主链路时间,来处理登录应答. _SLEEP(3000); // 给主链路时间,来处理登录应答.
@ -1090,11 +1144,11 @@ void* thread_listen_proc(void * arg)
#endif #endif
g_Tcp.tcpSetSockID(accept_fd); g_Tcp.tcpSetSockID(accept_fd);
vPrtLogMsg(LOG_DEBUG, errno, "connect link the recv socket fd is: %d", g_IecCtrl.sockid); vPrtLogMsg(LOG_DEBUG, errno, "connect link the recv socket fd is: %d", g_IecCtrl.sockid);
ht_pthread_create_background(&thread_handle_minor_recv, thread_recv_proc, NULL); //ht_pthread_create_background(&thread_handle_minor_recv, thread_recv_proc, NULL);
ht_pthread_create_background(&thread_handle_minor_active, thread_active_proc, NULL); //ht_pthread_create_background(&thread_handle_minor_active, thread_active_proc, NULL);
#ifndef _WIN32 #ifndef _WIN32
pthread_detach(thread_handle_minor_recv); //pthread_detach(thread_handle_minor_recv);
pthread_detach(thread_handle_minor_active); //ptread_detach(thread_handle_minor_active);
#endif #endif
_SLEEP(1000); _SLEEP(1000);
} }
@ -1257,51 +1311,83 @@ void *thread_client_proc(void *arg)
void* thread_recv_proc(void * arg) void* thread_recv_proc(void * arg)
{ {
unsigned char szBuf[MAX_SBUFF_TCP] = { 0 }; unsigned char szBuf[MAX_SBUFF_TCP] = { 0 };
int recv_buflen = 0, pkgLen = 0; int recv_buflen = 0, pkgLen = 0;
int recv_offset = 0;
int bytes_in_buf = 0;
int offset = 0;
vPrtLogMsg(LOG_DEBUG, 0, "thread_recv_proc = %d startup, sockid=%d...", GETTID(), g_IecCtrl.sockid); vPrtLogMsg(LOG_DEBUG, 0, "thread_recv_proc = %d startup, sockid=%d...", GETTID(), g_IecCtrl.sockid);
while (g_Running && g_IecCtrl.isConnect) while (g_Running )
{ {
pthread_testcancels(); pthread_testcancels();
memset(szBuf, 0x00, sizeof(szBuf)); if (!g_IecCtrl.isConnect)
recv_buflen = -1; {
pkgLen = 0; _SLEEP(1000);
recv_buflen = g_Tcp.tcpRecvBuffer(g_IecCtrl.sockid, (char*)szBuf, MAX_SBUFF_TCP, 5); // t1 = 15s continue;
if (recv_buflen == ErrException || (recv_buflen == 0 && errno == 2)) { }
vPrtLogMsg(LOG_WARNG, errno, "socket link exceptiond, do close sockid=%d, thread_recv_proc pthread_exit.", g_IecCtrl.sockid); memset(szBuf, 0x00, sizeof(szBuf));
g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, MAX_SBUFF_TCP); recv_buflen = -1;
int sockid = g_IecCtrl.sockid;
recv_buflen = g_Tcp.tcpRecvBuffer(sockid, (char*)szBuf, MAX_SBUFF_TCP, 5); // t1 = 15s
if (recv_buflen == ErrException || (recv_buflen == 0 && errno == 2)) {
vPrtLogMsg(LOG_WARNG, errno, "socket link exceptiond, do close sockid=%d, thread_recv_proc pthread_exit.", g_IecCtrl.sockid);
g_Tcp.clear_tcp_buffer(sockid, MAX_SBUFF_TCP);
mutex_lock(g_IecCtrl_mutex);
if (sockid == g_IecCtrl.sockid)
{
g_IecCtrl.isConnect = false;
g_IecCtrl.sockid = -1;
}
mutex_unlock(g_IecCtrl_mutex);
break;
}
else if (recv_buflen <= 0) {
vPrtLogMsg(LOG_WARNG, RET_OK, "socket link recv data length: %d,sockid=%d,errno=%d, sleep and continue", recv_buflen, g_IecCtrl.sockid, errno);
_SLEEP(1000);
continue;
}
mutex_lock(g_IecCtrl_mutex); mutex_lock(g_IecCtrl_mutex);
g_IecCtrl.isConnect = false; g_IecCtrl.last_time = time(NULL);
g_IecCtrl.sockid = -1; mutex_unlock(g_IecCtrl_mutex);
mutex_unlock(g_IecCtrl_mutex);
break;
}
else if (recv_buflen <= 0) {
vPrtLogMsg(LOG_WARNG, RET_OK,"socket link recv data length: %d,sockid=%d,errno=%d, sleep and continue",recv_buflen,g_IecCtrl.sockid,errno);
_SLEEP(1000);
continue;
}
mutex_lock(g_IecCtrl_mutex); bytes_in_buf = recv_buflen;
g_IecCtrl.last_time = time(NULL);
mutex_unlock(g_IecCtrl_mutex);
if (recv_buflen < (int)sizeof(ST_APCI)) // < 6byte offset = 0;
{ pkgLen = 0;
_SLEEP(1000); while (offset < bytes_in_buf)
continue; {
} // Find HEAD
if (recv_buflen >= (int)sizeof(ST_APCI) && (szBuf[0] & 0xff) == CMD_START_HEAD) // 0x68H if ((szBuf[offset] & 0xff) != CMD_START_HEAD)
{ {
pkgLen = 1 + 1 + (szBuf[1] & 0xff); // Ignore it
if (recv_buflen < pkgLen) offset++;
continue; continue;
} }
AddParserList(szBuf, pkgLen);
vPrtLogHex(LOG_PACK, g_IecCtrl.sockid, PRT_PACK_RECV, (unsigned char*)szBuf, pkgLen); if (bytes_in_buf - offset < (int)sizeof(ST_APCI)) // < 6byte
g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, pkgLen); {
break;
}
pkgLen = 1 + 1 + (szBuf[offset + 1] & 0xff);
if (bytes_in_buf - offset < pkgLen)
{
break;
}
AddParserList(szBuf + offset, pkgLen);
vPrtLogHex(LOG_PACK, g_IecCtrl.sockid, PRT_PACK_RECV, (unsigned char*)szBuf, pkgLen);
offset += pkgLen;
}
if (offset > 0)
{
g_Tcp.clear_tcp_buffer(g_IecCtrl.sockid, offset);
}
} }
vPrtLogMsg(LOG_DEBUG, RET_OK, "link exceptiond, thread_recv_proc pthread_exit."); vPrtLogMsg(LOG_DEBUG, RET_OK, "link exceptiond, thread_recv_proc pthread_exit.");
return NULL; return NULL;
@ -1343,7 +1429,7 @@ bool bSetPointTableValueYX(unsigned char v, unsigned int adr)
if (((m_pIter->second).stype & 0xff) == 0x01) if (((m_pIter->second).stype & 0xff) == 0x01)
{ {
(m_pIter->second).dtime = time(NULL)-1; (m_pIter->second).dtime = time(NULL)-1;
// 遥信量 // 遥信量tcpAcceptSocket
if ((v & 0xff) != ((m_pIter->second).cval & 0xff)) if ((v & 0xff) != ((m_pIter->second).cval & 0xff))
{ {
ovl = ((m_pIter->second).cval & 0xff); // 上一次的状态值 ovl = ((m_pIter->second).cval & 0xff); // 上一次的状态值
@ -1396,6 +1482,54 @@ bool bSetPointTableValueYX(const std::vector<IEC_OBJVAL_NEW>& values)
//遥测值入map //遥测值入map
bool bSetPointTableValueYC(float v, unsigned int adr) bool bSetPointTableValueYC(float v, unsigned int adr)
{ {
// Save the origin data into database first
IEC_OBJVAL_NEW objVal = { 0 };
objVal.stype = 2;
objVal.dtime = time(NULL);
objVal.fval = v;
objVal.sadr = adr;
addOriginDataListNew(objVal);
if (g_TConfig.shouldParseBusiData() != 0)
{
// time_t ts = 0;
std::map<unsigned int, IEC_POINT>::const_iterator itPoint;
map<unsigned int, CACHED_DEV_DATA>::iterator itCachedDev;
bool assigned = false;
list<std::string>::iterator itBusiData;
mutex_lock(g_map_iec_mutex_new);
itPoint = g_map_iec_new.find(adr);
if (itPoint != g_map_iec_new.end() || strlen((const char*)itPoint->second.fieldName) > 0)
{
itCachedDev = g_map_dev_data.find(itPoint->second.sensor_id);
if (itCachedDev != g_map_dev_data.end())
{
assigned = AssignValueToDeviceData(itCachedDev->second, objVal);
if (assigned)
{
if (itCachedDev->second.assignedFields == itCachedDev->second.fields.size())
{
std::string sql = BuildSqlForDeviceData(itCachedDev->second);
ResetCachedDeviceData(itCachedDev->second);
mutex_lock(g_list_busi_data_mutex);
itBusiData = g_list_busi_data.insert(g_list_busi_data.end(), std::string());
itBusiData->swap(sql);
mutex_unlock(g_list_busi_data_mutex);
vPrtLogMsg(LOG_DEBUG, 0, "Insert Busi Data into Table %s, sensor_id=%u", (const char*)itCachedDev->second.device->tableName, itCachedDev->second.device->sensor_id);
}
}
}
}
mutex_unlock(g_map_iec_mutex_new);
}
map<unsigned int, ST_IECPOINT_TABLE>::iterator m_pIter; map<unsigned int, ST_IECPOINT_TABLE>::iterator m_pIter;
double oldval = 0.0; double oldval = 0.0;
mutex_lock(g_map_iec_mutex); mutex_lock(g_map_iec_mutex);
@ -1454,61 +1588,10 @@ bool bSetPointTableValueYC(float v, unsigned int adr)
bool bSetPointTableValueYC(const std::vector<IEC_OBJVAL_NEW>& values) bool bSetPointTableValueYC(const std::vector<IEC_OBJVAL_NEW>& values)
{ {
// Save the origin data into database first
for (std::vector<IEC_OBJVAL_NEW>::const_iterator it = values.begin(); it != values.end(); ++it)
{
addOriginDataListNew(*it);
}
if (g_TConfig.shouldParseBusiData() != 0)
{
// time_t ts = 0;
std::map<unsigned int, IEC_POINT>::const_iterator itPoint;
bool assigned = false;
map<unsigned int, CACHED_DEV_DATA>::iterator itCachedDev;
std::string sql;
list<std::string>::iterator itBusiData;
mutex_lock(g_map_iec_mutex_new);
for (std::vector<IEC_OBJVAL_NEW>::const_iterator it = values.begin(); it != values.end(); ++it)
{
if (it->stype == 1)
{
continue;
}
itPoint = g_map_iec_new.find(it->sadr);
if (itPoint == g_map_iec_new.end() || strlen((const char*)itPoint->second.fieldName) == 0)
{
continue;
}
itCachedDev = g_map_dev_data.find(itPoint->second.sensor_id);
if (itCachedDev == g_map_dev_data.end())
{
continue;
}
assigned = AssignValueToDeviceData(itCachedDev->second, *it);
if (assigned)
{
if (itCachedDev->second.assignedFields == itCachedDev->second.fields.size())
{
sql = BuildSqlForDeviceData(itCachedDev->second);
ResetCachedDeviceData(itCachedDev->second);
mutex_lock(g_list_busi_data_mutex);
itBusiData = g_list_busi_data.insert(g_list_busi_data.end(), std::string());
itBusiData->swap(sql);
mutex_unlock(g_list_busi_data_mutex);
vPrtLogMsg(LOG_DEBUG, 0, "Insert Busi Data into Table %s, sensor_id=%u", (const char*)itCachedDev->second.device->tableName, itCachedDev->second.device->sensor_id);
}
}
}
mutex_unlock(g_map_iec_mutex_new);
}
/* /*
IEC_OBJVAL_NEW objVal = { 0 }; IEC_OBJVAL_NEW objVal = { 0 };
@ -1604,7 +1687,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
// 类型标识值<0>未用,在本配套标准中定义了 1 至 127 的值128 至 255 未定义。136 // 类型标识值<0>未用,在本配套标准中定义了 1 至 127 的值128 至 255 未定义。136
// 至 255 可以由此标准的使用者彼此独立的进行定义,仅当使用具有类型标识号为 1 至 127 // 至 255 可以由此标准的使用者彼此独立的进行定义,仅当使用具有类型标识号为 1 至 127
// 的范围的应用服务数据单元才能达到全部互操作。 // 的范围的应用服务数据单元才能达到全部互操作。
std::vector<IEC_OBJVAL_NEW> ycItems; // std::vector<IEC_OBJVAL_NEW> ycItems;
IEC_OBJVAL_NEW ycItem = { 2, 0 }; IEC_OBJVAL_NEW ycItem = { 2, 0 };
time_t ts = time(NULL); time_t ts = time(NULL);
@ -1682,7 +1765,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
ycItem.dtime = ts; ycItem.dtime = ts;
ycItem.sadr = adr; ycItem.sadr = adr;
ycItem.fval = *val; ycItem.fval = *val;
ycItems.push_back(ycItem); // ycItems.push_back(ycItem);
vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl);
pos += sizeof(SH104); pos += sizeof(SH104);
@ -1697,7 +1780,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
ycItem.dtime = ts; ycItem.dtime = ts;
ycItem.sadr = adr; ycItem.sadr = adr;
ycItem.fval = *val; ycItem.fval = *val;
ycItems.push_back(ycItem); // ycItems.push_back(ycItem);
if (*val) if (*val)
vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pfv->qds.ov, pfv->qds.iv, pfv->qds.bl); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pfv->qds.ov, pfv->qds.iv, pfv->qds.bl);
@ -1717,7 +1800,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
ycItem.dtime = ts; ycItem.dtime = ts;
ycItem.sadr = adr; ycItem.sadr = adr;
ycItem.fval = *val; ycItem.fval = *val;
ycItems.push_back(ycItem); // ycItems.push_back(ycItem);
vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%d OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl);
pos += sizeof(SH104); pos += sizeof(SH104);
@ -1737,7 +1820,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
ycItem.dtime = ts; ycItem.dtime = ts;
ycItem.sadr = adr; ycItem.sadr = adr;
ycItem.fval = *val; ycItem.fval = *val;
ycItems.push_back(ycItem); // ycItems.push_back(ycItem);
vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl);
pos += sizeof(SFP104); pos += sizeof(SFP104);
@ -1752,7 +1835,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
ycItem.dtime = ts; ycItem.dtime = ts;
ycItem.sadr = adr; ycItem.sadr = adr;
ycItem.fval = *val; ycItem.fval = *val;
ycItems.push_back(ycItem); // ycItems.push_back(ycItem);
vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pfv->qds.ov, pfv->qds.iv, pfv->qds.bl); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pfv->qds.ov, pfv->qds.iv, pfv->qds.bl);
pos += sizeof(SFP104V); pos += sizeof(SFP104V);
@ -1771,7 +1854,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
ycItem.dtime = ts; ycItem.dtime = ts;
ycItem.sadr = adr; ycItem.sadr = adr;
ycItem.fval = *val; ycItem.fval = *val;
ycItems.push_back(ycItem); // ycItems.push_back(ycItem);
vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl); vPrtLogMsg(LOG_DEBUG, RET_OK, "type=%d SFP_%d(0x%04x): val:%-.4f OFlow: %d Valid: %d Blocked: %d", header->type, adr, adr, *val, pf->qds.ov, pf->qds.iv, pf->qds.bl);
pos += sizeof(SFP104); pos += sizeof(SFP104);
@ -1817,7 +1900,7 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
ycItem.dtime = t56.GetTime(); ycItem.dtime = t56.GetTime();
ycItem.sadr = adr; ycItem.sadr = adr;
ycItem.fval = *val; ycItem.fval = *val;
ycItems.push_back(ycItem); // ycItems.push_back(ycItem);
char buf[32]; char buf[32];
t56.GetTimeString(buf, sizeof(buf)); t56.GetTimeString(buf, sizeof(buf));
@ -1879,9 +1962,9 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
break; break;
} }
if (!ycItems.empty()) // if (!ycItems.empty())
{ {
bSetPointTableValueYC(ycItems); // bSetPointTableValueYC(ycItems);
} }
return RET_OK; return RET_OK;
} }
@ -1908,7 +1991,11 @@ int SendMsgFormatIAction(unsigned char cmd)
stPack.asdu.header.commom_asdu2 = (g_iec_conf.iec_global_addr >> 8) & 0xFF; stPack.asdu.header.commom_asdu2 = (g_iec_conf.iec_global_addr >> 8) & 0xFF;
stPack.asdu.data[3] = 0x14; // 全召 stPack.asdu.data[3] = 0x14; // 全召
iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&stPack, stPack.apci.len + 2); #ifndef _WIN32
iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&stPack, stPack.apci.len + 2, g_IecCtrl.pipeFds[0]);
#else
iRet = g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&stPack, stPack.apci.len + 2, -1);
#endif
if (iRet == ErrException) if (iRet == ErrException)
{ {
vPrtLogMsg(LOG_ERROR, iRet, "send mesg failed, sockid:%d msg:%s will close socket", g_IecCtrl.sockid, strerror(errno)); vPrtLogMsg(LOG_ERROR, iRet, "send mesg failed, sockid:%d msg:%s will close socket", g_IecCtrl.sockid, strerror(errno));
@ -1996,13 +2083,14 @@ void * thread_parser_proc(void * arg)
continue; continue;
} }
memcpy(&pData, &(g_list_pack.back()), sizeof(ST_RECVPKG)); // 由尾取出 memcpy(&pData, &(g_list_pack.back()), sizeof(ST_RECVPKG)); // 由尾取出
g_list_pack.pop_back(); // 由尾删除
mutex_unlock(g_list_pack_mutex);
if (pData.pszBuff == NULL) { if (pData.pszBuff == NULL) {
mutex_unlock(g_list_pack_mutex);
_SLEEP(MAX_SLEEP_EMPTY * 10); _SLEEP(MAX_SLEEP_EMPTY * 10);
continue; continue;
} }
g_list_pack.pop_back(); // 由尾删除
mutex_unlock(g_list_pack_mutex);
// 解析104报文 // 解析104报文
if (pData.iLength >= MIN_APDU_LENGTH && (pData.pszBuff[0] & 0xff) == CMD_START_HEAD) // 0x68H if (pData.iLength >= MIN_APDU_LENGTH && (pData.pszBuff[0] & 0xff) == CMD_START_HEAD) // 0x68H
@ -3697,14 +3785,16 @@ void *thread_Timer_proc(void *arg)
time_t timeOrigin = time(NULL); time_t timeOrigin = time(NULL);
// 定期总召 // 定期总召
/*
if ((g_IecCtrl.time_action != -1) && ((timeOrigin - g_IecCtrl.time_action) > g_iec_conf.action_interval)) if ((g_IecCtrl.time_action != -1) && ((timeOrigin - g_IecCtrl.time_action) > g_iec_conf.action_interval))
{ {
vPrtLogMsg(LOG_DEBUG, RET_OK, "Send Activation"); vPrtLogMsg(LOG_DEBUG, RET_OK, "Send Activation");
SendMsgFormatIAction(CMD_CTL_64H); // 发送总召激活 SendMsgFormatIAction(CMD_CTL_64H); // 发送总召激活
} }
* */
_SLEEP(1000); _SLEEP(1000);
continue; // continue;
while ((time(NULL) - timeOrigin) < 1000) _SLEEP(1000); while ((time(NULL) - timeOrigin) < 1000) _SLEEP(1000);
@ -3789,7 +3879,11 @@ void *thread_active_proc(void *arg)
header.cntl3 = g_IecCtrl.usRecvNo & 0xFE; header.cntl3 = g_IecCtrl.usRecvNo & 0xFE;
header.cntl4 = (g_IecCtrl.usRecvNo >> 8) & 0xFF; header.cntl4 = (g_IecCtrl.usRecvNo >> 8) & 0xFF;
g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, 6); #ifndef _WIN32
g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, 6, g_IecCtrl.pipeFds[0]);
#else
g_Tcp.tcpSendBuffer(g_IecCtrl.sockid, (const char*)&header, 6, -1);
#endif
vPrtLogHex(LOG_PACK, g_IecCtrl.sockid, PRT_PACK_SEND, (unsigned char*)&header, sizeof(ST_APCI)); vPrtLogHex(LOG_PACK, g_IecCtrl.sockid, PRT_PACK_SEND, (unsigned char*)&header, sizeof(ST_APCI));
mutex_lock(g_IecCtrl_mutex); mutex_lock(g_IecCtrl_mutex);
g_IecCtrl.last_time = time(NULL); g_IecCtrl.last_time = time(NULL);

@ -32,6 +32,13 @@ static void cancel_thread_work(void)
} }
vPrtLogMsg(LOG_DEBUG, RET_OK,"thread_handle_linkmgr shutdown.") ; vPrtLogMsg(LOG_DEBUG, RET_OK,"thread_handle_linkmgr shutdown.") ;
if(thread_handle_recv) {
pthread_cancel(thread_handle_recv);
pthread_join(thread_handle_recv, NULL);
pthread_kill(thread_handle_recv, SIGURG);
}
vPrtLogMsg(LOG_DEBUG, RET_OK,"thread_handle_recv shutdown.") ;
if(thread_handle_active) { if(thread_handle_active) {
pthread_cancel(thread_handle_active); pthread_cancel(thread_handle_active);
pthread_join(thread_handle_active, NULL); pthread_join(thread_handle_active, NULL);
@ -99,6 +106,10 @@ void sig_chld(int signo)
// 创建守护进程环境 // 创建守护进程环境
static int HTStartBackgroundProcess() static int HTStartBackgroundProcess()
{ {
#ifdef __NETBEANS__
// If in Netbeans IDE
return 0;
#endif
#ifndef _WIN32 #ifndef _WIN32
pid_t pid; pid_t pid;
@ -325,6 +336,9 @@ int main(int argc, char* argv[])
ht_pthread_create_background(&thread_handle_gishold, thread_gis_hold_proc, NULL); // 评测数据入库线程 ht_pthread_create_background(&thread_handle_gishold, thread_gis_hold_proc, NULL); // 评测数据入库线程
ht_pthread_create_background(&thread_handle_linkmgr, thread_listen_proc, NULL); // 链路管理线程 ht_pthread_create_background(&thread_handle_linkmgr, thread_listen_proc, NULL); // 链路管理线程
ht_pthread_create_background(&thread_handle_recv, thread_recv_proc, NULL);
// ht_pthread_create_background(&thread_handle_linkmgr, thread_client_proc, NULL); // 链路管理线程 // ht_pthread_create_background(&thread_handle_linkmgr, thread_client_proc, NULL); // 链路管理线程
ht_pthread_create_background(&thread_handle_parse, thread_parser_proc, NULL); // 104报文解析线程 ht_pthread_create_background(&thread_handle_parse, thread_parser_proc, NULL); // 104报文解析线程
ht_pthread_create_background(&thread_handle_timer, thread_Timer_proc, NULL); // 104链路超时管理线程 ht_pthread_create_background(&thread_handle_timer, thread_Timer_proc, NULL); // 104链路超时管理线程

@ -257,7 +257,7 @@ int TTcpSocket::tcpAcceptSocket(int sockid)
** Return Code : >0 send data length ** Return Code : >0 send data length
** <0 error ** <0 error
************************************************************/ ************************************************************/
int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length) int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length, int pipeFd)
{ {
if ((sockid <= 0) || (pBuffer == NULL) ) if ((sockid <= 0) || (pBuffer == NULL) )
return -1 ; return -1 ;
@ -265,7 +265,7 @@ int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length)
int iSendLen = 0,iLen=0,i=0,iCnt=0,len=length ; int iSendLen = 0,iLen=0,i=0,iCnt=0,len=length ;
char *p = (char *)pBuffer; char *p = (char *)pBuffer;
fd_set fd_Write, fd_Except ; fd_set fd_Write, fd_Except, fd_read;
struct timeval st_TimeOut ; struct timeval st_TimeOut ;
if(length > MAX_SBUFF_TCP) if(length > MAX_SBUFF_TCP)
@ -275,6 +275,7 @@ int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length)
for(;;) for(;;)
{ {
FD_ZERO(&fd_Write) ; FD_ZERO(&fd_Write) ;
FD_ZERO(&fd_read);
FD_ZERO(&fd_Except) ; FD_ZERO(&fd_Except) ;
FD_SET((unsigned int)sockid, &fd_Write) ; FD_SET((unsigned int)sockid, &fd_Write) ;
FD_SET((unsigned int)sockid, &fd_Except) ; FD_SET((unsigned int)sockid, &fd_Except) ;
@ -282,12 +283,38 @@ int TTcpSocket::tcpSendBuffer(int sockid, const char *pBuffer, int length)
st_TimeOut.tv_sec = 0 ; st_TimeOut.tv_sec = 0 ;
st_TimeOut.tv_usec = 10 ; st_TimeOut.tv_usec = 10 ;
iSendLen = select (sockid+1 , NULL, &fd_Write, &fd_Except, &st_TimeOut) ; int seledtFd = (pipeFd > sockid ? pipeFd : sockid) + 1;
if (pipeFd != -1)
{
FD_SET(pipeFd, &fd_read);
iSendLen = select(seledtFd, &fd_read, &fd_Write, &fd_Except, &st_TimeOut);
}
else
{
iSendLen = select(seledtFd, NULL, &fd_Write, &fd_Except, &st_TimeOut);
}
if((iSendLen == 0 && errno == EPIPE) || (iSendLen < 0)) { // EPIPE =32 Broken pipe EBADF=9 Bad file number if((iSendLen == 0 && errno == EPIPE) || (iSendLen < 0)) { // EPIPE =32 Broken pipe EBADF=9 Bad file number
vPrtLogMsg(LOG_ERROR,errno, "socket execption close,sockid:%d ret:%d msg:%s", sockid, iSendLen, strerror(errno)); vPrtLogMsg(LOG_ERROR,errno, "socket execption close,sockid:%d ret:%d msg:%s", sockid, iSendLen, strerror(errno));
//add_monitor_mesg(len, 0x00, (unsigned char*)pBuffer); // 添加监控消息到监控发送队列 //add_monitor_mesg(len, 0x00, (unsigned char*)pBuffer); // 添加监控消息到监控发送队列
return ErrException; return ErrException;
} }
if (pipeFd != -1 && FD_ISSET(pipeFd, &fd_read))
{
int sid = -1;
#ifndef _WIN32
read(pipeFd, &sid, sizeof(sid)); // read from the read end of the pipe
#else
#endif
vPrtLogMsg(LOG_ERROR, errno, "socket closed,sockid:%d sid from pipe=%d", sockid, sid);
if (sockid == sid)
{
return ErrSendFail; //socket exception
}
else
{
continue;
}
}
if( FD_ISSET(sockid, &fd_Except) ) if( FD_ISSET(sockid, &fd_Except) )
{ {
vPrtLogMsg(LOG_ERROR,errno, "socket execption,sockid:%d msg:%s", sockid, strerror(errno)); vPrtLogMsg(LOG_ERROR,errno, "socket execption,sockid:%d msg:%s", sockid, strerror(errno));

Loading…
Cancel
Save