#include "TcpServer.h" #include #include #include #include #include #include #include // 回调函数,用于处理查询结果 static int sql_callback(void *NotUsed, int argc, char **argv, char **azColName) { for (int i = 0; i < argc; i++) { printf("%s = %s\n", azColName[i], argv[i] ? argv[i] : "NULL"); } printf("\n"); return 0; } // 计算CRC16的表格(预先计算以提高效率) uint16_t crc16_table[256]; // 初始化CRC16表格 void init_crc16_table() { uint16_t polynomial = 0xA001; for (uint16_t i = 0; i < 256; i++) { uint16_t crc = i; for (uint8_t j = 0; j < 8; j++) { if (crc & 0x0001) { crc = (crc >> 1) ^ polynomial; } else { crc >>= 1; } } crc16_table[i] = crc; } } // 计算给定数据缓冲区的CRC16校验值 uint16_t calculate_crc16(uint8_t *data, size_t length) { uint16_t crc = 0xFFFF; // 初始值 for (size_t i = 0; i < length; i++) { uint8_t table_index = (crc ^ data[i]) & 0xFF; crc = (crc >> 8) ^ crc16_table[table_index]; } return crc; } // 构造函数 TcpServer::TcpServer(int port) : conn_num(0), running(true) { server_fd = socket(AF_INET, SOCK_STREAM, 0); if (server_fd < 0) { perror("Socket creation failed"); throw std::runtime_error("Socket creation failed"); } address.sin_family = AF_INET; address.sin_addr.s_addr = INADDR_ANY; address.sin_port = htons(port); addrlen = sizeof(address); int opt = 1; // Set socket options if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) { perror("Setsockopt failed"); std::cout << "port = " << port << std::endl; close(server_fd); throw std::runtime_error("Setsockopt failed"); } // Bind socket to address if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) { perror("Bind failed"); close(server_fd); throw std::runtime_error("Bind failed"); } // Listen for incoming connections if (listen(server_fd, 3) < 0) { perror("Listen failed"); close(server_fd); throw std::runtime_error("Listen failed"); } std::cout << "Server listening on port " << port << std::endl; } // 析构函数 TcpServer::~TcpServer() { running = false; if (accept_thread.joinable()) { accept_thread.join(); } for (auto &th : threads) { if (th.joinable()) { th.join(); } } close(server_fd); } // 启动服务器 void TcpServer::start_server() { std::cout << "Starting server..." << std::endl; // 初始化CRC16表格 init_crc16_table(); accept_thread = std::thread([this]() { while (this->running) { int new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen); if (new_socket < 0) { // 打印错误信息 std::cerr << "Accept failed with error: " << strerror(errno) << std::endl; if (errno == EINTR) { // 如果是信号中断,继续等待连接 continue; } else { // 其他错误,继续等待连接 continue; } } std::cout << "New client connected" << std::endl; { std::lock_guard lock(mtx); this->conn_num++; std::cout << "Current connection count: " << this->conn_num << std::endl; } // Create a new thread to handle the client threads.emplace_back(&TcpServer::handle_client, this, new_socket); } }); // accept_thread.detach(); // 让接受线程独立运行 accept_thread.join(); } // 处理客户端连接 void TcpServer::handle_client(int client_socket) { uint8_t buffer[BUFFER_SIZE]; while (true) { memset(buffer, 0, BUFFER_SIZE); int bytes_received = recv(client_socket, buffer, BUFFER_SIZE, 0); if (bytes_received <= 0) { // Client disconnected or error std::cout << "Client disconnected or error occurred. bytes_received = " << bytes_received << std::endl; close(client_socket); this->conn_num --; std::cout << "Current connection count: " << this->conn_num << std::endl; break; } std::cout << "Received from client, size = " << bytes_received << " data = "; for(int i = 0;i < bytes_received; i++){ printf("0x%02x ", buffer[i]); } printf("\n"); if(bytes_received < 11){ std::cout << "too short msg!" << std::endl; continue; } // if(calculate_crc16(buffer, bytes_received - 2)){ uint16_t CRC = calculate_crc16(buffer, bytes_received - 2); if(((buffer[bytes_received - 1]<<8) != (CRC&(0xff<<8))) || (buffer[bytes_received - 2] != (CRC&0xff))){ std::cout << "error crc msg!" << std::endl; continue; } std::cout << "CRC = " << calculate_crc16(buffer, bytes_received - 2) <err) { if (context) { printf("Error: %s\n", context->errstr); redisFree(context); } else { printf("Can't allocate redis context\n"); } return ; } // 设置键值对 std::cout << "devaice_name " << dev_name << "json_save "<< json_save.dump(4) << std::endl; std::string out = json_save.dump(0); redisReply *reply = (redisReply *)redisCommand(context, "SET %s %s", dev_name.c_str(), out.c_str()); if (reply == NULL) { printf("Error: %s\n", context->errstr); redisFree(context); return ; } // 检查 SET 命令的返回值 if (reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "OK") == 0) { // printf("Key '%s' set to value '%s'\n", key, value); } else { // printf("Failed to set key '%s'\n", key); } // 释放 reply 结构 freeReplyObject(reply); // 断开 Redis 连接 redisFree(context); sqlite3 *db; char *zErrMsg = 0; int rc; // 打开数据库连接 rc = sqlite3_open("/usr/local/bin/database/sqlite/history_data.db", &db); if (rc) { fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db)); sqlite3_close(db); return; } // 创建表 const char *sql_create_table = "CREATE TABLE IF NOT EXISTS history_data (" "id INTEGER PRIMARY KEY AUTOINCREMENT," "data TEXT NOT NULL," "time TEXT NOT NULL," "device_id INTEGER NOT NULL," "device_name TEXT NOT NULL," "gateway_id INTEGER NOT NULL," "status TEXT NOT NULL);"; rc = sqlite3_exec(db, sql_create_table, sql_callback, 0, &zErrMsg); if (rc != SQLITE_OK) { fprintf(stderr, "SQL error: %s\n", zErrMsg); sqlite3_free(zErrMsg); } else { printf("Table created successfully\n"); } // 插入数据 const char *sql_insert_data = "INSERT INTO history_data (data, time, device_id, device_name, gateway_id, status) VALUES (?, ?, ?, ?, ?, ?);"; // 准备插入语句 sqlite3_stmt *stmt; rc = sqlite3_prepare_v2(db, sql_insert_data, -1, &stmt, 0); if (rc != SQLITE_OK) { fprintf(stderr, "Failed to prepare statement: %s\n", sqlite3_errmsg(db)); sqlite3_close(db); return; } // 定义要插入的数据 std::string sql_data = json_save["data"].dump(); rc = sqlite3_bind_text(stmt, 1, sql_data.c_str(), -1, SQLITE_STATIC); // 获取当前时间 time_t rawtime; struct tm *timeinfo; // 获取当前时间的时间戳 time(&rawtime); // 将时间戳转换为本地时间 timeinfo = localtime(&rawtime); // 提取年、月、日、时、分、秒 int year = 1900 + timeinfo->tm_year; // tm_year 是从 1900 年开始的年数 int month = 1 + timeinfo->tm_mon; // tm_mon 是从 0 开始的月份(0-11) int day = timeinfo->tm_mday; // tm_mday 是一个月中的第几天(1-31) int hour = timeinfo->tm_hour; // tm_hour 是小时(0-23) int minute = timeinfo->tm_min; // tm_min 是分钟(0-59) int second = timeinfo->tm_sec; // tm_sec 是秒(0-59) // 打印时间信息 char time[30] = {}; sprintf(time,"%d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second); rc = sqlite3_bind_text(stmt, 2, time, -1, SQLITE_STATIC); rc = sqlite3_bind_int(stmt, 3, dev_addr); rc = sqlite3_bind_text(stmt, 4, device_name.c_str(), -1, SQLITE_STATIC); rc = sqlite3_bind_int(stmt, 5, 0); rc = sqlite3_bind_text(stmt, 6, "run", -1, SQLITE_STATIC); if (rc != SQLITE_OK) { fprintf(stderr, "Failed to bind parameters: %s\n", sqlite3_errmsg(db)); sqlite3_finalize(stmt); sqlite3_close(db); return; } rc = sqlite3_step(stmt); if (rc != SQLITE_DONE) { fprintf(stderr, "Failed to execute insert: %s\n", sqlite3_errmsg(db)); } // 清理语句 sqlite3_finalize(stmt); // 查询数据 // const char *sql_select_data = "SELECT * FROM history_data;"; // rc = sqlite3_exec(db, sql_select_data, sql_callback, 0, &zErrMsg); // if (rc != SQLITE_OK) { // fprintf(stderr, "SQL error: %s\n", zErrMsg); // sqlite3_free(zErrMsg); // } // 关闭数据库连接 sqlite3_close(db); } }