基于TCP连接的ModBUS RTU实现

引言

标准的Modbus协议已经有很多成熟的库,偶尔项目上遇到这样的需求,记录一下以备后续使用。

功能说明

自动连接多个modbus从服务器,并定时读取数据保存到数据库中。

具体代码

import socket
import mysql.connector
import time
import struct

# 数据库配置
DB_CONFIG = {
    'user': 'root',
    'password': '数据库密码',
    'host': 'localhost',
    'database': 'modbus_data'
}

# Modbus服务器配置
SERVERS = [
    {'id': 'A', 'ip': '192.168.1.111', 'port': 502},
    {'id': 'B', 'ip': '192.168.1.112', 'port': 502},
    {'id': 'C', 'ip': '192.168.1.113', 'port': 502},
]

# 寄存器地址
REGISTER_ADDRESSES = [0x0, 0x1, 0x2, 0x3, 0x46, 0x47, 0x4A, 0x4B, 0x4F, 0x50, 0x53, 0x54, 0x55]

# 创建数据库表
def create_table(cursor):
    cursor.execute("""
    CREATE TABLE IF NOT EXISTS modbus_records (
        server_id VARCHAR(1),
        address INT,
        value FLOAT,
        PRIMARY KEY (server_id, address)
    )
    """)

# 计算CRC
def calculate_crc(data):
    crc = 0xFFFF
    for pos in data:
        crc ^= pos
        for _ in range(8):
            if (crc & 0x0001) != 0:
                crc >>= 1
                crc ^= 0xA001
            else:
                crc >>= 1
    return struct.pack('<H', crc)

# 创建RTU数据帧
def create_rtu_frame(slave_id, function_code, start_address, quantity):
    frame = bytes([slave_id, function_code]) + struct.pack('>HH', start_address, quantity)
    crc = calculate_crc(frame)
    return frame + crc

# 发送请求并接收响应
def send_request(sock, server, cursor):
    for address in REGISTER_ADDRESSES:
        rtu_frame = create_rtu_frame(int(server['id'], 36), 4, address, 1)
        sock.sendall(rtu_frame)

        # 接收响应
        response = sock.recv(1024)
        if len(response) >= 5:
            # 提取值
            byte_count = response[2]
            value = struct.unpack('>f', response[3:3 + byte_count])[0]

            # 插入或更新数据库
            cursor.execute("""
            INSERT INTO modbus_records (server_id, address, value)
            VALUES (%s, %s, %s)
            ON DUPLICATE KEY UPDATE value = %s
            """, (server['id'], address, value, value))
            print(f"Server: {server['id']}, Address: {address}, Value: {value}")
        else:
            print(f"Invalid response from server {server['id']}")

# 发送Modbus写指令
def send_write_request(sock, server, address, value):
    if address == 0x0001:
        # 布尔类型,值为0或1
        value = 1 if value else 0
        rtu_frame = create_rtu_frame(int(server['id'], 36), 6, address, value)
        sock.sendall(rtu_frame)
        print(f"Sent boolean value {value} to server {server['id']} at address {address}")

    elif address == 0x0002:
        # 浮点类型
        rtu_frame = create_rtu_frame(int(server['id'], 36), 6, address, struct.unpack('>H', struct.pack('>f', value)))
        sock.sendall(rtu_frame)
        print(f"Sent float value {value} to server {server['id']} at address {address}")

# 主循环
def main():
    # 连接数据库
    db = mysql.connector.connect(**DB_CONFIG)
    cursor = db.cursor()
    create_table(cursor)

    for server in SERVERS:
        while True:
            try:
                with socket.create_connection((server['ip'], server['port']), timeout=5) as sock:
                    print(f"Connected to server {server['id']} at {server['ip']}")

                    while True:
                        send_request(sock, server, cursor)

                        # 示例:发送布尔值和浮点数
                        send_write_request(sock, server, 0x0001, True)  # 发送布尔值True
                        send_write_request(sock, server, 0x0002, 123.45)  # 发送浮点值123.45

                        db.commit()  # 提交所有更改
                        time.sleep(1)  # 每秒循环一次

            except (socket.error, mysql.connector.Error) as e:
                print(f"Error communicating with server {server['id']}: {e}")
                time.sleep(5)  # 等待5秒后重试连接

if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("程序终止")
    finally:
        cursor.close()
        db.close()
基于TCP连接的ModBUS RTU实现

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

Scroll to top