#include "TcpServer.h"

// NOTE(review): in SOURCE the eight system #include directives at this position
// had lost their header names. The list below is reconstructed from the
// identifiers this file uses (sockets, select, threads, sqlite3, nlohmann::json);
// confirm it against version control.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <nlohmann/json.hpp>
#include <sqlite3.h>

#include "DatabaseWorker.h"
#include "../logger/Logger.h"

// sqlite3_exec-style row callback: prints every "column = value" pair of one
// result row to stdout (NULL columns are printed as the text "NULL"), followed
// by a blank line. Always returns 0.
static int sql_callback(void *NotUsed, int argc, char **argv, char **azColName) {
    (void)NotUsed;
    for (int i = 0; i < argc; i++) {
        printf("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL");
    }
    printf("\n");
    return 0;
}

// Lookup table for the byte-wise CRC16 computation below. It is zero until
// init_crc16_table() has run.
static uint16_t crc16_table[256];

// Fills crc16_table for the reflected polynomial 0xA001.
// Must be called once before calculate_crc16(); start_server() does so.
static void init_crc16_table() {
    const uint16_t polynomial = 0xA001;
    for (uint16_t i = 0; i < 256; i++) {
        uint16_t crc = i;
        for (uint8_t bit = 0; bit < 8; bit++) {
            if (crc & 0x0001) {
                crc = (crc >> 1) ^ polynomial;
            } else {
                crc >>= 1;
            }
        }
        crc16_table[i] = crc;
    }
}

// Returns the CRC16 (initial value 0xFFFF, table-driven, polynomial 0xA001) of
// the first `length` bytes of `data`. The data is only read, never modified.
static uint16_t calculate_crc16(const uint8_t *data, size_t length) {
    uint16_t crc = 0xFFFF;
    for (size_t i = 0; i < length; i++) {
        const uint8_t table_index = (crc ^ data[i]) & 0xFF;
        crc = (crc >> 8) ^ crc16_table[table_index];
    }
    return crc;
}

// Creates the listening socket: socket() -> setsockopt() -> bind() -> listen().
//
// @param port TCP port to listen on (all local interfaces, INADDR_ANY).
// @throws std::runtime_error if any of the socket calls fails; the socket is
//         closed before throwing.
TcpServer::TcpServer(int port)
    : db_queue(),           // default-constructed queue
      db_worker(db_queue),  // the worker is bound to that queue
      running(true),
      conn_num(0) {
    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd < 0) {
        perror("Socket creation failed");
        throw std::runtime_error("Socket creation failed");
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(port);
    addrlen = sizeof(address);

    // SO_REUSEADDR and SO_REUSEPORT are distinct option *names*, not bit
    // flags: OR-ing them selects a single, unrelated option number. Each one
    // needs its own setsockopt() call.
    const int opt = 1;
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) != 0 ||
        setsockopt(server_fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) != 0) {
        perror("Setsockopt failed");
        std::cout << "port = " << port << std::endl;
        close(server_fd);
        throw std::runtime_error("Setsockopt failed");
    }

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("Bind failed");
        close(server_fd);
        throw std::runtime_error("Bind failed");
    }

    if (listen(server_fd, SOMAXCONN) < 0) {
        perror("Listen failed");
        close(server_fd);
        throw std::runtime_error("Listen failed");
    }

    std::cout << "Server listening on port " << port << std::endl;
}

// Stops the accept loop, waits for it and for every client thread, then closes
// the listening socket.
//
// The accept thread polls `running` at least once per second (see
// start_server), so joining it here terminates. Client threads are joined
// afterwards; handle_client() only returns when its connection ends, so this
// can block while clients remain connected.
TcpServer::~TcpServer() {
    running = false;

    if (accept_thread.joinable()) {
        accept_thread.join();
    }

    // Safe to walk `threads` now: only the (already joined) accept thread
    // ever appends to it.
    for (auto &th : threads) {
        if (th.joinable()) {
            th.join();
        }
    }

    close(server_fd);
}

// Starts the background accept loop and returns immediately.
//
// For every accepted connection the loop records the peer in client_addrs,
// increments conn_num and spawns a handle_client() thread. The accept thread
// stays joinable (it is NOT detached) so the destructor can wait for it before
// members it uses are destroyed. Calling this again while the loop is running
// is ignored.
void TcpServer::start_server() {
    std::cout << "Starting server..." << std::endl;

    if (accept_thread.joinable()) {
        // Assigning to a joinable std::thread would call std::terminate().
        std::cerr << "start_server() called while the accept thread is already running; ignoring."
                  << std::endl;
        return;
    }

    // The CRC table must be ready before any client thread validates a frame.
    init_crc16_table();

    accept_thread = std::thread([this]() {
        while (this->running) {
            // Wait for a pending connection with a 1 s timeout (same pattern
            // as handle_client) so that `running` is re-checked regularly
            // instead of blocking forever inside accept().
            fd_set read_fds;
            FD_ZERO(&read_fds);
            FD_SET(server_fd, &read_fds);
            struct timeval timeout = {1, 0};
            const int ready = select(server_fd + 1, &read_fds, nullptr, nullptr, &timeout);
            if (ready < 0) {
                const int err = errno;
                if (err != EINTR) {
                    std::cerr << "Select on listening socket failed: " << strerror(err) << std::endl;
                }
                continue;
            }
            if (ready == 0) {
                continue;  // timeout: loop around and re-check `running`
            }

            // Use locals for the peer address so the bind address stored in
            // the `address` / `addrlen` members is not overwritten.
            struct sockaddr_in client_addr;
            memset(&client_addr, 0, sizeof(client_addr));
            socklen_t client_len = sizeof(client_addr);
            const int new_socket = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
            if (new_socket < 0) {
                const int err = errno;  // capture before stream I/O can change it
                std::cerr << "Accept failed with error: " << strerror(err) << std::endl;
                std::cout << "当前连接数量: " << this->conn_num << std::endl;
                continue;  // keep serving regardless of the error kind
            }

            // Peer IP and port; the buffer is zeroed so a failed inet_ntop
            // yields an empty string instead of reading uninitialised memory.
            char client_ip[INET_ADDRSTRLEN] = {0};
            inet_ntop(AF_INET, &(client_addr.sin_addr), client_ip, INET_ADDRSTRLEN);
            const int client_port = ntohs(client_addr.sin_port);
            const std::string ip_str(client_ip);
            std::cout << "客户端已连接: " << ip_str << ":" << client_port << std::endl;

            {
                std::lock_guard lock(mtx_client_addrs);
                // For these three addresses only the newest connection is
                // kept in the list: earlier entries with the same IP are
                // dropped (their sockets are not closed here).
                if (ip_str == "172.16.111.60" || ip_str == "172.16.111.61" || ip_str == "172.16.111.62") {
                    for (auto it = client_addrs.begin(); it != client_addrs.end();) {
                        if (it->ip == ip_str) {
                            it = client_addrs.erase(it);
                        } else {
                            ++it;
                        }
                    }
                }
                client_addrs.push_back({ip_str, client_port, new_socket});
            }

            {
                std::lock_guard lock(mtx);
                this->conn_num++;
                LOG_Debug("客户端已连接:%s %d 当前连接数量:%d", ip_str.c_str(), client_port, this->conn_num);
                std::cout << "当前连接数量: " << this->conn_num << std::endl;
            }

            // One thread per connection. NOTE(review): finished threads are
            // never removed from `threads`, so the vector grows for the
            // lifetime of the server.
            threads.emplace_back([this, new_socket, ip_str]() {
                handle_client(new_socket, ip_str);
            });
        }
    });
}

// Returns a copy of the connected-client list, taken under mtx_client_addrs.
//
// NOTE(review): the template argument of the std::vector return type was lost
// in SOURCE. The function returns client_addrs by value, so the return type is
// spelled as the type of that member; confirm it matches the declaration in
// TcpServer.h.
auto TcpServer::get_connected_clients() -> decltype(client_addrs) {
    std::lock_guard lock(mtx_client_addrs);
    return client_addrs;
}

// Return type of TcpServer::remove_connected_clients, whose definition
// continues on the next line of the file.
void
TcpServer::remove_connected_clients(std::string ip) { std::lock_guard lock(mtx_client_addrs); auto it = client_addrs.begin(); while (it != client_addrs.end()) { if (it->ip == ip) { // close(it->sockfd); // 关闭 socket it = client_addrs.erase(it); // 删除元素并更新迭代器 } else { ++it; } } } nlohmann::json decode_modbus_msg(const uint8_t * buffer, std::string& device_name){ int dev_addr = buffer[0]; int fun_code = buffer[1]; int data_len = buffer[2]; int dev_type = buffer[6]; nlohmann::json json_save; if(dev_type == 9) { std::cout << "门锁" << std::endl; } if(dev_type == 10) { std::cout << "噪声" << std::endl; } if(dev_type == 12) { std::cout << "蓄电池" << std::endl; } std::string postfix = "NARI-"; // 公共字段 json_save["status"] = "run"; json_save["gateway_id"] = 0; std::string dev_name = "device_" + std::to_string(dev_addr); if (dev_type == 1) { // 温湿度 device_name = "温湿度-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "温湿度-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; // 温度对象 nlohmann::json temp_data; temp_data["name"] = "温度"; temp_data["value"] = ((buffer[7] << 8) + buffer[8])/10.0; temp_data["format"] = "℃"; temp_data["format_id"] = 40; json_save["data"].push_back(temp_data); // 湿度对象 nlohmann::json humidity_data; humidity_data["name"] = "湿度"; humidity_data["value"] = ((buffer[9] << 8) + buffer[10]); humidity_data["format"] = "%RH"; humidity_data["format_id"] = 40; json_save["data"].push_back(humidity_data); } else if (dev_type == 2) { // 水浸 device_name = "水浸-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "水浸-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; // 水浸对象 nlohmann::json water_data; water_data["name"] = "水浸状态"; water_data["value"] = ((buffer[7] << 8) + buffer[8]); // 0 表示无水浸,1 表示有水浸 // if((buffer[7] << 8) + buffer[8] == 0) // water_data["value"] = "正常"; // else // water_data["value"] = "告警"; water_data["format"] = ""; water_data["format_id"] = 0; 
json_save["data"].push_back(water_data); } else if (dev_type == 3) { // 烟感 device_name = "烟感-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "烟感-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; // 烟感对象 nlohmann::json smoke_data; smoke_data["name"] = "报警状态"; smoke_data["value"] = (buffer[7] << 8) + buffer[8]; // if((buffer[7] << 8) + buffer[8] == 0) // smoke_data["value"] = "正常"; // else // smoke_data["value"] = "告警"; smoke_data["format"] = ""; smoke_data["format_id"] = 0; json_save["data"].push_back(smoke_data); }else if (dev_type == 7) { device_name = "空调-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "空调-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; // 开关对象 nlohmann::json switch_data; switch_data["name"] = "开关"; switch_data["value"] = ((buffer[7] << 8) + buffer[8]); switch_data["format"] = ""; switch_data["format_id"] = 0; json_save["data"].push_back(switch_data); // 模式对象 nlohmann::json mode_data; mode_data["name"] = "模式"; mode_data["value"] = ((buffer[9] << 8) + buffer[10]); mode_data["format"] = ""; mode_data["format_id"] = 0; json_save["data"].push_back(mode_data); // 模式对象 nlohmann::json set_temperature_data; set_temperature_data["name"] = "设定温度"; set_temperature_data["value"] = ((buffer[11] << 8) + buffer[12]); set_temperature_data["format"] = "℃"; set_temperature_data["format_id"] = 40; json_save["data"].push_back(set_temperature_data); // 模式对象 nlohmann::json wind_speed_data; wind_speed_data["name"] = "风速"; wind_speed_data["value"] = ((buffer[13] << 8) + buffer[14]); wind_speed_data["format"] = ""; wind_speed_data["format_id"] = 40; json_save["data"].push_back(wind_speed_data); // 模式对象 nlohmann::json wind_direction_data; wind_direction_data["name"] = "风向"; wind_direction_data["value"] = ((buffer[15] << 8) + buffer[16]); wind_direction_data["format"] = ""; wind_direction_data["format_id"] = 0; json_save["data"].push_back(wind_direction_data); // 上报周期 nlohmann::json 
report_period; report_period["name"] = "上报周期"; report_period["value"] = ((buffer[17] << 8) + buffer[18]); report_period["format"] = "s"; report_period["format_id"] = 0; json_save["data"].push_back(report_period); }else if (dev_type == 6) { // 照明 device_name = "照明-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "照明-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; //开关 nlohmann::json switch_data; switch_data["name"] = "开关"; switch_data["value"] = (buffer[7] << 8) + buffer[8]; switch_data["format"] = ""; switch_data["format_id"] = 0; json_save["data"].push_back(switch_data); // 上报周期 nlohmann::json report_period; report_period["name"] = "上报周期"; report_period["value"] = ((buffer[9] << 8) + buffer[10]); report_period["format"] = "s"; report_period["format_id"] = 0; json_save["data"].push_back(report_period); }else if (dev_type == 8) { // 门磁 device_name = "门磁-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "门磁-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; //开关 nlohmann::json switch_data; switch_data["name"] = "开关"; switch_data["value"] = (buffer[7] << 8) + buffer[8]; switch_data["format"] = ""; switch_data["format_id"] = 0; json_save["data"].push_back(switch_data); // 上报周期 nlohmann::json report_period; report_period["name"] = "上报周期"; report_period["value"] = ((buffer[9] << 8) + buffer[10]); report_period["format"] = "s"; report_period["format_id"] = 0; json_save["data"].push_back(report_period); }else if (dev_type == 5) { // 风机 device_name = "风机-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "风机-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; //开关 nlohmann::json switch_data; switch_data["name"] = "开关"; switch_data["value"] = (buffer[7] << 8) + buffer[8]; switch_data["format"] = ""; switch_data["format_id"] = 0; json_save["data"].push_back(switch_data); // 上报周期 nlohmann::json report_period; report_period["name"] = "上报周期"; 
report_period["value"] = ((buffer[9] << 8) + buffer[10]); report_period["format"] = "s"; report_period["format_id"] = 0; json_save["data"].push_back(report_period); }else if (dev_type == 9) { // 智能门锁 device_name = "智能门锁-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "智能门锁-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; //开关 nlohmann::json switch_data; switch_data["name"] = "门状态"; switch_data["value"] = (buffer[7] << 8) + buffer[8]; switch_data["format"] = ""; switch_data["format_id"] = 0; json_save["data"].push_back(switch_data); switch_data["name"] = "门控制"; switch_data["value"] = (buffer[9] << 8) + buffer[10]; switch_data["format"] = ""; switch_data["format_id"] = 0; json_save["data"].push_back(switch_data); switch_data["name"] = "上报周期"; switch_data["value"] = (buffer[11] << 8) + buffer[12]; switch_data["format"] = "s"; switch_data["format_id"] = 0; json_save["data"].push_back(switch_data); }else if (dev_type == 4) { // 双气 device_name = "双气-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "双气-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; // 氧气浓度 nlohmann::json temp_data; temp_data["name"] = "氧气浓度"; temp_data["value"] = ((buffer[7] << 8) + buffer[8])/10.0; temp_data["format"] = "%"; temp_data["format_id"] = 0; json_save["data"].push_back(temp_data); // SF6浓度 nlohmann::json humidity_data; humidity_data["name"] = "SF6浓度"; humidity_data["value"] = ((buffer[9] << 8) + buffer[10])/10.0; humidity_data["format"] = "ppm"; humidity_data["format_id"] = 0; json_save["data"].push_back(humidity_data); // 上报周期 nlohmann::json report_period; report_period["name"] = "上报周期"; report_period["value"] = ((buffer[11] << 8) + buffer[12]); report_period["format"] = "s"; report_period["format_id"] = 0; json_save["data"].push_back(report_period); }else if (dev_type == 10) { // 噪声 device_name = "噪声-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "噪声-" + postfix + 
std::to_string(dev_addr); json_save["device_id"] = dev_addr; // 噪声值 nlohmann::json temp_data; temp_data["name"] = "噪声值"; temp_data["value"] = ((buffer[7] << 8) + buffer[8])/10.0; temp_data["format"] = "db"; temp_data["format_id"] = 40; json_save["data"].push_back(temp_data); // 上报周期 nlohmann::json report_period; report_period["name"] = "上报周期"; report_period["value"] = ((buffer[9] << 8) + buffer[10]); report_period["format"] = "s"; report_period["format_id"] = 0; json_save["data"].push_back(report_period); }else if (dev_type == 11) { // 噪声 device_name = "臭氧-" + postfix + std::to_string(dev_addr); json_save["device_name"] = "臭氧-" + postfix + std::to_string(dev_addr); json_save["device_id"] = dev_addr; // 臭氧浓度 nlohmann::json temp_data; temp_data["name"] = "臭氧浓度"; temp_data["value"] = ((buffer[7] << 8) + buffer[8])/10.0; temp_data["format"] = "0.1ppm"; temp_data["format_id"] = 40; json_save["data"].push_back(temp_data); // 上报周期 nlohmann::json report_period; report_period["name"] = "上报周期"; report_period["value"] = ((buffer[11] << 8) + buffer[12]); report_period["format"] = "s"; report_period["format_id"] = 0; json_save["data"].push_back(report_period); } else if (dev_type == 12) { // 蓄电池 device_name = "蓄电池-" + std::to_string(dev_addr); json_save["device_name"] = "蓄电池-" + std::to_string(dev_addr); json_save["device_id"] = dev_addr; // 开关对象 nlohmann::json group_voltage; group_voltage["name"] = "组电压"; group_voltage["value"] = (((buffer[7] << 8) + buffer[8]))/10.0; group_voltage["format"] = "V"; group_voltage["format_id"] = 0; json_save["data"].push_back(group_voltage); // 模式对象 nlohmann::json group_current; group_current["name"] = "组电流"; group_current["value"] = ((buffer[9] << 8) + buffer[10]) / 100.0; group_current["format"] = "A"; group_current["format_id"] = 0; json_save["data"].push_back(group_current); for(int i = 0; i <= 8; i++) { nlohmann::json sigle_voltage; sigle_voltage["name"] = "电池-" + std::to_string(i+1) + " 电压"; sigle_voltage["value"] = ((buffer[11 + i * 2] << 
8) + buffer[12 + i * 2]) / 1000.0; sigle_voltage["format"] = "V"; sigle_voltage["format_id"] = 40; json_save["data"].push_back(sigle_voltage); } // 模式对象 for(int i = 0; i <= 8; i++) { nlohmann::json sigle_temp; sigle_temp["name"] = "电池-" + std::to_string(i+1) + " 温度"; sigle_temp["value"] = ((buffer[29 + i * 2] << 8) + buffer[30 + i * 2]) / 100.0; sigle_temp["format"] = "℃"; sigle_temp["format_id"] = 40; json_save["data"].push_back(sigle_temp); } // 模式对象 for(int i = 0; i <= 8; i++) { nlohmann::json sigle_resistance; sigle_resistance["name"] = "电池-" + std::to_string(i+1) + " 内阻"; sigle_resistance["value"] = ((buffer[47 + i * 2] << 8) + buffer[48 + i * 2]) / 1000.0; sigle_resistance["format"] = "mΩ"; sigle_resistance["format_id"] = 0; json_save["data"].push_back(sigle_resistance); } for(int i = 0; i <= 8; i++) { nlohmann::json sigle_volumn; sigle_volumn["name"] = "电池-" + std::to_string(i+1) + " 容量"; sigle_volumn["value"] = ((buffer[65 + i * 2] << 8) + buffer[66 + i * 2]) / 100.0; sigle_volumn["format"] = ""; sigle_volumn["format_id"] = 0; json_save["data"].push_back(sigle_volumn); } // 上报周期 nlohmann::json report_period; report_period["name"] = "上报周期"; report_period["value"] = ((buffer[83] << 8) + buffer[84]); report_period["format"] = "s"; report_period["format_id"] = 0; json_save["data"].push_back(report_period); }else{ std::cout << "未知设备类型: " << dev_type << std::endl; } // std::cout << "devaice_name: " << dev_name << std::endl; // std::cout << "json_save: "<< json_save.dump(1) << std::endl; return json_save; } // 处理客户端连接 void TcpServer::handle_client(int client_socket, const std::string& ip) { uint8_t buffer[BUFFER_SIZE]; while (true) { // 检测连接状态 fd_set read_fds; FD_ZERO(&read_fds); FD_SET(client_socket, &read_fds); struct timeval timeout = {1, 0}; // 1秒超时 int activity = select(client_socket + 1, &read_fds, nullptr, nullptr, &timeout); if (activity < 0 && errno != EINTR) { close(client_socket); this->conn_num--; std::cout << "连接已断开: " << this->conn_num << std::endl; 
LOG_Debug("%s 连接已断开,当前连接数量:%d", ip.c_str(), this->conn_num); break; // 出错退出 } if (FD_ISSET(client_socket, &read_fds)) { memset(buffer, 0, BUFFER_SIZE); int bytes_received = recv(client_socket, buffer, BUFFER_SIZE, 0); if (bytes_received <= 0) { close(client_socket); this->conn_num--; std::cout << "连接已断开,当前连接数量是: " << this->conn_num << std::endl; LOG_Debug("%s 连接已断开,当前连接数量:%d", ip.c_str(), this->conn_num); break; } std::cout << "接收到对端消息, size = " << bytes_received << " data = "; for (int i = 0; i < bytes_received; i++) { printf("0x%02x ", buffer[i]); } printf("\n"); if (bytes_received < 11) { std::cout << "消息太短,丢弃!" << std::endl; continue; } // if(calculate_crc16(buffer, bytes_received - 2)){ uint16_t CRC = calculate_crc16(buffer, bytes_received - 2); if (((buffer[bytes_received - 1] << 8) != (CRC & (0xff << 8))) || (buffer[bytes_received - 2] != (CRC & 0xff))) { std::cout << "crc错误,丢弃!" << std::endl; continue; } // std::cout << "CRC = " << CRC < 0); } sqlite3_finalize(stmt); if (exists) { std::cout << "设备 [" << deviceName << "] 已存在,跳过插入。 ip=" << ip << std::endl; if(ip == "172.16.111.60" || ip == "172.16.111.61" || ip == "172.16.111.62" || ip == "172.16.111.64" || ip == "172.16.111.63" || ip == "172.16.111.65" || ip == "172.16.111.68" || ip == "172.16.111.70") { sqlite3_close(db); close(client_socket); this->conn_num--; std::cout << "连接已经断开: " << ip << "当前连接数量:" << this->conn_num << std::endl; LOG_Debug("%s 连接已断开,当前连接数量:%d", ip.c_str(), this->conn_num); break; } sqlite3_close(db); continue; } // 插入新设备 std::string insertSQL = "INSERT INTO device_position (name, x, y) VALUES (?, 0.5, 0.25);"; if (sqlite3_prepare_v2(db, insertSQL.c_str(), -1, &stmt, nullptr) != SQLITE_OK) { std::cerr << "Insert prepare failed: " << sqlite3_errmsg(db) << std::endl; sqlite3_close(db); continue; } sqlite3_bind_text(stmt, 1, deviceName.c_str(), -1, SQLITE_STATIC); if (sqlite3_step(stmt) != SQLITE_DONE) { std::cerr << "插入失败: " << sqlite3_errmsg(db) << std::endl; sqlite3_finalize(stmt); 
sqlite3_close(db); continue; } sqlite3_finalize(stmt); sqlite3_close(db); std::cout << "设备 [" << deviceName << "] 插入成功。" << std::endl; if(ip == "172.16.111.60" || ip == "172.16.111.61" || ip == "172.16.111.62" || ip == "172.16.111.64" || ip == "172.16.111.64" || ip == "172.16.111.63" || ip == "172.16.111.65" || ip == "172.16.111.68" || ip == "172.16.111.70") { close(client_socket); this->conn_num--; std::cout << "连接已经断开: " << ip << "当前连接数量:" << this->conn_num << std::endl; LOG_Debug("%s 连接已经断开 当前连接数量:%d", ip.c_str(), this->conn_num); break; } } } }