引言
标准的Modbus协议已经有很多成熟的库,偶尔项目上遇到这样的需求,记录一下以备后续使用。
功能说明
自动连接多个modbus从服务器,并定时读取数据保存到数据库中。
具体代码
import socket
import mysql.connector
import time
import struct
# 数据库配置
DB_CONFIG = {
'user': 'root',
'password': '数据库密码',
'host': 'localhost',
'database': 'modbus_data'
}
# Modbus服务器配置
SERVERS = [
{'id': 'A', 'ip': '192.168.1.111', 'port': 502},
{'id': 'B', 'ip': '192.168.1.112', 'port': 502},
{'id': 'C', 'ip': '192.168.1.113', 'port': 502},
]
# 寄存器地址
REGISTER_ADDRESSES = [0x0, 0x1, 0x2, 0x3, 0x46, 0x47, 0x4A, 0x4B, 0x4F, 0x50, 0x53, 0x54, 0x55]
# 创建数据库表
def create_table(cursor):
cursor.execute("""
CREATE TABLE IF NOT EXISTS modbus_records (
server_id VARCHAR(1),
address INT,
value FLOAT,
PRIMARY KEY (server_id, address)
)
""")
# 计算CRC
def calculate_crc(data):
crc = 0xFFFF
for pos in data:
crc ^= pos
for _ in range(8):
if (crc & 0x0001) != 0:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return struct.pack('<H', crc)
# 创建RTU数据帧
def create_rtu_frame(slave_id, function_code, start_address, quantity):
frame = bytes([slave_id, function_code]) + struct.pack('>HH', start_address, quantity)
crc = calculate_crc(frame)
return frame + crc
# 发送请求并接收响应
def send_request(sock, server, cursor):
for address in REGISTER_ADDRESSES:
rtu_frame = create_rtu_frame(int(server['id'], 36), 4, address, 1)
sock.sendall(rtu_frame)
# 接收响应
response = sock.recv(1024)
if len(response) >= 5:
# 提取值
byte_count = response[2]
value = struct.unpack('>f', response[3:3 + byte_count])[0]
# 插入或更新数据库
cursor.execute("""
INSERT INTO modbus_records (server_id, address, value)
VALUES (%s, %s, %s)
ON DUPLICATE KEY UPDATE value = %s
""", (server['id'], address, value, value))
print(f"Server: {server['id']}, Address: {address}, Value: {value}")
else:
print(f"Invalid response from server {server['id']}")
# 发送Modbus写指令
def send_write_request(sock, server, address, value):
if address == 0x0001:
# 布尔类型,值为0或1
value = 1 if value else 0
rtu_frame = create_rtu_frame(int(server['id'], 36), 6, address, value)
sock.sendall(rtu_frame)
print(f"Sent boolean value {value} to server {server['id']} at address {address}")
elif address == 0x0002:
# 浮点类型
rtu_frame = create_rtu_frame(int(server['id'], 36), 6, address, struct.unpack('>H', struct.pack('>f', value)))
sock.sendall(rtu_frame)
print(f"Sent float value {value} to server {server['id']} at address {address}")
# 主循环
def main():
# 连接数据库
db = mysql.connector.connect(**DB_CONFIG)
cursor = db.cursor()
create_table(cursor)
for server in SERVERS:
while True:
try:
with socket.create_connection((server['ip'], server['port']), timeout=5) as sock:
print(f"Connected to server {server['id']} at {server['ip']}")
while True:
send_request(sock, server, cursor)
# 示例:发送布尔值和浮点数
send_write_request(sock, server, 0x0001, True) # 发送布尔值True
send_write_request(sock, server, 0x0002, 123.45) # 发送浮点值123.45
db.commit() # 提交所有更改
time.sleep(1) # 每秒循环一次
except (socket.error, mysql.connector.Error) as e:
print(f"Error communicating with server {server['id']}: {e}")
time.sleep(5) # 等待5秒后重试连接
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("程序终止")
finally:
cursor.close()
db.close()
基于TCP连接的ModBUS RTU实现