#!/usr/bin/env python3
# pylint: disable=missing-type-doc
"""Modbus Message Generator.
The following is an example of how to generate example encoded messages
for the supplied modbus format:
* tcp - `./generate-messages.py -f tcp -m rx -b`
* ascii - `./generate-messages.py -f ascii -m tx -a`
* rtu - `./generate-messages.py -f rtu -m rx -b`
* binary - `./generate-messages.py -f binary -m tx -b`
"""
import codecs as c
import logging
from optparse import OptionParser # pylint: disable=deprecated-module
from pymodbus.bit_read_message import (
ReadCoilsRequest,
ReadCoilsResponse,
ReadDiscreteInputsRequest,
ReadDiscreteInputsResponse,
)
from pymodbus.bit_write_message import (
WriteMultipleCoilsRequest,
WriteMultipleCoilsResponse,
WriteSingleCoilRequest,
WriteSingleCoilResponse,
)
import pymodbus.diag_message as modbus_diag
from pymodbus.file_message import (
ReadFifoQueueRequest,
ReadFifoQueueResponse,
ReadFileRecordRequest,
ReadFileRecordResponse,
WriteFileRecordRequest,
WriteFileRecordResponse,
)
from pymodbus.mei_message import (
ReadDeviceInformationRequest,
ReadDeviceInformationResponse,
)
from pymodbus.other_message import (
GetCommEventCounterRequest,
GetCommEventCounterResponse,
GetCommEventLogRequest,
GetCommEventLogResponse,
ReadExceptionStatusRequest,
ReadExceptionStatusResponse,
ReportSlaveIdRequest,
ReportSlaveIdResponse,
)
from pymodbus.register_read_message import (
ReadHoldingRegistersRequest,
ReadHoldingRegistersResponse,
ReadInputRegistersRequest,
ReadInputRegistersResponse,
ReadWriteMultipleRegistersRequest,
ReadWriteMultipleRegistersResponse,
)
from pymodbus.register_write_message import (
MaskWriteRegisterRequest,
MaskWriteRegisterResponse,
WriteMultipleRegistersRequest,
WriteMultipleRegistersResponse,
WriteSingleRegisterRequest,
WriteSingleRegisterResponse,
)
# -------------------------------------------------------------------------- #
# import all the available framers
# -------------------------------------------------------------------------- #
from pymodbus.transaction import (
ModbusAsciiFramer,
ModbusBinaryFramer,
ModbusRtuFramer,
ModbusSocketFramer,
)
# -------------------------------------------------------------------------- #
# initialize logging
# -------------------------------------------------------------------------- #
# Logger named "pymodbus"; main() switches it to DEBUG when --debug is given.
modbus_log = logging.getLogger("pymodbus")
# -------------------------------------------------------------------------- #
# enumerate all request messages
# -------------------------------------------------------------------------- #
# Request classes that generate_messages() instantiates and encodes when
# the "messages" option is "tx". Messages are printed in list order.
_request_messages = [
    # coil / discrete input / register reads and writes
    ReadHoldingRegistersRequest,
    ReadDiscreteInputsRequest,
    ReadInputRegistersRequest,
    ReadCoilsRequest,
    WriteMultipleCoilsRequest,
    WriteMultipleRegistersRequest,
    WriteSingleRegisterRequest,
    WriteSingleCoilRequest,
    ReadWriteMultipleRegistersRequest,
    # imported from pymodbus.other_message
    ReadExceptionStatusRequest,
    GetCommEventCounterRequest,
    GetCommEventLogRequest,
    ReportSlaveIdRequest,
    # file record, mask-write and FIFO queue requests
    ReadFileRecordRequest,
    WriteFileRecordRequest,
    MaskWriteRegisterRequest,
    ReadFifoQueueRequest,
    # imported from pymodbus.mei_message
    ReadDeviceInformationRequest,
    # diagnostic requests, all taken from pymodbus.diag_message
    modbus_diag.ReturnQueryDataRequest,
    modbus_diag.RestartCommunicationsOptionRequest,
    modbus_diag.ReturnDiagnosticRegisterRequest,
    modbus_diag.ChangeAsciiInputDelimiterRequest,
    modbus_diag.ForceListenOnlyModeRequest,
    modbus_diag.ClearCountersRequest,
    modbus_diag.ReturnBusMessageCountRequest,
    modbus_diag.ReturnBusCommunicationErrorCountRequest,
    modbus_diag.ReturnBusExceptionErrorCountRequest,
    modbus_diag.ReturnSlaveMessageCountRequest,
    modbus_diag.ReturnSlaveNoResponseCountRequest,
    modbus_diag.ReturnSlaveNAKCountRequest,
    modbus_diag.ReturnSlaveBusyCountRequest,
    modbus_diag.ReturnSlaveBusCharacterOverrunCountRequest,
    modbus_diag.ReturnIopOverrunCountRequest,
    modbus_diag.ClearOverrunCountRequest,
    modbus_diag.GetClearModbusPlusRequest,
]
# -------------------------------------------------------------------------- #
# enumerate all response messages
# -------------------------------------------------------------------------- #
# Response classes that generate_messages() instantiates and encodes when
# the "messages" option is anything other than "tx". Messages are printed
# in list order.
_response_messages = [
    # coil / discrete input / register reads and writes
    ReadHoldingRegistersResponse,
    ReadDiscreteInputsResponse,
    ReadInputRegistersResponse,
    ReadCoilsResponse,
    WriteMultipleCoilsResponse,
    WriteMultipleRegistersResponse,
    WriteSingleRegisterResponse,
    WriteSingleCoilResponse,
    ReadWriteMultipleRegistersResponse,
    # imported from pymodbus.other_message
    ReadExceptionStatusResponse,
    GetCommEventCounterResponse,
    GetCommEventLogResponse,
    ReportSlaveIdResponse,
    # file record, mask-write and FIFO queue responses
    ReadFileRecordResponse,
    WriteFileRecordResponse,
    MaskWriteRegisterResponse,
    ReadFifoQueueResponse,
    # imported from pymodbus.mei_message
    ReadDeviceInformationResponse,
    # diagnostic responses, all taken from pymodbus.diag_message
    modbus_diag.ReturnQueryDataResponse,
    modbus_diag.RestartCommunicationsOptionResponse,
    modbus_diag.ReturnDiagnosticRegisterResponse,
    modbus_diag.ChangeAsciiInputDelimiterResponse,
    modbus_diag.ForceListenOnlyModeResponse,
    modbus_diag.ClearCountersResponse,
    modbus_diag.ReturnBusMessageCountResponse,
    modbus_diag.ReturnBusCommunicationErrorCountResponse,
    modbus_diag.ReturnBusExceptionErrorCountResponse,
    modbus_diag.ReturnSlaveMessageCountResponse,
    # NOTE(review): "NoReponse" is spelled differently from the matching
    # request class ("NoResponse"); presumably this is the name exported by
    # pymodbus.diag_message -- confirm against the installed version before
    # renaming.
    modbus_diag.ReturnSlaveNoReponseCountResponse,
    modbus_diag.ReturnSlaveNAKCountResponse,
    modbus_diag.ReturnSlaveBusyCountResponse,
    modbus_diag.ReturnSlaveBusCharacterOverrunCountResponse,
    modbus_diag.ReturnIopOverrunCountResponse,
    modbus_diag.ClearOverrunCountResponse,
    modbus_diag.GetClearModbusPlusResponse,
]
# -------------------------------------------------------------------------- #
# build an arguments singleton
# -------------------------------------------------------------------------- #
# Feel free to override any values here to generate a specific message
# in question. It should be noted that many argument names are reused
# between different messages, and a number of messages are simply using
# their default values.
# -------------------------------------------------------------------------- #
_arguments = {
"address": 0x12,
"count": 0x08,
"value": 0x01,
"values": [0x01] * 8,
"read_address": 0x12,
"read_count": 0x08,
"write_address": 0x12,
"write_registers": [0x01] * 8,
"transaction": 0x01,
"protocol": 0x00,
"unit": 0xFF,
}
# -------------------------------------------------------------------------- #
# generate all the requested messages
# -------------------------------------------------------------------------- #
def generate_messages(framer, options):
    """Encode every listed request or response message and print it.

    :param framer: The framer whose ``buildPacket`` encodes each message
    :param options: The parsed options; ``messages`` equal to "tx" selects
        the request list (anything else selects the response list), and a
        false ``ascii`` makes the packet print as a hex string
    """
    message_classes = (
        _request_messages if options.messages == "tx" else _response_messages
    )
    for message_class in message_classes:
        # every message is built from the same shared keyword arguments
        instance = message_class(**_arguments)
        label = (
            "%-44s = "  # pylint: disable=consider-using-f-string
            % instance.__class__.__name__
        )
        print(label)
        encoded = framer.buildPacket(instance)
        if not options.ascii:
            # binary output is shown as hexadecimal text
            encoded = c.encode(encoded, "hex_codec").decode("utf-8")
        print(f"{encoded}\n")  # because ascii ends with a \r\n
# -------------------------------------------------------------------------- #
# initialize our program settings
# -------------------------------------------------------------------------- #
def get_options(args=None):
    """Parse the command line options.

    :param args: Optional list of argument strings to parse instead of the
        process command line; when None, optparse uses ``sys.argv[1:]``
    :returns: The parsed options object with the attributes ``framer``,
        ``debug``, ``ascii`` and ``messages``
    """
    parser = OptionParser()
    parser.add_option(
        "-f",
        "--framer",
        help="The type of framer to use (tcp, rtu, binary, ascii)",
        dest="framer",
        default="tcp",
    )
    parser.add_option(
        "-D",
        "--debug",
        help="Enable debug tracing",
        action="store_true",
        dest="debug",
        default=False,
    )
    # -a and -b write to the same destination: "ascii" defaults to True and
    # -b switches it off.
    parser.add_option(
        "-a",
        "--ascii",
        help="This indicates that the message is ascii",
        action="store_true",
        dest="ascii",
        default=True,
    )
    parser.add_option(
        "-b",
        "--binary",
        help="This indicates that the message is binary",
        action="store_false",
        dest="ascii",
    )
    parser.add_option(
        "-m",
        "--messages",
        help="The messages to encode (rx, tx)",
        dest="messages",
        default="rx",
    )
    # positional arguments are ignored
    (opt, _) = parser.parse_args(args)
    return opt
def main():
    """Run the generator: parse options, pick a framer, print the messages.

    When the debug option is set, a root log handler is installed and the
    "pymodbus" logger is switched to DEBUG. A ``framer`` option that is not
    one of tcp, rtu, binary or ascii falls back to the socket framer.
    """
    option = get_options()

    if option.debug:
        try:
            # Setting the level alone shows nothing: with no handler
            # configured, records below WARNING are never emitted.
            logging.basicConfig()
            modbus_log.setLevel(logging.DEBUG)
        except Exception:  # pylint: disable=broad-except
            print("Logging is not supported on this system")

    framer_classes = {
        "tcp": ModbusSocketFramer,
        "rtu": ModbusRtuFramer,
        "binary": ModbusBinaryFramer,
        "ascii": ModbusAsciiFramer,
    }
    framer_class = framer_classes.get(option.framer, ModbusSocketFramer)
    # NOTE(review): None is the framer's only constructor argument here
    # (presumably the decoder, not needed just to build packets) -- confirm.
    framer = framer_class(None)

    generate_messages(framer, option)
# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    main()