python 实现Modebus 通信(pyModbusTCP )
尽管python 并不实现实时控制应用程序,但是它编写程序实在是太方便了。至少在程序快速成型和验证过程中可以使用Python 来实现,另一方面,Python 强大的数值计算和图形显示能力也能够实现modbus 设备的远程监控,数据采集,甚至AI 训练。注意的是 auto_open和auto_close两个标记,它指明每次请求modbusTCP 时是否会自动打开和关闭TCP/IP 连接。如果auto
尽管python 并不适合实时控制,但是它编写程序实在是太方便了。至少在程序快速成型和验证过程中可以使用Python 来实现,另一方面,Python 强大的数值计算和图形显示能力也能够实现modbus 设备的远程监控,数据采集,甚至AI 训练。Python 在算法开法,仿真等场景是高效率的编程工具。
这里我们介绍Python 实现ModbusTCP的例子:
客户端(Client)
from pyModbusTCP.client import ModbusClient # Modbus TCP Client
import time
from pyModbusTCP import utils
import numpy as np
import matplotlib.pyplot as plt
# TCP auto connect on modbus request, close after it
ModbusBMS = ModbusClient(host="localhost", port=502, unit_id=1, auto_open=True, auto_close=False)
#time.sleep(5)
vaw=1
x = np.arange(0,1000,1,dtype=np.int16)
y=np.arange(-10,10,0.02,dtype=np.float32)
if __name__ == '__main__':
while True:
reg_l=ModbusBMS.read_input_registers(0,2)
val=utils.word_list_to_long(reg_l)
print(utils.decode_ieee(val[0],False))
y=np.append(y,utils.decode_ieee(val[0],False))
y=np.delete(y, 0, axis=0)
plt.clf()
plt.plot(x, y, ls="-", lw=2, label="plot figure")
plt.legend()
plt.show()
plt.pause(0.01)
注意的是 auto_open和auto_close两个标记,它指明每次请求modbusTCP 时是否会自动打开和关闭TCP/IP 连接。如果auto_close=True 表示每次都会自动关闭连接,经测试,它会引起2秒中的延时。
上述例子中使用plt 显示实时数据,在这里是一sin 曲线。是np.append 和np.delete 维持一个实时数据队列。
ModbusBMS具有下列几种读取modbus 的方法:
- ModbusBMS.read_holding_registers
- ModbusBMS.read_input_registers
- ModbusBMS.read_coils
- ModbusBMS.read_discrete_inputs
- ModbusBMS.write_single_register
- ModbusBMS.write_single_coil
- ModbusBMS.write_multiple_registers
- ModbusBMS.write_multiple_coils
服务器端(Server)
import argparse
from pyModbusTCP.server import ModbusServer, DataBank
from pyModbusTCP import utils
from datetime import datetime
import numpy as np
Fs = 8000
f = 50
x=0
coil_state=True
class MyDataBank(DataBank):
"""A custom ModbusServerDataBank for override get_holding_registers method."""
def __init__(self):
# turn off allocation of memory for standard modbus object types
# only "holding registers" space will be replaced by dynamic build values.
super().__init__(virtual_mode=True)
def get_coils(self, address, number=1, srv_info=None):
global coil_state
coil_state=not coil_state
return coil_state
def get_holding_registers(self, address, number=1, srv_info=None):
"""Get virtual holding registers."""
# populate virtual registers dict with current datetime values
now = datetime.now()
return now.second
def get_input_registers(self, address, number=1, srv_info=None):
global x
wave=np.sin(2 * np.pi * f * x / Fs)*10
x=x+1
b32_l=[utils.encode_ieee(wave,False)]
b16_l = utils.long_list_to_word(b32_l)
print(b16_l)
return b16_l
if __name__ == '__main__':
# parse args
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', type=str, default='localhost', help='Host (default: localhost)')
parser.add_argument('-p', '--port', type=int, default=502, help='TCP port (default: 502)')
args = parser.parse_args()
# init modbus server and start it
server = ModbusServer(host=args.host, port=args.port, data_bank=MyDataBank())
server.start()
上面的程序都经过测试。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)