调整业务数据入库的方式

main
BlueMatthew 2 years ago
parent 1d82a73560
commit 8349376e7a

@ -58,18 +58,18 @@ extern TTcpSocket g_Tcp;
extern ST_IEC104_CTRL g_IecCtrl; // socket linker info and status extern ST_IEC104_CTRL g_IecCtrl; // socket linker info and status
extern mutex g_IecCtrl_mutex; // socket linker on mutex extern mutex g_IecCtrl_mutex; // socket linker on mutex
extern map<string, ST_BYQWORK_STATUS> g_map_work; // 主变工作状态缓存 extern map<string, ST_BYQWORK_STATUS> g_map_work; // 主变工作状态缓存
extern mutex g_map_work_mutex; // 主变工作状态缓存队列锁 extern mutex g_map_work_mutex; // 主变工作状态缓存队列锁
//extern map<string,ST_BYQ_CACHE> g_map_byq; // 变压器缓存数据 //extern map<string,ST_BYQ_CACHE> g_map_byq; // 变压器缓存数据
extern map<string, ST_BYQ_RUN_STATE_THRESHOLD> g_map_thres_byq; // 变压器缓存数据 extern map<string, ST_BYQ_RUN_STATE_THRESHOLD> g_map_thres_byq; // 变压器缓存数据
extern mutex g_map_byq_mutex; // 变压器缓存数据信息队列锁 extern mutex g_map_byq_mutex; // 变压器缓存数据信息队列锁
//extern map<string,ST_GIS_CACHE> g_map_gis; // GIS缓存数据 //extern map<string,ST_GIS_CACHE> g_map_gis; // GIS缓存数据
//extern mutex g_map_gis_mutex; // GIS缓存数据信息队列锁 //extern mutex g_map_gis_mutex; // GIS缓存数据信息队列锁
extern multimap<string, ST_DEV_RELATION> g_map_relation; // 主设备与外挂设备关系 extern multimap<string, ST_DEV_RELATION> g_map_relation; // 主设备与外挂设备关系
extern mutex g_map_relation_mutex; // GIS缓存数据信息队列锁 extern mutex g_map_relation_mutex; // GIS缓存数据信息队列锁
//extern map<string, ST_GIS_IEC104_DATA> g_map_gis_104; // GIS最近一次104上传数据缓存 //extern map<string, ST_GIS_IEC104_DATA> g_map_gis_104; // GIS最近一次104上传数据缓存
//extern mutex g_map_gis_104_mutex; //extern mutex g_map_gis_104_mutex;
@ -77,26 +77,27 @@ extern mutex g_map_relation_mutex; // GIS
//extern map<string, ST_BLQ_CACHE> g_map_blq; // BLQ缓存数据 //extern map<string, ST_BLQ_CACHE> g_map_blq; // BLQ缓存数据
//extern mutex g_map_blq_mutex; // BLQ缓存数据信息队列锁 //extern mutex g_map_blq_mutex; // BLQ缓存数据信息队列锁
extern map<unsigned int, ST_IECPOINT_TABLE> g_map_iec; // 104报文解析缓存数据 extern map<unsigned int, ST_IECPOINT_TABLE> g_map_iec; // 104报文解析缓存数据
extern mutex g_map_iec_mutex; // 104报文解析缓存数据信息队列锁 extern mutex g_map_iec_mutex; // 104报文解析缓存数据信息队列锁
extern map<unsigned int, IEC_DEVICE> g_map_devices; // 104报文解析缓存数据 extern map<unsigned int, IEC_DEVICE> g_map_devices; // 104报文解析缓存数据
extern map<unsigned int, IEC_POINT> g_map_iec_new; // 104报文解析缓存数据 extern map<unsigned int, IEC_POINT> g_map_iec_new; // 104报文解析缓存数据
extern mutex g_map_iec_mutex_new; // 104报文解析缓存数据信息队列锁 extern map<unsigned int, CACHED_DEV_DATA> g_map_dev_data; // 104点表报文解析缓存数据
extern mutex g_map_iec_mutex_new; // 104报文解析缓存数据信息队列锁
extern map<unsigned int, ST_SADR_MATCH> g_map_sadr; // 点表匹配关系 extern map<unsigned int, ST_SADR_MATCH> g_map_sadr; // 点表匹配关系
extern mutex g_map_sadr_mutex; extern mutex g_map_sadr_mutex;
extern map<string, ST_IMG_THRESHOLD> g_map_img_thres; // 图片识别阈值 extern map<string, ST_IMG_THRESHOLD> g_map_img_thres; // 图片识别阈值
extern mutex g_map_img_thres_mutex; extern mutex g_map_img_thres_mutex;
//extern list<ST_LIST_MONITOR> g_list_monitor; // 监控消息数据队列 //extern list<ST_LIST_MONITOR> g_list_monitor; // 监控消息数据队列
//extern mutex g_list_monitor_mutex; //extern mutex g_list_monitor_mutex;
extern map<string, ST_DEVICE_TIME_STAT> g_map_dev_time_stat; extern map<string, ST_DEVICE_TIME_STAT> g_map_dev_time_stat;
extern mutex g_map_dev_time_stat_mutex; extern mutex g_map_dev_time_stat_mutex;
extern map<unsigned int, ST_BREAK_EQM_CODE> g_map_gis_state; // 断路器状态位与电流电压关系匹配表 extern map<unsigned int, ST_BREAK_EQM_CODE> g_map_gis_state; // 断路器状态位与电流电压关系匹配表
extern mutex g_map_gis_state_mutex; extern mutex g_map_gis_state_mutex;
extern int g_seqno; // 与MEC之间交换的流水号 extern int g_seqno; // 与MEC之间交换的流水号
extern mutex g_seq_mutex; // 流水号 extern mutex g_seq_mutex; // 流水号

@ -13,6 +13,8 @@
#ifndef __HP104_TABLE #ifndef __HP104_TABLE
#define __HP104_TABLE #define __HP104_TABLE
#include <map>
static const char IEC_CONFIMG_FILE[] = "../etc/ht_iec104.conf"; static const char IEC_CONFIMG_FILE[] = "../etc/ht_iec104.conf";
#pragma pack (push ,1) #pragma pack (push ,1)
@ -46,13 +48,25 @@ typedef struct tagIEC_DEVICE
unsigned char one_dtime; unsigned char one_dtime;
}IEC_DEVICE; }IEC_DEVICE;
typedef struct tagIEC_FIELD typedef struct tagIEC_FIELD
{ {
std::string name; std::string name;
float value; float fval;
time_t ts; time_t ts;
} IEC_FIELD; } IEC_FIELD;
typedef struct tagCACHED_DEV_DATA
{
IEC_DEVICE* device;
time_t firstTs;
size_t assignedFields;
std::map<unsigned int, std::pair<IEC_FIELD, bool> > fields;
}CACHED_DEV_DATA;
typedef struct tagIEC_POINT typedef struct tagIEC_POINT
{ {
unsigned int sadr; // 信息体地址map.key unsigned int sadr; // 信息体地址map.key

@ -105,6 +105,11 @@ unsigned char cGetByqDeviceFaultState(unsigned char *pszEqm_code);
void SethDevTimeStat(unsigned char *sys_code, unsigned char ws); void SethDevTimeStat(unsigned char *sys_code, unsigned char ws);
time_t GetDevTimeStat(unsigned char *sys_code, unsigned char *ws); time_t GetDevTimeStat(unsigned char *sys_code, unsigned char *ws);
void ResetCachedDeviceData();
std::string BuildSqlForDeviceData(CACHED_DEV_DATA& cachedDev);
bool AssignValueToDeviceData(CACHED_DEV_DATA& cachedDev, const IEC_OBJVAL_NEW& val);
#pragma pack (pop) #pragma pack (pop)
#endif // end #endif // end

@ -48,6 +48,7 @@ mutex g_map_iec_mutex; // 104报文解析
map<unsigned int, IEC_DEVICE> g_map_devices; // 104报文解析缓存数据 map<unsigned int, IEC_DEVICE> g_map_devices; // 104报文解析缓存数据
map<unsigned int, IEC_POINT> g_map_iec_new; // 104点表报文解析缓存数据 map<unsigned int, IEC_POINT> g_map_iec_new; // 104点表报文解析缓存数据
map<unsigned int, CACHED_DEV_DATA> g_map_dev_data; // 104点表报文解析缓存数据
mutex g_map_iec_mutex_new; // 104报文解析缓存数据信息队列锁 mutex g_map_iec_mutex_new; // 104报文解析缓存数据信息队列锁
map<unsigned int, ST_SADR_MATCH> g_map_sadr; // 点表匹配关系 map<unsigned int, ST_SADR_MATCH> g_map_sadr; // 点表匹配关系

@ -1429,17 +1429,13 @@ bool bSetPointTableValueYC(const std::vector<IEC_OBJVAL_NEW>& values)
if (g_TConfig.shouldParseBusiData() != 0) if (g_TConfig.shouldParseBusiData() != 0)
{ {
time_t ts = 0; // time_t ts = 0;
std::map<unsigned int, IEC_POINT>::const_iterator itPoint; std::map<unsigned int, IEC_POINT>::const_iterator itPoint;
std::map<unsigned int, IEC_DEVICE>::const_iterator itDev;
std::string fields, fieldValues; bool assigned = false;
map<unsigned int, CACHED_DEV_DATA>::iterator itCachedDev;
std::map<unsigned int, std::vector<IEC_FIELD> > records; std::string sql;
std::map<unsigned int, std::vector<IEC_FIELD> >::iterator itRecord; list<std::string>::iterator itBusiData;
std::vector<IEC_FIELD>::iterator itField;
char dataBuf[32] = { 0 };
IEC_FIELD field;
mutex_lock(g_map_iec_mutex_new); mutex_lock(g_map_iec_mutex_new);
for (std::vector<IEC_OBJVAL_NEW>::const_iterator it = values.begin(); it != values.end(); ++it) for (std::vector<IEC_OBJVAL_NEW>::const_iterator it = values.begin(); it != values.end(); ++it)
{ {
@ -1448,92 +1444,29 @@ bool bSetPointTableValueYC(const std::vector<IEC_OBJVAL_NEW>& values)
{ {
continue; continue;
} }
itRecord = records.find(itPoint->second.sensor_id);
if (itRecord == records.end())
{
itRecord = records.insert(records.end(), std::make_pair<>(itPoint->second.sensor_id, std::vector<IEC_FIELD>()));
}
field.name = (const char*)itPoint->second.fieldName;
field.ts = it->dtime;
field.value = it->fval;
itRecord->second.push_back(field);
}
std::string sql; itCachedDev = g_map_dev_data.find(itPoint->second.sensor_id);
for (itRecord = records.begin(); itRecord != records.end(); ++itRecord) if (itCachedDev == g_map_dev_data.end())
{
itDev = g_map_devices.find(itRecord->first);
if (itDev == g_map_devices.end() || strlen((const char*)itDev->second.tableName) == 0)
{ {
continue; continue;
} }
fields.clear(); assigned = AssignValueToDeviceData(itCachedDev->second, *it);
fieldValues.clear(); if (assigned)
sql.clear();
for (itField = itRecord->second.begin(); itField != itRecord->second.end(); ++itField)
{ {
ts = itField->ts; if (itCachedDev->second.assignedFields == itCachedDev->second.fields.size())
fields.append(itField->name);
fields.append(",");
fieldValues.append("'");
snprintf(dataBuf, sizeof(dataBuf), "%0.4f", itField->value);
fieldValues.append(dataBuf);
fieldValues.append("',");
if ((itDev->second.one_dtime == 0))
{ {
fields.append((const char *)itPoint->second.fieldName); sql = BuildSqlForDeviceData(itCachedDev->second);
fields.append("_time,");
vPrtLogMsg(LOG_DEBUG, RET_OK, "Insert busi data %s", sql.c_str());
fieldValues.append("FROM_UNIXTIME(");
snprintf(dataBuf, sizeof(dataBuf), "%lld", (long long)ts); mutex_lock(g_list_busi_data_mutex);
fieldValues.append(dataBuf); itBusiData = g_list_busi_data.insert(g_list_busi_data.end(), std::string());
fieldValues.append("),"); itBusiData->swap(sql);
mutex_unlock(g_list_busi_data_mutex);
} }
} }
sql = "INSERT INTO ";
sql.append((const char *)itDev->second.tableName);
sql.append("(");
sql.append((const char *)itDev->second.devidFieldName);
sql.append(",");
if (itDev->second.one_dtime != 0)
{
sql.append((const char *)itDev->second.dtimeFieldName);
sql.append(",");
}
sql.append(fields, 0, fields.size() - 1);
sql.append(") VALUES(");
snprintf(dataBuf, sizeof(dataBuf), "%u", itDev->second.sensor_id);
sql.append(dataBuf);
sql.append(",");
if (itDev->second.one_dtime != 0)
{
sql.append("FROM_UNIXTIME(");
snprintf(dataBuf, sizeof(dataBuf), "%lld", (long long)ts);
sql.append(dataBuf);
sql.append("),");
}
sql.append(fieldValues, 0, fieldValues.size() - 1);
sql.append(")");
vPrtLogMsg(LOG_DEBUG, RET_OK, "Insert busi data %s", sql.c_str());
std::list<std::string>::iterator it;
mutex_lock(g_list_busi_data_mutex);
it = g_list_busi_data.insert(g_list_busi_data.end(), std::string());
it->swap(sql);
mutex_unlock(g_list_busi_data_mutex);
} }
mutex_unlock(g_map_iec_mutex_new); mutex_unlock(g_map_iec_mutex_new);
} }
@ -1839,9 +1772,19 @@ int DecodeMsgFormatI(unsigned char *msgbuf, unsigned int len, unsigned short sen
break; break;
case 100: // 总召唤命令 case 100: // 总召唤命令
if (cause == 7) // 激活确认 if (cause == 7) // 激活确认
vPrtLogMsg(LOG_DEBUG, RET_OK, "Recv Activation Confirmation, %d.", msgbuf[9] - 20); {
if (g_TConfig.shouldParseBusiData() != 0)
{
ResetCachedDeviceData();
}
vPrtLogMsg(LOG_DEBUG, RET_OK, "Recv Activation Confirmation, %d.", msgbuf[9] - 20);
}
else if (cause == 10) // 激活结束 else if (cause == 10) // 激活结束
{ {
if (g_TConfig.shouldParseBusiData() != 0)
{
ResetCachedDeviceData();
}
//SendMsgFormatS(sendno); // 总召唤结束发送S帧确认 //SendMsgFormatS(sendno); // 总召唤结束发送S帧确认
vPrtLogMsg(LOG_DEBUG, RET_OK, "Recv Activation Termination, %d", msgbuf[9] - 20); vPrtLogMsg(LOG_DEBUG, RET_OK, "Recv Activation Termination, %d", msgbuf[9] - 20);
} }

@ -398,6 +398,8 @@ static void vLoadIECPointTableNew()
IEC_POINT stPoint; IEC_POINT stPoint;
IEC_DEVICE stSensor; IEC_DEVICE stSensor;
size_t idx = 0; size_t idx = 0;
map<unsigned int, CACHED_DEV_DATA>::iterator itCachedDev;
map<unsigned int, IEC_DEVICE>::iterator itDev;
while (row = pdbHandle->GetRecord(res)) while (row = pdbHandle->GetRecord(res))
{ {
memset(&stPoint, 0x00, sizeof(IEC_POINT)); memset(&stPoint, 0x00, sizeof(IEC_POINT));
@ -453,13 +455,141 @@ static void vLoadIECPointTableNew()
// vPrtLogMsg(LOG_WARNG, RET_OK, "----TABLE %s oneTime=%u", (const char*)stSensor.tableName, (unsigned int)stSensor.one_dtime); // vPrtLogMsg(LOG_WARNG, RET_OK, "----TABLE %s oneTime=%u", (const char*)stSensor.tableName, (unsigned int)stSensor.one_dtime);
mutex_lock(g_map_iec_mutex_new); mutex_lock(g_map_iec_mutex_new);
itDev = g_map_devices.find(stSensor.sensor_id);
if (itDev == g_map_devices.end())
{
itDev = g_map_devices.insert(g_map_devices.end(), std::make_pair<>(stSensor.sensor_id, stSensor));
}
g_map_devices[stSensor.sensor_id] = stSensor; g_map_devices[stSensor.sensor_id] = stSensor;
g_map_iec_new[stPoint.sadr] = stPoint; g_map_iec_new[stPoint.sadr] = stPoint;
itCachedDev = g_map_dev_data.find(stSensor.sensor_id);
if (itCachedDev == g_map_dev_data.end())
{
CACHED_DEV_DATA cachedData;
cachedData.device = &(itDev->second);
cachedData.firstTs = 0;
cachedData.assignedFields = 0;
itCachedDev = g_map_dev_data.insert(g_map_dev_data.end(), std::make_pair<>(stSensor.sensor_id, cachedData));
}
IEC_FIELD stField;
stField.name = (const char*)stPoint.fieldName;
stField.ts = 0;
stField.fval = 0.0;
itCachedDev->second.fields[stPoint.sadr] = std::make_pair<>(stField, false);
mutex_unlock(g_map_iec_mutex_new); mutex_unlock(g_map_iec_mutex_new);
} }
pdbHandle->FreeRecord(res); pdbHandle->FreeRecord(res);
} }
void ResetCachedDeviceData()
{
map<unsigned int, CACHED_DEV_DATA>::iterator it;
std::map<unsigned int, std::pair<IEC_FIELD, bool> >::iterator itField;
mutex_lock(g_map_iec_mutex_new);
for (it = g_map_dev_data.begin(); it != g_map_dev_data.end(); ++it)
{
it->second.firstTs = 0;
it->second.assignedFields = 0;
for (itField = it->second.fields.begin(); itField != it->second.fields.end(); ++itField)
{
itField->second.first.ts = 0;
itField->second.first.fval = 0.0;
itField->second.second = false;
}
}
mutex_unlock(g_map_iec_mutex_new);
}
std::string BuildSqlForDeviceData(CACHED_DEV_DATA& cachedDev)
{
std::string fields, fieldValues, sql;
char dataBuf[32] = { 0 };
#if __cplusplus < 201103L
std::map<unsigned int, std::pair<IEC_FIELD, bool> >::iterator it;
for (it = cachedDev.fields.begin(); it != cachedDev.fields.end(); ++it)
#else
std::map<unsigned int, std::pair<IEC_FIELD, bool> >::const_iterator it;
for (it = cachedDev.fields.cbegin(); it != cachedDev.fields.cend(); ++it)
#endif
{
fields.append(it->second.first.name);
fields.append(",");
fieldValues.append("'");
snprintf(dataBuf, sizeof(dataBuf), "%0.4f", it->second.first.fval);
fieldValues.append(dataBuf);
fieldValues.append("',");
if ((cachedDev.device->one_dtime == 0))
{
fields.append(it->second.first.name);
fields.append("_time,");
fieldValues.append("FROM_UNIXTIME(");
snprintf(dataBuf, sizeof(dataBuf), "%lld", (long long)it->second.first.ts);
fieldValues.append(dataBuf);
fieldValues.append("),");
}
}
sql = "INSERT INTO ";
sql.append((const char *)cachedDev.device->tableName);
sql.append("(");
sql.append((const char *)cachedDev.device->devidFieldName);
sql.append(",");
if (cachedDev.device->one_dtime != 0)
{
sql.append((const char *)cachedDev.device->dtimeFieldName);
sql.append(",");
}
sql.append(fields, 0, fields.size() - 1);
sql.append(") VALUES(");
snprintf(dataBuf, sizeof(dataBuf), "%u", cachedDev.device->sensor_id);
sql.append(dataBuf);
sql.append(",");
if (cachedDev.device->one_dtime != 0)
{
sql.append("FROM_UNIXTIME(");
snprintf(dataBuf, sizeof(dataBuf), "%lld", (long long)cachedDev.firstTs);
sql.append(dataBuf);
sql.append("),");
}
sql.append(fieldValues, 0, fieldValues.size() - 1);
sql.append(")");
return sql;
}
bool AssignValueToDeviceData(CACHED_DEV_DATA& cachedDev, const IEC_OBJVAL_NEW& val)
{
std::map<unsigned int, std::pair<IEC_FIELD, bool> >::iterator it = cachedDev.fields.find(val.sadr);
if (it != cachedDev.fields.end())
{
if (!(it->second.second))
{
if (cachedDev.firstTs == 0)
{
cachedDev.firstTs = val.dtime;
}
cachedDev.assignedFields++;
it->second.first.fval = val.fval;
it->second.first.ts = val.dtime;
return true;
}
}
return false;
}
// 刷新设备更新时间 // 刷新设备更新时间
void SethDevTimeStat(unsigned char *sys_code, unsigned char ws) void SethDevTimeStat(unsigned char *sys_code, unsigned char ws)
{ {

Loading…
Cancel
Save