123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416 |
- #include "TcpServer.h"
- #include <time.h>
- #include <nlohmann/json.hpp>
- #include <stdint.h>
- #include <stdio.h>
- #include <stdlib.h>
- #include <hiredis/hiredis.h>
- #include <sqlite3.h>
// Row callback handed to sqlite3_exec(): prints every column of one result
// row as "<column name> = <value>" (the text "NULL" stands in for a null
// value pointer), then a blank line. Returns 0 unconditionally.
static int sql_callback(void *NotUsed, int argc, char **argv, char **azColName) {
    int column = 0;
    while (column < argc) {
        const char *text = "NULL";
        if (argv[column]) {
            text = argv[column];
        }
        printf("%s = %s\n", azColName[column], text);
        ++column;
    }
    printf("\n");
    return 0;
}
// Lookup table used by the byte-wise CRC16 routine (precomputed once for
// speed). It holds zeros until init_crc16_table() has run.
uint16_t crc16_table[256];

// Fills crc16_table. Entry i is the value i pushed through eight
// shift-right steps, XOR-ing in the constant 0xA001 each time the bit
// shifted out was 1.
void init_crc16_table() {
    const uint16_t kPolynomial = 0xA001;
    for (int index = 0; index < 256; ++index) {
        uint16_t value = static_cast<uint16_t>(index);
        int bits_left = 8;
        while (bits_left-- > 0) {
            const bool low_bit_set = (value & 0x0001) != 0;
            value >>= 1;
            if (low_bit_set) {
                value ^= kPolynomial;
            }
        }
        crc16_table[index] = value;
    }
}
- // 计算给定数据缓冲区的CRC16校验值
- uint16_t calculate_crc16(uint8_t *data, size_t length) {
- uint16_t crc = 0xFFFF; // 初始值
- for (size_t i = 0; i < length; i++) {
- uint8_t table_index = (crc ^ data[i]) & 0xFF;
- crc = (crc >> 8) ^ crc16_table[table_index];
- }
- return crc;
- }
- // 构造函数
- TcpServer::TcpServer(int port) : conn_num(0), running(true) {
- server_fd = socket(AF_INET, SOCK_STREAM, 0);
- if (server_fd < 0) {
- perror("Socket creation failed");
- throw std::runtime_error("Socket creation failed");
- }
- address.sin_family = AF_INET;
- address.sin_addr.s_addr = INADDR_ANY;
- address.sin_port = htons(port);
- addrlen = sizeof(address);
- int opt = 1;
- // Set socket options
- if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
- perror("Setsockopt failed");
- std::cout << "port = " << port << std::endl;
- close(server_fd);
- throw std::runtime_error("Setsockopt failed");
- }
- // Bind socket to address
- if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
- perror("Bind failed");
- close(server_fd);
- throw std::runtime_error("Bind failed");
- }
- // Listen for incoming connections
- if (listen(server_fd, 3) < 0) {
- perror("Listen failed");
- close(server_fd);
- throw std::runtime_error("Listen failed");
- }
- std::cout << "Server listening on port " << port << std::endl;
- }
- // 析构函数
- TcpServer::~TcpServer() {
- running = false;
- if (accept_thread.joinable()) {
- accept_thread.join();
- }
- for (auto &th : threads) {
- if (th.joinable()) {
- th.join();
- }
- }
- close(server_fd);
- }
- // 启动服务器
- void TcpServer::start_server() {
- std::cout << "Starting server..." << std::endl;
- // 初始化CRC16表格
- init_crc16_table();
- accept_thread = std::thread([this]() {
- while (this->running) {
- int new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen);
- if (new_socket < 0) {
- // 打印错误信息
- std::cerr << "Accept failed with error: " << strerror(errno) << std::endl;
- if (errno == EINTR) {
- // 如果是信号中断,继续等待连接
- continue;
- } else {
- // 其他错误,继续等待连接
- continue;
- }
- }
- std::cout << "New client connected" << std::endl;
- {
- std::lock_guard<std::mutex> lock(mtx);
- this->conn_num++;
- std::cout << "Current connection count: " << this->conn_num << std::endl;
- }
- // Create a new thread to handle the client
- threads.emplace_back(&TcpServer::handle_client, this, new_socket);
- }
- });
- // accept_thread.detach(); // 让接受线程独立运行
- accept_thread.join();
- }
- // 处理客户端连接
- void TcpServer::handle_client(int client_socket) {
- uint8_t buffer[BUFFER_SIZE];
- while (true) {
- memset(buffer, 0, BUFFER_SIZE);
- int bytes_received = recv(client_socket, buffer, BUFFER_SIZE, 0);
- if (bytes_received <= 0) {
- // Client disconnected or error
- std::cout << "Client disconnected or error occurred. bytes_received = " << bytes_received << std::endl;
- close(client_socket);
- this->conn_num --;
- std::cout << "Current connection count: " << this->conn_num << std::endl;
- break;
- }
- std::cout << "Received from client, size = " << bytes_received << " data = ";
- for(int i = 0;i < bytes_received; i++){
- printf("0x%02x ", buffer[i]);
- }
- printf("\n");
- if(bytes_received < 11){
- std::cout << "too short msg!" << std::endl;
- continue;
- }
- // if(calculate_crc16(buffer, bytes_received - 2)){
- uint16_t CRC = calculate_crc16(buffer, bytes_received - 2);
- if(((buffer[bytes_received - 1]<<8) != (CRC&(0xff<<8))) || (buffer[bytes_received - 2] != (CRC&0xff))){
- std::cout << "error crc msg!" << std::endl;
- continue;
- }
- std::cout << "CRC = " << calculate_crc16(buffer, bytes_received - 2) <<std::endl;
- int dev_addr = buffer[0];
- int fun_code = buffer[1];
- int data_len = buffer[2];
- int dev_type = buffer[6];
- std::string device_name;
- nlohmann::json json_save;
- // 公共字段
- json_save["status"] = "run";
- json_save["gateway_id"] = 0;
- std::string dev_name = "device_" + std::to_string(dev_addr);
- if (dev_type == 1) { // 温湿度
- device_name = "温湿度-" + std::to_string(dev_addr);
- json_save["device_name"] = "温湿度-" + std::to_string(dev_addr);
- json_save["device_id"] = dev_addr;
- // 温度对象
- nlohmann::json temp_data;
- temp_data["name"] = "温度";
- temp_data["value"] = ((buffer[7] << 8) + buffer[8])/10.0;
- temp_data["format"] = "℃";
- temp_data["format_id"] = 40;
- json_save["data"].push_back(temp_data);
- // 湿度对象
- nlohmann::json humidity_data;
- humidity_data["name"] = "湿度";
- humidity_data["value"] = ((buffer[9] << 8) + buffer[10]);
- humidity_data["format"] = "%RH";
- humidity_data["format_id"] = 40;
- json_save["data"].push_back(humidity_data);
- } else if (dev_type == 2) { // 水浸
- device_name = "水浸-" + std::to_string(dev_addr);
- json_save["device_name"] = "水浸-" + std::to_string(dev_addr);
- json_save["device_id"] = dev_addr;
- // 水浸对象
- nlohmann::json water_data;
- water_data["name"] = "水浸状态";
- water_data["value"] = ((buffer[7] << 8) + buffer[8]); // 0 表示无水浸,1 表示有水浸
- water_data["format"] = "";
- water_data["format_id"] = 0;
- json_save["data"].push_back(water_data);
- } else if (dev_type == 3) { // 烟感
- device_name = "烟感-" + std::to_string(dev_addr);
- json_save["device_name"] = "烟感-" + std::to_string(dev_addr);
- json_save["device_id"] = dev_addr;
- // 烟感对象
- nlohmann::json smoke_data;
- smoke_data["name"] = "报警状态";
- smoke_data["value"] = ((buffer[7] << 8) + buffer[8]); // 0 表示无烟,1 表示有烟
- smoke_data["format"] = "";
- smoke_data["format_id"] = 0;
- json_save["data"].push_back(smoke_data);
- }
- // else if (dev_type == 7) { // 烟感
- // device_name = "空调-" + std::to_string(dev_addr);
- // json_save["device_name"] = "空调-" + std::to_string(dev_addr);
- // json_save["device_id"] = dev_addr;
- //
- // // 开关对象
- // nlohmann::json switch_data;
- // switch_data["name"] = "开关";
- // switch_data["value"] = ((buffer[7] << 8) + buffer[8]);
- // switch_data["format"] = "";
- // switch_data["format_id"] = 0;
- // json_save["data"].push_back(switch_data);
- // // 模式对象
- // nlohmann::json mode_data;
- // mode_data["name"] = "模式";
- // mode_data["value"] = ((buffer[9] << 8) + buffer[10]);
- // mode_data["format"] = "";
- // mode_data["format_id"] = 0;
- // json_save["data"].push_back(mode_data);
- // // 模式对象
- // nlohmann::json set_temperature_data;
- // set_temperature_data["name"] = "设定温度";
- // set_temperature_data["value"] = ((buffer[11] << 8) + buffer[12]);
- // set_temperature_data["format"] = "℃";
- // set_temperature_data["format_id"] = 40;
- // json_save["data"].push_back(set_temperature_data);
- // // 模式对象
- // nlohmann::json wind_speed_data;
- // wind_speed_data["name"] = "风速";
- // wind_speed_data["value"] = ((buffer[13] << 8) + buffer[14]);
- // wind_speed_data["format"] = "m/s";
- // wind_speed_data["format_id"] = 40;
- // json_save["data"].push_back(wind_speed_data);
- // // 模式对象
- // nlohmann::json wind_direction_data;
- // wind_direction_data["name"] = "风向";
- // wind_direction_data["value"] = ((buffer[15] << 8) + buffer[16]);
- // wind_direction_data["format"] = "";
- // wind_direction_data["format_id"] = 0;
- // json_save["data"].push_back(wind_direction_data);
- // }
- // 创建 Redis 连接
- redisContext *context = redisConnect("127.0.0.1", 6379);
- if (context == NULL || context->err) {
- if (context) {
- printf("Error: %s\n", context->errstr);
- redisFree(context);
- } else {
- printf("Can't allocate redis context\n");
- }
- return ;
- }
- // 设置键值对
- std::cout << "devaice_name " << dev_name << "json_save "<< json_save.dump(4) << std::endl;
- std::string out = json_save.dump(0);
- redisReply *reply = (redisReply *)redisCommand(context, "SET %s %s", dev_name.c_str(), out.c_str());
- if (reply == NULL) {
- printf("Error: %s\n", context->errstr);
- redisFree(context);
- return ;
- }
- // 检查 SET 命令的返回值
- if (reply->type == REDIS_REPLY_STATUS && strcmp(reply->str, "OK") == 0) {
- // printf("Key '%s' set to value '%s'\n", key, value);
- } else {
- // printf("Failed to set key '%s'\n", key);
- }
- // 释放 reply 结构
- freeReplyObject(reply);
- // 断开 Redis 连接
- redisFree(context);
- sqlite3 *db;
- char *zErrMsg = 0;
- int rc;
- // 打开数据库连接
- rc = sqlite3_open("/usr/local/bin/database/sqlite/history_data.db", &db);
- if (rc) {
- fprintf(stderr, "Can't open database: %s\n", sqlite3_errmsg(db));
- sqlite3_close(db);
- return;
- }
- // 创建表
- const char *sql_create_table =
- "CREATE TABLE IF NOT EXISTS history_data ("
- "id INTEGER PRIMARY KEY AUTOINCREMENT,"
- "data TEXT NOT NULL,"
- "time TEXT NOT NULL,"
- "device_id INTEGER NOT NULL,"
- "device_name TEXT NOT NULL,"
- "gateway_id INTEGER NOT NULL,"
- "status TEXT NOT NULL);";
- rc = sqlite3_exec(db, sql_create_table, sql_callback, 0, &zErrMsg);
- if (rc != SQLITE_OK) {
- fprintf(stderr, "SQL error: %s\n", zErrMsg);
- sqlite3_free(zErrMsg);
- } else {
- printf("Table created successfully\n");
- }
- // 插入数据
- const char *sql_insert_data =
- "INSERT INTO history_data (data, time, device_id, device_name, gateway_id, status) VALUES (?, ?, ?, ?, ?, ?);";
- // 准备插入语句
- sqlite3_stmt *stmt;
- rc = sqlite3_prepare_v2(db, sql_insert_data, -1, &stmt, 0);
- if (rc != SQLITE_OK) {
- fprintf(stderr, "Failed to prepare statement: %s\n", sqlite3_errmsg(db));
- sqlite3_close(db);
- return;
- }
- // 定义要插入的数据
- std::string sql_data = json_save["data"].dump();
- rc = sqlite3_bind_text(stmt, 1, sql_data.c_str(), -1, SQLITE_STATIC);
- // 获取当前时间
- time_t rawtime;
- struct tm *timeinfo;
- // 获取当前时间的时间戳
- time(&rawtime);
- // 将时间戳转换为本地时间
- timeinfo = localtime(&rawtime);
- // 提取年、月、日、时、分、秒
- int year = 1900 + timeinfo->tm_year; // tm_year 是从 1900 年开始的年数
- int month = 1 + timeinfo->tm_mon; // tm_mon 是从 0 开始的月份(0-11)
- int day = timeinfo->tm_mday; // tm_mday 是一个月中的第几天(1-31)
- int hour = timeinfo->tm_hour; // tm_hour 是小时(0-23)
- int minute = timeinfo->tm_min; // tm_min 是分钟(0-59)
- int second = timeinfo->tm_sec; // tm_sec 是秒(0-59)
- // 打印时间信息
- char time[30] = {};
- sprintf(time,"%d-%02d-%02d %02d:%02d:%02d",
- year, month, day, hour, minute, second);
- rc = sqlite3_bind_text(stmt, 2, time, -1, SQLITE_STATIC);
- rc = sqlite3_bind_int(stmt, 3, dev_addr);
- rc = sqlite3_bind_text(stmt, 4, device_name.c_str(), -1, SQLITE_STATIC);
- rc = sqlite3_bind_int(stmt, 5, 0);
- rc = sqlite3_bind_text(stmt, 6, "run", -1, SQLITE_STATIC);
- if (rc != SQLITE_OK) {
- fprintf(stderr, "Failed to bind parameters: %s\n", sqlite3_errmsg(db));
- sqlite3_finalize(stmt);
- sqlite3_close(db);
- return;
- }
- rc = sqlite3_step(stmt);
- if (rc != SQLITE_DONE) {
- fprintf(stderr, "Failed to execute insert: %s\n", sqlite3_errmsg(db));
- }
- // 清理语句
- sqlite3_finalize(stmt);
- // 查询数据
- // const char *sql_select_data = "SELECT * FROM history_data;";
- // rc = sqlite3_exec(db, sql_select_data, sql_callback, 0, &zErrMsg);
- // if (rc != SQLITE_OK) {
- // fprintf(stderr, "SQL error: %s\n", zErrMsg);
- // sqlite3_free(zErrMsg);
- // }
- // 关闭数据库连接
- sqlite3_close(db);
- }
- }
|