#!/usr/bin/env python3
"""Pymodbus Synchronous Client extended calls rxample.
This example uses client_sync.py to handle connection, and have the same options.
The example shows how to use the synchronous modbus client
implementation from pymodbus to perform the extended portions of the
modbus protocol.
If you are performing a request that is not available in the client
mixin, you have to perform the request like this instead:
from pymodbus.diag_message import ClearCountersRequest
from pymodbus.diag_message import ClearCountersResponse
request = ClearCountersRequest()
response = client.execute(request)
if isinstance(response, ClearCountersResponse):
... do something with the response
The corresponding server must be started before e.g. as:
python3 server_sync.py
"""
import logging
from examples.client_sync import run_sync_client, setup_sync_client
from pymodbus.diag_message import (
ChangeAsciiInputDelimiterRequest,
ClearCountersRequest,
ClearOverrunCountRequest,
ForceListenOnlyModeRequest,
GetClearModbusPlusRequest,
RestartCommunicationsOptionRequest,
ReturnBusCommunicationErrorCountRequest,
ReturnBusExceptionErrorCountRequest,
ReturnDiagnosticRegisterRequest,
ReturnIopOverrunCountRequest,
ReturnQueryDataRequest,
ReturnSlaveBusCharacterOverrunCountRequest,
ReturnSlaveBusyCountRequest,
ReturnSlaveMessageCountRequest,
ReturnSlaveNAKCountRequest,
ReturnSlaveNoResponseCountRequest,
)
from pymodbus.mei_message import ReadDeviceInformationRequest
from pymodbus.other_message import (
GetCommEventCounterRequest,
GetCommEventLogRequest,
ReadExceptionStatusRequest,
ReportSlaveIdRequest,
)
UNIT = 0x01
def _execute_information_requests(client):
"""Execute extended information requests."""
_logger.info("### Running ReadDeviceInformationRequest")
rr = client.execute(ReadDeviceInformationRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReportSlaveIdRequest")
rr = client.execute(ReportSlaveIdRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
assert rr.status # test that the status is ok
_logger.info("Running ReadExceptionStatusRequest")
rr = client.execute(ReadExceptionStatusRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
assert not rr.status # test the status code
_logger.info("Running GetCommEventCounterRequest")
rr = client.execute(GetCommEventCounterRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
assert rr.status # test the status code
assert not rr.count # test the count returned
_logger.info("Running GetCommEventLogRequest")
rr = client.execute(GetCommEventLogRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
assert rr.status # test the status code
assert not rr.event_count # test the number of events
assert not rr.message_count # test the number of messages
assert not rr.events # test the number of events
def _execute_diagnostic_requests(client):
"""Execute extended diagnostic requests."""
_logger.info("### Running ReturnQueryDataRequest")
rr = client.execute(ReturnQueryDataRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
assert not rr.message[0] # test the resulting message
_logger.info("Running RestartCommunicationsOptionRequest")
rr = client.execute(RestartCommunicationsOptionRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
assert not rr.message[0] # test the resulting message
_logger.info("Running ReturnDiagnosticRegisterRequest")
rr = client.execute(ReturnDiagnosticRegisterRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ChangeAsciiInputDelimiterRequest")
rr = client.execute(ChangeAsciiInputDelimiterRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ForceListenOnlyModeRequest")
_logger.info("NOT WORKING")
_logger.info(str(ForceListenOnlyModeRequest))
# rr = client.execute(
# ForceListenOnlyModeRequest(unit=UNIT)
# ) # does not send a response
# assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ClearCountersRequest")
rr = client.execute(ClearCountersRequest())
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReturnBusCommunicationErrorCountRequest")
rr = client.execute(ReturnBusCommunicationErrorCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReturnBusExceptionErrorCountRequest")
rr = client.execute(ReturnBusExceptionErrorCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReturnSlaveMessageCountRequest")
rr = client.execute(ReturnSlaveMessageCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReturnSlaveNoResponseCountRequest")
rr = client.execute(ReturnSlaveNoResponseCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReturnSlaveNAKCountRequest")
rr = client.execute(ReturnSlaveNAKCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReturnSlaveBusyCountRequest")
rr = client.execute(ReturnSlaveBusyCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReturnSlaveBusCharacterOverrunCountRequest")
rr = client.execute(ReturnSlaveBusCharacterOverrunCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ReturnIopOverrunCountRequest")
rr = client.execute(ReturnIopOverrunCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running ClearOverrunCountRequest")
rr = client.execute(ClearOverrunCountRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
_logger.info("Running GetClearModbusPlusRequest")
rr = client.execute(GetClearModbusPlusRequest(unit=UNIT))
assert rr and not rr.isError() # test that calls was OK
def run_sync_ext_calls(client):
"""Demonstrate basic read/write calls."""
_execute_information_requests(client)
_execute_diagnostic_requests(client)
# --------------------------------------------------------------------------- #
# Extra code, to allow commandline parameters instead of changing the code
# --------------------------------------------------------------------------- #
FORMAT = "%(asctime)-15s %(levelname)-8s %(module)-15s:%(lineno)-8s %(message)s"
logging.basicConfig(format=FORMAT)
_logger = logging.getLogger()
if __name__ == "__main__":
testclient = setup_sync_client()
run_sync_client(testclient, modbus_calls=run_sync_ext_calls)